1. Cấu hình Row-level Security và Column-level Security
Thiết lập chính sách Row-level Security (RLS)
Chúng ta sẽ áp dụng bộ lọc dữ liệu dựa trên vai trò người dùng để giới hạn hàng (row) có thể truy cập được trong bảng Iceberg.
Sử dụng cơ chế `filter` trong Spark SQL kết hợp với biến môi trường hoặc session variable để định danh người dùng, sau đó áp dụng vào biểu thức truy vấn.
Giả sử bảng `customer_orders` có trường `region` và `customer_id`. Người dùng từ region "VN" chỉ được xem dữ liệu của mình.
spark-sql --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
--conf spark.sql.catalog.iceberg.properties=spark.sql.catalog.iceberg.properties \
-e "
CREATE OR REPLACE VIEW customer_orders_rls AS
SELECT * FROM iceberg.customer_orders
WHERE region = 'VN' AND customer_id = current_user();
"
Kết quả mong đợi: View `customer_orders_rls` được tạo thành công. Khi truy vấn view này, Spark tự động áp dụng điều kiện lọc dựa trên user hiện tại, đảm bảo dữ liệu vùng khác bị ẩn.
Thiết lập Column-level Security (CLS)
Để che giấu các cột nhạy cảm (PII) như `ssn` hoặc `credit_card` khỏi người dùng không có quyền, ta sử dụng hàm `mask` hoặc tạo view chỉ chọn các cột an toàn.
Tạo một view mới ẩn cột nhạy cảm bằng cách thay thế giá trị bằng chuỗi dấu sao hoặc NULL.
spark-sql --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
-e "
CREATE OR REPLACE VIEW customer_orders_masked AS
SELECT
order_id,
region,
order_date,
CASE
WHEN current_user() IN ('admin', 'data_engineer') THEN ssn
ELSE '***'
END AS ssn,
total_amount
FROM iceberg.customer_orders;
"
Kết quả mong đợi: View `customer_orders_masked` được tạo. Người dùng thường thấy `***` ở cột `ssn`, trong khi admin vẫn thấy giá trị thực.
Verify RLS và CLS
Chạy truy vấn dưới vai trò người dùng khác nhau để xác minh quyền truy cập.
spark-sql --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
-e "SELECT * FROM iceberg.customer_orders_masked WHERE order_id = 123;"
Kết quả mong đợi: Cột `ssn` hiển thị `***` nếu user hiện tại không phải admin. Nếu chạy với user admin, giá trị thực sẽ hiện ra.
2. Tích hợp Kerberos cho xác thực Spark
Cấu hình Client Spark để kết nối qua Kerberos
Kerberos là chuẩn vàng cho xác thực trong môi trường Hadoop/Spark. Ta cần cấu hình `spark-defaults.conf` để Spark sử dụng cơ chế xác thực Kerberos khi kết nối đến Hive Metastore hoặc Object Storage (S3/HDFS).
Định nghĩa các tham số chính: principal, keytab path, và principal của Hive Metastore.
Đường dẫn file cấu hình: `/etc/spark/conf/spark-defaults.conf`
Nội dung hoàn chỉnh của file:
spark.kerberos.principal=hadoop/spark@EXAMPLE.COM
spark.kerberos.keytab=/etc/security/keytabs/spark.headless.keytab
spark.kerberos.minRenewInterval=3600
spark.kerberos.renewalInterval=3600
spark.hadoop.hive.metastore.sasl.enabled=true
spark.hadoop.hive.metastore.principal=hive/_HOST@EXAMPLE.COM
spark.hadoop.hive.metastore.kerberos.principal=hive/metastore@EXAMPLE.COM
spark.hadoop.hive.server2.authentication=KERBEROS
spark.hadoop.hive.server2.authentication.kerberos.principal=hive/_HOST@EXAMPLE.COM
spark.yarn.principal=yarn@EXAMPLE.COM
spark.yarn.keytab=/etc/security/keytabs/yarn.headless.keytab
Kết quả mong đợi: Khi khởi động Spark, nó sẽ tự động đọc file keytab và thực hiện handshake với KDC (Key Distribution Center) để lấy Ticket Granting Ticket (TGT).
Cấu hình Environment Variables cho Kubernetes
Trong môi trường Kubernetes, chúng ta cần inject các biến môi trường liên quan đến Kerberos vào Pod Spark Driver và Executor.
Cấu hình trong file `spark-submit` hoặc Kubernetes Deployment YAML để trỏ đến volume chứa keytab.
export SPARK_HOME=/opt/spark
export KRB5_CONFIG=/etc/krb5.conf
export KRB5CCNAME=/tmp/spark.kcache
# Khởi động Spark với biến môi trường Kerberos
$SPARK_HOME/bin/spark-submit \
--conf spark.kerberos.principal=spark@EXAMPLE.COM \
--conf spark.kerberos.keytab=/etc/security/keytabs/spark.headless.keytab \
--conf spark.kerberos.minRenewInterval=3600 \
--conf spark.kerberos.renewalInterval=3600 \
--conf spark.hadoop.hive.metastore.sasl.enabled=true \
--conf spark.hadoop.hive.metastore.principal=hive/_HOST@EXAMPLE.COM \
--class org.apache.spark.sql.hive.HiveContext \
local://path/to/your-app.jar
Kết quả mong đợi: Process Spark chạy mà không bị lỗi "GSSAuthentication failed" hoặc "User not authenticated".
Verify Kerberos Authentication
Sử dụng lệnh `klist` để kiểm tra xem ticket Kerberos đã được cấp chưa.
klist -k
klist
Kết quả mong đợi: Lệnh `klist` hiển thị danh sách các ticket đã cấp (TGT) cho principal `spark@EXAMPLE.COM` với thời gian hết hạn hợp lệ.
3. Mã hóa dữ liệu At-rest và In-transit
Mã hóa dữ liệu At-rest trên Object Storage (S3)
Dữ liệu trong Iceberg/Delta được lưu dưới dạng file Parquet/ORC trên S3. Để mã hóa at-rest, ta cấu hình bucket S3 sử dụng Server-Side Encryption (SSE-S3 hoặc SSE-KMS).
Cấu hình policy của bucket để bắt buộc tất cả các request upload phải có encryption.
aws s3api put-bucket-encryption \
--bucket my-lakehouse-bucket \
--server-side-encryption-configuration '{
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "arn:aws:kms:region:account-id:key/key-id"
},
"BucketKeyEnabled": true
}
]
}'
Kết quả mong đợi: Bucket được cấu hình mã hóa. Khi Spark viết dữ liệu vào bucket, S3 sẽ tự động mã hóa file trước khi lưu vào disk.
Mã hóa dữ liệu In-transit giữa Spark và Catalog/Storage
Để bảo vệ dữ liệu khi truyền tải, ta cần bật SSL/TLS cho kết nối giữa Spark Driver/Executor và Hive Metastore, cũng như Object Storage.
Cấu hình `core-site.xml` và `hive-site.xml` để bật SSL.
Đường dẫn: `/etc/hadoop/conf/core-site.xml`
fs.s3a.https.enabled
true
fs.s3a.path.style.access
true
fs.s3a.connection.ssl.enabled
true
Đường dẫn: `/etc/hadoop/conf/hive-site.xml`
hive.metastore.uris
thrift://hive-metastore:9083
hive.metastore.sasl.enabled
true
hive.metastore.client.transport.mode
binary
hive.metastore.client.socket.timeout
10000
hive.server2.authentication
KERBEROS
hive.server2.authentication.kerberos.principal
hive/_HOST@EXAMPLE.COM
Kết quả mong đợi: Tất cả lưu lượng truy cập đến Hive Metastore và S3 đều được bảo mật qua HTTPS và SASL/SSL.
Verify Encryption
Khởi tạo một job Spark nhỏ để ghi dữ liệu, sau đó kiểm tra headers của request hoặc metadata của file trên S3.
aws s3api head-object --bucket my-lakehouse-bucket --key iceberg/customer_orders/v1/data/file-00001.parquet
Kết quả mong đợi: Trong output JSON, trường `ServerSideEncryption` phải hiện giá trị `aws:kms` hoặc `AES256`.
4. Đấu nhật hoạt động truy cập (Audit Logs)
Cấu hình Audit Log trong Spark
Để ghi lại ai đã truy cập bảng nào, khi nào, và thực hiện thao tác gì (SELECT, INSERT, DELETE), ta cần bật Spark Audit Logging.
Sử dụng `org.apache.spark.sql.utils.QueryExecutionListener` hoặc cấu hình custom listener để ghi log vào file hoặc bảng metadata.
Đường dẫn: `/etc/spark/conf/spark-defaults.conf` (thêm vào cuối file)
spark.sql.queryExecutionListeners=org.apache.spark.sql.utils.QueryExecutionListener
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.ui.enabled=true
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///var/log/spark-events
Kết quả mong đợi: Spark bắt đầu ghi lại sự kiện query execution vào thư mục `spark-event-logs`. Các log này có thể được phân tích sau.
Viết Audit Log vào bảng Metadata (Iceberg Table)
Thay vì chỉ ghi log vào file text, ta sẽ tạo một bảng Iceberg `audit_logs` và tự động chèn bản ghi vào bảng này sau mỗi query quan trọng.
Viết một script Spark (Python hoặc Scala) chạy như một scheduled job để đọc event logs và chuyển đổi thành bản ghi trong bảng `audit_logs`.
spark-submit \
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
--class com.example.AuditLogger \
audit-logger.jar
Logic bên trong `AuditLogger` (mô tả):
- Đọc file event log từ HDFS/S3.
- Parse cấu trúc `QueryExecution` để lấy `userName`, `queryText`, `startTime`, `endTime`, `tableAccessed`.
- Thực hiện `INSERT INTO iceberg.audit_logs VALUES (...)`.
Tạo bảng lưu trữ audit log:
spark-sql --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
-e "
CREATE TABLE IF NOT EXISTS iceberg.audit_logs (
audit_id BIGINT GENERATED ALWAYS AS IDENTITY,
user_name STRING,
operation STRING,
table_name STRING,
query_text STRING,
start_time TIMESTAMP,
end_time TIMESTAMP,
status STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
) USING iceberg;
"
Kết quả mong đợi: Bảng `audit_logs` được tạo trong catalog Iceberg. Dữ liệu audit được lưu trữ dưới dạng Parquet có versioning (time travel).
Verify Audit Logs
Thực hiện một truy vấn test trên bảng dữ liệu, sau đó kiểm tra bảng `audit_logs` để xem bản ghi có được tạo không.
spark-sql --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hive \
-e "
-- 1. Chạy query test
SELECT * FROM iceberg.customer_orders LIMIT 10;
-- 2. Kiểm tra audit log
SELECT * FROM iceberg.audit_logs ORDER BY created_at DESC LIMIT 5;
"
Kết quả mong đợi: Bảng `audit_logs` hiển thị dòng mới nhất với `operation` là `SELECT`, `table_name` là `customer_orders`, và `user_name` trùng với user đang chạy lệnh.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 8: Xử lý dữ liệu stream: Real-time Ingestion vào Lakehouse
Phần 10: Tối ưu hóa chi phí và Troubleshooting nâng cao cho hệ thống »