Cấu hình dbt để kết nối với Apache Iceberg Catalog
Bước đầu tiên là định nghĩa cấu hình kết nối trong file profiles.yml để dbt có thể giao tiếp với Iceberg Catalog thông qua Spark hoặc Trino connector. Ở đây chúng ta sử dụng adapter dbt-spark hoặc dbt-trino tùy thuộc vào compute engine đã setup ở Phần 2.
Tạo file cấu hình tại đường dẫn ~/.dbt/profiles.yml với nội dung hoàn chỉnh dưới đây. Thay thế các biến $CATALOG_URI và $CREDENTIALS bằng thông tin thực tế của bạn.
cat > ~/.dbt/profiles.yml
Kết quả mong đợi: File profiles.yml được tạo thành công, dbt có thể đọc cấu hình này để thiết lập session JDBC/Thrift với Iceberg Catalog mà không cần hardcode trong code.
Viết mã SQL trong dbt để tạo bảng Staging và Model chính
Chúng ta sẽ tạo hai lớp xử lý: staging để làm sạch dữ liệu thô (raw) và models để chuẩn hóa thành Data Product. Cấu trúc thư mục trong project dbt cần tuân thủ chuẩn dbt_project.yml.
Đầu tiên, tạo file staging/schema.yml tại dbt_project/staging/schema.yml để định nghĩa metadata và các test cơ bản cho nguồn dữ liệu.
cat > dbt_project/staging/schema.yml
Kết quả mong đợi: File schema.yml xác định rõ ràng nguồn dữ liệu, các trường bắt buộc và các ràng buộc chất lượng (freshness, unique, not_null).
Tiếp theo, tạo file SQL thực thi tại dbt_project/staging/stg_clickstream_events.sql để lọc dữ liệu và chuẩn hóa định dạng.
cat > dbt_project/staging/stg_clickstream_events.sql
Kết quả mong đợi: File SQL thực hiện casting kiểu dữ liệu, xóa giá trị null và lọc các giá trị không hợp lệ, tạo ra bảng staging sạch sẽ.
Định nghĩa model chính (Data Product) tại dbt_project/models/core/core_user_events.sql để tổng hợp dữ liệu từ staging.
cat > dbt_project/models/core/core_user_events.sql
Kết quả mong đợi: File SQL tạo ra bảng model chính với các chỉ số tổng hợp (aggregations) và window functions, sẵn sàng để xuất ra Data Product.
Sử dụng dbt tests để đảm bảo chất lượng dữ liệu
Trước khi xuất dữ liệu ra Data Product, chúng ta phải chạy các test để đảm bảo tính toàn vẹn. dbt hỗ trợ ba loại test: schema tests (định nghĩa trong schema.yml), generic tests và custom tests.
Tạo file test tùy chỉnh tại dbt_project/tests/test_data_quality.sql để kiểm tra logic phức tạp hơn, ví dụ: kiểm tra dữ liệu không có sự kiện trong tương lai.
cat > dbt_project/tests/test_data_quality.sql current_timestamp()
EOF
Kết quả mong đợi: Nếu query trả về bất kỳ dòng nào, test sẽ thất bại và ngăn chặn việc deploy pipeline.
Cấu hình dbt_project.yml để áp dụng các test mặc định cho toàn bộ project, bao gồm cả test unique và not_null cho các cột quan trọng.
cat > dbt_project/dbt_project.yml
Kết quả mong đợi: Project dbt được cấu hình để materialize các bảng staging và core dưới dạng bảng vật lý (table) và áp dụng các tag để quản lý scope.
Thực thi lệnh dbt build để chạy toàn bộ pipeline: compile, run models và run tests.
dbt build --target dev --select +core_user_events --models '+core_*'
Kết quả mong đợi: Lệnh chạy thành công (Exit code 0), không có test nào bị fail. Nếu có lỗi, quá trình sẽ dừng lại ngay lập tức.
Tích hợp dbt với CI/CD pipeline trên Kubernetes
Để tự động hóa việc xử lý dữ liệu, chúng ta sẽ đóng gói dbt vào Docker image và chạy dưới dạng Job trên Kubernetes. Kubernetes Job sẽ chỉ chạy một lần và tự xóa khi hoàn thành.
Tạo file Dockerfile tại dbt_project/Dockerfile để đóng gói môi trường dbt với các dependency cần thiết.
cat > dbt_project/Dockerfile
Kết quả mong đợi: Docker image được build thành công, chứa đầy đủ runtime của dbt và các adapter.
Tạo file Kubernetes Manifest dbt-job.yaml để định nghĩa Job chạy dbt. Job này sẽ mount Secret chứa thông tin kết nối để đảm bảo an toàn.
cat > dbt_job.yaml
Kết quả mong đợi: File YAML định nghĩa Job, khi áp dụng vào cluster, K8s sẽ kéo image, mount secret và configmap, sau đó chạy lệnh dbt build.
Để áp dụng Job vào cluster, chạy lệnh sau (giả định đã có Secret và ConfigMap được tạo trước đó).
kubectl apply -f dbt_job.yaml
Kết quả mong đợi: Job dbt-transform-job được tạo và trạng thái chuyển từ Pending sang Running rồi Completed.
Verify kết quả bằng cách xem log của Pod chạy Job.
kubectl logs -f job/dbt-transform-job --tail=50
Kết quả mong đợi: Log hiển thị quá trình compile, run models và tests của dbt. Dòng cuối cùng là Finished at ... in X seconds và Done. PASS.
Cấu hình Partitioning và Sorting để tối ưu hiệu suất Iceberg
Apache Iceberg sử dụng partitioning và sorting để tối ưu hóa việc quét dữ liệu (scan). Việc cấu hình đúng trong dbt giúp giảm chi phí lưu trữ và tăng tốc độ truy vấn.
Trong dbt_project.yml, chúng ta đã định nghĩa partition và sort ở phần models. Tuy nhiên, để kiểm soát chính xác hơn, chúng ta có thể sử dụng macro hoặc cấu hình inline trong file SQL.
Định nghĩa macro set_iceberg_properties tại dbt_project/macros/iceberg_utils.sql để tự động thiết lập các thuộc tính partition và sort cho Iceberg.
cat > dbt_project/macros/iceberg_utils.sql
Kết quả mong đợi: Macro được tạo để chuyển đổi cấu hình dbt sang cú pháp SQL T-SQL/Spark SQL phù hợp với Iceberg.
Sử dụng macro này trong file model SQL để áp dụng partitioning tự động.
cat > dbt_project/models/core/core_user_events.sql
Kết quả mong đợi: dbt sẽ tự động sinh ra lệnh CREATE TABLE ... PARTITIONED BY ... SORTED BY ... phù hợp với Iceberg Catalog khi chạy dbt run.
Verify cấu hình partitioning bằng cách truy vấn trực tiếp vào Iceberg Catalog hoặc dùng Spark SQL để kiểm tra metadata của bảng.
spark-sql -e "DESCRIBE EXTENDED default.core_user_events | grep -E 'Partition|Sort'"
Kết quả mong đợi: Output hiển thị các dòng Partition Keys và Sort Keys khớp với cấu hình đã định nghĩa trong file SQL.
Verify toàn bộ quy trình
Để đảm bảo toàn bộ pipeline hoạt động trơn tru, thực hiện các bước kiểm tra cuối cùng sau khi deploy.
1. Kiểm tra trạng thái các bảng trong Iceberg Catalog.
curl -X GET "http://catalog-service:8080/v1/namespaces/default/tables" | jq '.[] | select(.name == "core_user_events")'
Kết quả mong đợi: JSON trả về chứa thông tin bảng core_user_events với current_snapshot_id khác null, chứng tỏ dữ liệu đã được ghi.
2. Kiểm tra số lượng dòng dữ liệu thực tế trong bảng.
spark-sql -e "SELECT COUNT(*) as total_rows FROM default.core_user_events"
Kết quả mong đợi: Số lượng dòng trả về khớp với số lượng record đã được load vào staging và qua các bước test.
3. Kiểm tra hiệu suất truy vấn với dữ liệu partitioned.
spark-sql -e "EXPLAIN SELECT * FROM default.core_user_events WHERE event_timestamp >= '2023-10-01' AND event_timestamp < '2023-10-02'" | grep -i "partition"
Kết quả mong đợi: Plan thực thi (Execution Plan) hiển thị Partition Pruning, chứng tỏ Iceberg chỉ quét các file partition liên quan thay vì toàn bộ bảng.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Mesh phi tập trung với Apache Iceberg, dbt và Kubernetes để chia sẻ dữ liệu an toàn giữa các đơn vị kinh doanh
« Phần 2: Thiết lập hạ tầng Apache Iceberg và quản lý Catalog
Phần 4: Xây dựng Data Product và cơ chế chia sẻ dữ liệu an toàn »