Xử lý lỗi thường gặp khi chạy Spark Job với Apache Iceberg
Trong quá trình vận hành, lỗi phổ biến nhất là xung đột phiên bản thư viện hoặc lỗi phân vùng (partitioning) khi ghi dữ liệu.
1. Khắc phục lỗi "Class not found" hoặc xung đột thư viện Iceberg
Lỗi này xảy ra khi Spark phiên bản cũ hoặc cấu hình không tương thích với thư viện Iceberg mới nhất, dẫn đến lỗi runtime.
Để kiểm tra và sửa, cần xác minh các dependency trong file `spark-defaults.conf`.
Truy cập file cấu hình tại: /etc/spark/spark-defaults.conf
Sửa nội dung file, đảm bảo khai báo đúng version của Spark và Iceberg. Ví dụ cho Spark 3.5.0 và Iceberg 1.4.0:
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.spark_catalog.type rest
spark.sql.catalog.spark_catalog.uri http://trino-server:8080/v1
spark.sql.catalog.spark_catalog.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.spark_catalog.warehouse s3://my-iceberg-warehouse/
spark.jars.packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.0
Kết quả mong đợi: Khi chạy job Spark, không còn lỗi `ClassNotFoundException` và Spark có thể nhận diện catalog Iceberg.
2. Xử lý lỗi "Manifest file not found" hoặc "Corrupted manifest"
Lỗi này thường xảy ra khi file manifest bị xóa thủ công hoặc lỗi trong quá trình compact, khiến Iceberg không đọc được snapshot cũ.
Thực hiện lệnh `iceberg-tools` hoặc dùng Spark SQL để rebuild metadata.
Mở Spark SQL shell và chạy lệnh sửa chữa:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog"
Trong shell, chạy lệnh optimize để tạo lại manifest file:
OPTIMIZE table_name WHERE false;
Kết quả mong đợi: Iceberg sẽ quét lại dữ liệu và tạo manifest mới, xóa các tham chiếu lỗi. Lỗi "Manifest not found" biến mất.
3. Lỗi "Transaction conflict" khi ghi đồng thời
Khi nhiều executor ghi vào cùng một partition mà không có cơ chế locking, dữ liệu có thể bị ghi đè hoặc lỗi transaction.
Giải pháp: Bật tính năng locking và đảm bảo catalog hỗ trợ locking (ví dụ: sử dụng catalog dựa trên REST với backend như Postgres hoặc DynamoDB).
Cấu hình lại catalog trong Spark:
spark.sql.catalog.spark_catalog.lock-impl org.apache.iceberg.rest.RESTCatalogLock
Kết quả mong đợi: Các job ghi đồng thời sẽ được xếp hàng hoặc xử lý theo cơ chế optimistic locking, tránh xung đột dữ liệu.
Phân tích và tối ưu hiệu năng Query trong Trino
Trino thường gặp vấn đề query chậm do dữ liệu bị phân mảnh (data skew) hoặc thiếu index phù hợp.
1. Chẩn đoán nguyên nhân Query chậm
Sử dụng Trino CLI hoặc UI để xem execution plan và thống kê tài nguyên.
Chạy lệnh phân tích query đã thực thi (thay thế `query_id` bằng ID thực tế từ log Trino):
trino-cli --server trino-server:8080 -c "SELECT * FROM system.runtime.queries WHERE query_id = '20240520_120000_00000_abcde';"
Hoặc xem execution plan chi tiết:
EXPLAIN (ANALYZE, COSTS) SELECT * FROM iceberg_db.sales WHERE date = '2024-01-01';
Kết quả mong đợi: Bạn sẽ thấy cột `actual_rows` và `estimated_rows`. Nếu chênh lệch lớn, nghĩa là thống kê metadata không chính xác, gây ra full scan không cần thiết.
2. Tối ưu hóa với lệnh OPTIMIZE và REPAIR
Khi dữ liệu bị phân mảnh thành nhiều file nhỏ (small files), Trino phải mở quá nhiều file handle, làm chậm query.
Chạy lệnh OPTIMIZE trong Trino để merge các file nhỏ thành file lớn hơn (Z-ordering hoặc partition pruning):
OPTIMIZE iceberg_db.sales
Kết quả mong đợi: Số lượng file (manifest entries) giảm xuống, thời gian query (wall time) giảm đáng kể.
3. Cấu hình Cache kết quả trong Trino
Để tăng tốc các query lặp lại, bật tính năng Result Cache.
Chỉnh sửa file cấu hình Trino tại: /etc/trino/catalog/iceberg.properties
Thêm nội dung:
connector.name=iceberg
iceberg.catalog-type=rest
iceberg.catalog-uri=http://trino-server:8080/v1
iceberg.warehouse=s3://my-iceberg-warehouse/
iceberg.cache-result=true
Và trong file /etc/trino/config.properties, tăng bộ nhớ cache:
query.max-memory=12GB
query.max-memory-per-node=4GB
Kết quả mong đợi: Các query giống nhau chạy lại sẽ trả về kết quả gần như tức thì từ cache thay vì quét lại Data Lake.
Mở rộng kiến trúc sang Multi-node Cluster
Để xử lý khối lượng dữ liệu lớn hơn, cần triển khai Trino và Spark trên nhiều node sử dụng YARN hoặc Kubernetes.
1. Triển khai Trino trên Kubernetes (Helm Chart)
Sử dụng Helm để deploy Trino Coordinator và Worker trên K8s, đảm bảo khả năng tự động mở rộng (auto-scaling).
Chuẩn bị file giá trị values.yaml tại đường dẫn /k8s/trino/values.yaml:
coordinator:
replicas: 1
resources:
limits:
cpu: "2"
memory: "8Gi"
worker:
replicas: 3
resources:
limits:
cpu: "4"
memory: "16Gi"
autoscaling:
enabled: true
minReplicas: 3
maxReplicas: 20
targetCPUUtilizationPercentage: 70
catalogs:
iceberg:
type: iceberg
properties:
catalog-type: rest
catalog-uri: http://trino-coordinator:8080/v1
warehouse: s3://my-iceberg-warehouse/
Chạy lệnh deploy:
helm upgrade --install trino trino/trino -f /k8s/trino/values.yaml -n data-lake
Kết quả mong đợi: Kubernetes tạo ra 1 pod Coordinator và 3 pod Worker. Trạng thái pod là `Running`. Load balancer phân phối traffic vào Coordinator.
2. Tích hợp Spark với YARN trong cluster
Cấu hình Spark để chạy trên YARN ResourceManager, tận dụng tài nguyên động của cluster.
Chỉnh sửa file /etc/spark/conf/spark-env.sh:
export SPARK_YARN_MODE=true
export SPARK_YARN_RESOURCEMANAGER_HOST=yarn-resourcemanager
export SPARK_YARN_RESOURCEMANAGER_PORT=8032
Chạy job Spark với chế độ YARN cluster:
spark-submit --master yarn --deploy-mode cluster --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.spark_catalog.uri=http://trino-coordinator:8080/v1 /path/to/your-job.jar
Kết quả mong đợi: Job Spark được phân phối vào các node YARN, không chạy trên driver local. Bạn có thể mở rộng số lượng worker YARN để tăng tốc độ xử lý.
Chiến lược Backup, Restore và Disaster Recovery
Data Lakehouse lưu trữ dữ liệu dạng file (Parquet) trên Object Storage (S3/MinIO), do đó chiến lược backup khác biệt so với database truyền thống.
1. Backup Metadata Catalog (Quan trọng nhất)
Nếu catalog (metadata) bị mất, các file Parquet trong S3 vẫn còn nhưng không thể truy cập được. Catalog lưu trữ thông tin về schema, version, và location của file.
Đối với Iceberg Catalog dựa trên REST (sử dụng backend Postgres hoặc DynamoDB), cần backup database backend này.
Sử dụng lệnh `pg_dump` để backup catalog Postgres:
pg_dump -U iceberg_user -h postgres-db -d iceberg_catalog > /backup/iceberg_metadata_$(date +%F).sql
Hoặc sử dụng snapshot của S3 bucket chứa metadata (nếu dùng catalog file-based):
aws s3 sync s3://my-iceberg-warehouse/.metadata /backup/metadata-$(date +%F)
Kết quả mong đợi: File SQL hoặc folder metadata được lưu trữ an toàn ở nơi khác. Đây là "chìa khóa" để phục hồi toàn bộ Data Lake.
2. Restore Data Lake sau sự cố
Quy trình khôi phục bao gồm: Khôi phục metadata trước, sau đó đảm bảo file data vẫn tồn tại trên S3.
Khôi phục metadata từ backup SQL:
psql -U iceberg_user -h postgres-db -d iceberg_catalog -f /backup/iceberg_metadata_2024-05-20.sql
Đảm bảo bucket S3 đã được restore (nếu bucket bị xóa):
aws s3 sync /backup/data-lake-files s3://my-iceberg-warehouse/
Kết quả mong đợi: Trino và Spark có thể truy vấn lại dữ liệu ngay lập tức vì metadata đã khớp với file data trên S3.
3. Chiến lược Disaster Recovery với Time Travel
Lợi ích lớn nhất của Iceberg là Time Travel. Nếu có lỗi logic trong ETL xóa nhầm dữ liệu, không cần restore từ backup mà dùng snapshot cũ.
Thực hiện restore toàn bộ table về trạng thái trước 24h:
ALTER TABLE iceberg_db.sales SET LOCATION 's3://my-iceberg-warehouse/sales';
CALL iceberg_db.system.repair_table('iceberg_db.sales', TIMESTAMP '2024-05-19 12:00:00');
Kết quả mong đợi: Table `sales` quay về trạng thái tại thời điểm 12:00 ngày hôm qua, dữ liệu bị xóa nhầm hoặc ghi sai được khôi phục hoàn toàn.
Điều hướng series:
Mục lục: Series: Xây dựng Data Lakehouse với Apache Iceberg, Trino và Ubuntu 24.04
« Phần 5: Tối ưu hóa hiệu năng và bảo mật cho Lakehouse