Xử lý lỗi đồng bộ dữ liệu lớn giữa dbt và Iceberg
Khi đồng bộ lượng dữ liệu lớn, dbt thường gặp lỗi timeout hoặc ghi đè snapshot không đầy đủ do cơ chế merge mặc định.
Giải pháp là cấu hình chiến lược merge cụ thể cho từng bảng Iceberg và tăng giới hạn thời gian chờ.
Trước tiên, chỉnh sửa file dbt_project.yml để áp dụng chiến lược merge an toàn cho các bảng Iceberg.
version: 2
models:
my_data_product:
+materialized: table
+seeds:
+file_format: parquet
+on_schema_change: append_new_columns
+iceberg:
write_mode: merge
merge_key: transaction_id
+incremental_strategy: merge
Kết quả: dbt sẽ thực hiện thao tác MERGE thay vì DROP+CREATE, đảm bảo không mất dữ liệu cũ trong quá trình cập nhật incremental.
Tiếp theo, cấu hình biến môi trường để xử lý lỗi timeout khi chạy trên Kubernetes.
export DBT_THREADS=4
export DBT_LOG_PATH=/var/log/dbt
export DBT_LOG_FORMAT=json
export DBT_SEND_ANONYMOUS_USAGE_STATS=false
export DBT_PROFILES_DIR=/opt/dbt/profiles
Kết quả: Các luồng chạy song song được giới hạn để tránh OOM (Out Of Memory), log được định dạng JSON để tiện phân tích sau này.
Để xử lý lỗi "Table already exists" khi chạy lại job bị lỗi giữa chừng, thêm flag idempotent vào command chạy dbt.
dbt run --models my_data_product --full-refresh --fail-fast
Kết quả: Nếu job lỗi, các bảng bị ảnh hưởng sẽ được làm sạch và tạo lại hoàn toàn, đảm bảo tính nhất quán.
Verify kết quả bằng cách kiểm tra trạng thái của bảng trong Iceberg Catalog.
trino -c "SELECT * FROM iceberg_catalog.my_schema.my_data_product LIMIT 10;"
Kết quả mong đợi: Query chạy thành công, không báo lỗi schema conflict, dữ liệu mới nhất được hiển thị.
Chiến lược Partitioning và Compaction để tối ưu chi phí
Việc chọn sai chiến lược phân vùng (partitioning) sẽ dẫn đến "small file problem", làm tăng chi phí metadata và giảm tốc độ query.
Áp dụng chiến lược partitioning dựa trên thời gian (time-based) cho dữ liệu có tần suất truy vấn cao theo ngày.
Cấu hình file /opt/dbt/models/my_data_product.sql với thuộc tính partition_spec.
{{
config(
materialized='table',
partition_by={
"strategy": "range",
"field": "event_date",
"start": "2023-01-01",
"end": "2024-12-31",
"interval": "1"
}
)
}}
SELECT *
FROM raw_events
WHERE event_date >= '2023-01-01'
Kết quả: Iceberg sẽ tự động tạo các partition theo ngày, giúp query chỉ quét dữ liệu cần thiết.
Để giải quyết vấn đề file nhỏ, thiết lập lịch compaction tự động thông qua Spark hoặc Trino.
Tạo file job /opt/airflow/dags/iceberg_compaction.py để chạy compaction định kỳ.
from pyspark.sql import SparkSession
from pyiceberg.catalog import load_catalog
catalog = load_catalog('iceberg_catalog')
spark = SparkSession.builder \
.appName("IcebergCompaction") \
.config("spark.sql.catalog.iceberg_catalog", catalog) \
.getOrCreate()
df = spark.read.table("iceberg_catalog.my_schema.my_data_product")
df.write \
.format("iceberg") \
.mode("overwrite") \
.option("write.merge", "true") \
.option("spark.sql.iceberg.compaction.enabled", "true") \
.save("iceberg_catalog.my_schema.my_data_product")
spark.stop()
Kết quả: Các file nhỏ được hợp nhất thành file lớn hơn (mục tiêu 512MB), giảm số lượng file metadata.
Verify hiệu quả compaction bằng cách đếm số lượng file data.
trino -c "SELECT count(*) as file_count FROM iceberg_catalog.my_schema.my_data_product__files WHERE file_size > 0;"
Kết quả mong đợi: Số lượng file giảm đáng kể so với trước khi compaction, kích thước file trung bình tăng lên.
Giải pháp xử lý xung đột khi nhiều đơn vị cập nhật Data Product
Khi nhiều team cùng ghi vào một Data Product, xung đột xảy ra khi commit đồng thời. Iceberg hỗ trợ cơ chế optimistic concurrency control.
Đảm bảo dbt sử dụng cơ chế "append-only" hoặc "upsert" với key duy nhất để tránh ghi đè dữ liệu quan trọng.
Cấu hình trong /opt/dbt/profiles.yml để bật tính năng transaction isolation.
my_data_mesh_profile:
target: prod
outputs:
prod:
type: databricks
host: https://adb-12345.databricks.com
http_path: /sql/1.0/warehouses/warehouse_id
token: ${DBT_DATABRICKS_TOKEN}
catalog: hive_metastore
schema: my_data_product_schema
iceberg_catalog: true
iceberg_commit_strategy: optimistic
Kết quả: Nếu có xung đột, commit thứ hai sẽ bị từ chối (conflict error) thay vì ghi đè sai lệch, buộc team phải giải quyết thủ công.
Triển khai cơ chế "Staging Area" trung gian để các team dump dữ liệu vào trước khi merge vào Production.
Tạo bảng staging trong Iceberg với schema mở rộng.
CREATE TABLE IF NOT EXISTS iceberg_catalog.my_schema.my_data_product_staging (
transaction_id BIGINT,
event_date DATE,
amount DOUBLE,
source_unit VARCHAR(50),
created_at TIMESTAMP
) USING iceberg
PARTITIONED BY (year(event_date), month(event_date));
Kết quả: Dữ liệu từ các đơn vị khác nhau được lưu vào bảng staging, tránh xung đột trực tiếp vào bảng chính.
Viết job merge từ Staging vào Production chỉ chạy vào khung giờ cố định (ví dụ: 2h sáng).
dbt run --models my_data_product_staging --target prod --vars '{"merge_strategy": "upsert", "merge_key": "transaction_id"}'
Kết quả: Dữ liệu được hợp nhất an toàn, xung đột được phát hiện và ghi log trước khi commit.
Verify bằng cách kiểm tra log commit của Iceberg.
trino -c "SELECT commit_id, committed_by, committed_at FROM iceberg_catalog.my_schema.my_data_product_commits ORDER BY committed_at DESC LIMIT 5;"
Kết quả mong đợi: Thấy rõ ai là người commit, thời gian commit và không có record bị ghi đè sai.
Tối ưu hóa query trong Trino/Presto với Iceberg
Query chậm thường do Trino quét toàn bộ file hoặc không tận dụng được partition pruning.
Bật tính năng "Predicate Pushdown" và "Partition Pruning" trong config của Trino Coordinator.
Sửa file /etc/trino/etc/catalog/iceberg.properties.
connector.name=iceberg
hive.metastore.uri=thrift://metastore:9083
hive.s3.endpoint=s3.amazonaws.com
hive.s3.aws-region=us-east-1
iceberg.checkpointing.enabled=true
iceberg.checkpointing.snapshot-id=latest
iceberg.cache.enabled=true
iceberg.cache.max-size=1000
iceberg.cache.max-age=1h
Kết quả: Trino sẽ cache metadata của Iceberg, giảm latency khi mở file và tăng tốc độ phân tích.
Áp dụng chỉ mục (Index) cho các cột thường dùng trong điều kiện WHERE nếu sử dụng Spark hoặc viết lại query để tận dụng partition.
Viết query tối ưu tránh SELECT * và đảm bảo điều kiện WHERE lọc đúng partition key.
SELECT
source_unit,
SUM(amount) as total_amount
FROM iceberg_catalog.my_schema.my_data_product
WHERE event_date BETWEEN '2023-10-01' AND '2023-10-05'
AND source_unit = 'sales_department'
GROUP BY source_unit;
Kết quả: Query chỉ quét các partition của tháng 10 năm 2023, thời gian thực thi giảm từ phút xuống giây.
Để debug query chậm, sử dụng EXPLAIN để xem plan thực thi.
trino -c "EXPLAIN SELECT * FROM iceberg_catalog.my_schema.my_data_product WHERE event_date = '2023-10-01';"
Kết quả mong đợi: Plan hiển thị "Filter" nằm trước "TableScan", chứng tỏ predicate pushdown hoạt động, chỉ quét file cần thiết.
Verify hiệu năng bằng cách so sánh thời gian chạy query trước và sau khi tối ưu.
time trino -c "SELECT COUNT(*) FROM iceberg_catalog.my_schema.my_data_product WHERE event_date = '2023-10-01';"
Kết quả mong đợi: Thời gian thực thi dưới 1 giây cho bảng có hàng triệu dòng.
Hướng dẫn mở rộng quy mô (Scaling) hệ thống
Khi số lượng Data Product tăng lên, metadata của Catalog (Hive/Glue) có thể trở thành điểm nghẽn.
Triển khai mô hình "Sharding Catalog" bằng cách chia các schema vào các namespace riêng biệt hoặc sử dụng nhiều Catalog instance.
Cấu hình Kubernetes HPA (Horizontal Pod Autoscaler) cho Trino Worker để scale tự động theo tải.
Tạo file /k8s/configs/trino-worker-hpa.yaml.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: trino-worker-hpa
namespace: data-mesh
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: trino-worker
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Kết quả: Khi CPU hoặc RAM vượt quá ngưỡng, Kubernetes tự động thêm pod Worker để xử lý query song song.
Đối với dbt, scale bằng cách tăng số lượng thread hoặc chia nhỏ job thành nhiều pipeline song song.
Cấu hình /opt/dbt/profiles.yml để tăng số luồng song song.
my_data_mesh_profile:
target: prod
outputs:
prod:
type: databricks
threads: 16
# ... config khác
Kết quả: dbt chạy nhiều model song song, giảm tổng thời gian pipeline từ 2h xuống 30p.
Để mở rộng Iceberg storage, đảm bảo backend object storage (S3/MinIO) được cấu hình để xử lý lượng request cao.
Chạy lệnh kiểm tra thông lượng của storage.
aws s3api list-objects --bucket my-data-lake --prefix iceberg/ --max-items 1000 --query 'Contents[].Size' | awk '{sum+=$1} END {print sum/1024/1024 " MB"}'
Kết quả mong đợi: Tổng kích thước dữ liệu tăng lên nhưng latency truy cập vẫn ổn định.
Verify khả năng scale bằng cách chạy stress test với nhiều query đồng thời.
for i in {1..10}; do
trino -c "SELECT COUNT(*) FROM iceberg_catalog.my_schema.my_data_product" &
done
wait
echo "All queries finished"
Kết quả mong đợi: Tất cả 10 query chạy xong trong thời gian chấp nhận được, không có lỗi timeout hoặc OOM.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Mesh phi tập trung với Apache Iceberg, dbt và Kubernetes để chia sẻ dữ liệu an toàn giữa các đơn vị kinh doanh
« Phần 5: Tự động hóa vận hành và giám sát hệ thống Data Mesh