Cấu hình Scheduler để chạy Pipeline dbt tự động
Triển khai Apache Airflow trên Kubernetes
Bạn cần một hệ thống orchestration để kích thích các job dbt chạy theo lịch trình. Chúng ta sẽ dùng Helm chart của Apache Airflow để deploy lên cluster K8s đã chuẩn bị.
Tại sao: Airflow quản lý phụ thuộc giữa các task, cung cấp UI để xem lịch sử chạy và tích hợp sẵn connector cho dbt.
Kết quả mong đợi: Service Airflow Webserver và Scheduler đang chạy trạng thái Running trên K8s.
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow --namespace data-mesh --set executor=CeleryExecutor --set webserver.password=admin --set dbt.enabled=true
Sau khi deploy xong, bạn cần expose service Webserver để truy cập dashboard.
kubectl port-forward svc/airflow-webserver 8080:8080 -n data-mesh
Mở trình duyệt vào http://localhost:8080, đăng nhập user/password (admin/admin) để vào giao diện quản lý.
Tạo DAG để kích hoạt dbt model
Bạn cần viết một DAG (Directed Acyclic Graph) trong thư mục dags của Airflow để gọi dbt CLI.
Tại sao: DAG định nghĩa logic chạy: khi nào chạy, chạy file nào, và truyền biến môi trường ra sao.
Kết quả mong đợi: Task "dbt_run" xuất hiện trong DAG và chuyển sang trạng thái Success sau khi chạy.
File config: /opt/airflow/dags/dbt_pipeline.py
from airflow import DAG
from airflow.providers.dbt.core.operators.dbt import DbtRunOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_engineering',
'start_date': datetime(2023, 10, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'dbt_sales_data_product',
default_args=default_args,
schedule_interval='@daily',
catchup=False,
) as dag:
run_dbt = DbtRunOperator(
task_id='dbt_run',
project_dir='/opt/dbt_project',
profile='iceberg_profile',
target='prod',
models=['stg_sales', 'fct_orders'],
env={
'AIRFLOW_VAR_CATALOG_URI': 's3://data-mesh-bucket/catalog',
'AIRFLOW_VAR_WAREHOUSE_URI': 'jdbc:postgresql://postgres:5432/iceberg',
},
)
Verify kết quả: Vào Airflow UI, tìm DAG "dbt_sales_data_product", click "Play" để kích hoạt chạy thủ công. Quan sát log của task để đảm bảo dbt đã kết nối thành công với Iceberg Catalog.
Triển khai Prometheus và Grafana để giám sát
Cài đặt Prometheus Operator và ServiceMonitor
Hạ tầng giám sát cần thu thập metrics từ cả dbt (thông qua Airflow exporter) và Iceberg Catalog (thông qua JDBC Gateway hoặc REST API).
Tại sao: Prometheus là chuẩn de-facto cho monitoring trên K8s, cho phép scrape metrics theo thời gian thực.
Kết quả mong đợi: Prometheus đang chạy và đã phát hiện target là Airflow và Iceberg Catalog.
kubectl apply -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/master/bundle.yaml -n monitoring
File config: /etc/prometheus/config/service-monitor-airflow.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: airflow-monitor
namespace: data-mesh
labels:
release: prometheus
spec:
selector:
matchLabels:
app: airflow
endpoints:
- port: metrics
path: /metrics
interval: 30s
namespaceSelector:
matchNames:
- data-mesh
File config: /etc/prometheus/config/service-monitor-iceberg.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: iceberg-catalog-monitor
namespace: data-mesh
labels:
release: prometheus
spec:
selector:
matchLabels:
app: iceberg-catalog
endpoints:
- port: 8080
path: /metrics
interval: 15s
namespaceSelector:
matchNames:
- data-mesh
Deploy ServiceMonitor vào cluster:
kubectl apply -f /etc/prometheus/config/service-monitor-airflow.yaml
kubectl apply -f /etc/prometheus/config/service-monitor-iceberg.yaml
Verify kết quả: Truy cập Prometheus UI (port-forward service prometheus-server), vào tab "Status" -> "Targets". Bạn phải thấy cả Airflow và Iceberg Catalog có trạng thái "UP".
Cấu hình Dashboard Grafana
Bạn cần dashboard để trực quan hóa các metric: thời gian chạy job, số lượng snapshot được tạo, và độ trễ của Iceberg.
Tại sao: Dashboard giúp Ops team nhìn nhanh tình trạng hệ thống mà không cần đọc log hay query PromQL thủ công.
Kết quả mong đợi: Dashboard Grafana hiển thị biểu đồ "dbt_run_duration_seconds" và "iceberg_snapshot_count".
File config: /etc/grafana/dashboards/data-mesh-overview.json
{
"dashboard": {
"title": "Data Mesh Overview",
"panels": [
{
"id": 1,
"title": "dbt Run Duration",
"targets": [
{
"expr": "dbt_run_duration_seconds",
"legendFormat": "{{ task_id }}"
}
],
"type": "graph"
},
{
"id": 2,
"title": "Iceberg Snapshot Count",
"targets": [
{
"expr": "iceberg_snapshot_count_total",
"legendFormat": "{{ table_name }}"
}
],
"type": "graph"
}
]
}
}
Import dashboard vào Grafana:
curl -X POST http://grafana:3000/api/dashboards/db -H "Content-Type: application/json" -H "Authorization: Bearer " -d @/etc/grafana/dashboards/data-mesh-overview.json
Verify kết quả: Truy cập Grafana UI, tìm dashboard "Data Mesh Overview". Đảm bảo các biểu đồ có dữ liệu cập nhật theo thời gian thực.
Thiết lập cảnh báo (Alerting) cho sự cố
Cấu hình Alertmanager và Rule cho dbt
Bạn cần quy tắc cảnh báo khi pipeline dbt bị lỗi hoặc chạy quá lâu (timeout).
Tại sao: Phát hiện sự cố ngay lập tức để team có thể xử lý trước khi ảnh hưởng đến báo cáo kinh doanh.
Kết quả mong đợi: Khi một task dbt bị lỗi, Alertmanager gửi cảnh báo đến kênh Slack hoặc Email.
File config: /etc/prometheus/rules/dbt-alerts.yaml
groups:
- name: dbt_alerts
rules:
- alert: DbtPipelineFailed
expr: dbt_task_state == "failed"
for: 5m
labels:
severity: critical
annotations:
summary: "Pipeline dbt bị lỗi: {{ $labels.task_id }}"
description: "Task {{ $labels.task_id }} trong DAG {{ $labels.dag_id }} đã fail."
- alert: DbtPipelineSlow
expr: dbt_run_duration_seconds > 3600
for: 10m
labels:
severity: warning
annotations:
summary: "Pipeline dbt chạy quá lâu: {{ $labels.task_id }}"
description: "Task {{ $labels.task_id }} đang chạy quá 1 giờ."
Reload config của Prometheus để áp dụng rule mới:
curl -X POST http://prometheus:9090/-/reload
Cấu hình cảnh báo chất lượng dữ liệu (Data Quality)
Giám sát các metric về độ trễ dữ liệu (Data Latency) từ Iceberg Catalog.
Tại sao: Nếu snapshot mới nhất quá cũ, dữ liệu báo cáo sẽ không còn giá trị.
Kết quả mong đợi: Cảnh báo khi thời gian của snapshot mới nhất vượt quá ngưỡng cho phép (ví dụ: 24h).
File config: /etc/prometheus/rules/iceberg-quality-alerts.yaml
groups:
- name: iceberg_quality
rules:
- alert: IcebergDataStaleness
expr: (time() - iceberg_snapshot_timestamp_seconds) > 86400
for: 1h
labels:
severity: warning
annotations:
summary: "Dữ liệu Iceberg bị cũ: {{ $labels.table_name }}"
description: "Snapshot mới nhất của table {{ $labels.table_name }} đã hơn 24h."
Verify kết quả: Giả lập một tình huống lỗi bằng cách kill một pod dbt đang chạy hoặc tạm dừng job Airflow. Quan sát tab "Alerts" trong Prometheus và kiểm tra kênh Slack/Email xem đã nhận thông báo chưa.
Tự động hóa Backup và Restore Catalog Iceberg
Chiến lược Backup Catalog
Backup Iceberg Catalog không chỉ là sao chép file metadata mà còn cần đồng bộ state của catalog vào object storage (S3).
Tại sao: Metadata (manifests, snapshots) nằm ở S3, nhưng catalog definition (namespace mapping) thường nằm trong Database (PostgreSQL/MySQL). Cần backup cả 2.
Kết quả mong đợi: Script backup chạy tự động, tạo ra một snapshot của database catalog và tarball metadata.
File config: /scripts/backup-iceberg-catalog.sh
#!/bin/bash
set -e
BACKUP_DIR="/var/backups/iceberg"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
CATALOG_DB="iceberg_catalog"
CATALOG_USER="admin"
CATALOG_PASS="password"
S3_BUCKET="data-mesh-backup"
AWS_REGION="us-east-1"
mkdir -p $BACKUP_DIR
echo "Starting backup at $TIMESTAMP..."
# Backup Database Catalog
PGDUMP_FILE="$BACKUP_DIR/catalog_db_$TIMESTAMP.sql"
PGPASSWORD=$CATALOG_PASS pgdump -U $CATALOG_USER -h postgres -d $CATALOG_DB > $PGDUMP_FILE
# Backup Metadata (Optional: nếu metadata nằm trong S3 riêng biệt, dùng aws s3 sync)
# aws s3 sync s3://data-mesh-bucket/catalog $BACKUP_DIR/iceberg_metadata_$TIMESTAMP
# Compress and Upload to S3
tar -czf "$BACKUP_DIR/backup_$TIMESTAMP.tar.gz" $PGDUMP_FILE
aws s3 cp "$BACKUP_DIR/backup_$TIMESTAMP.tar.gz" "s3://$S3_BUCKET/iceberg-catalog-backups/"
# Cleanup local
rm -rf $BACKUP_DIR/*
echo "Backup completed and uploaded to S3."
Chạy script trong container cronjob hoặc Airflow:
chmod +x /scripts/backup-iceberg-catalog.sh
bash /scripts/backup-iceberg-catalog.sh
Quy trình Restore Catalog
Kịch bản restore cần khôi phục database trước, sau đó mount lại bucket S3 chứa metadata.
Tại sao: Đảm bảo tính nhất quán giữa catalog definition và metadata thực tế trên object storage.
Kết quả mong đợi: Catalog Iceberg hoạt động lại bình thường, các table vẫn truy cập được.
File config: /scripts/restore-iceberg-catalog.sh
#!/bin/bash
set -e
RESTORE_FILE="$1"
CATALOG_DB="iceberg_catalog"
CATALOG_USER="admin"
CATALOG_PASS="password"
if [ -z "$RESTORE_FILE" ]; then
echo "Usage: $0 "
exit 1
fi
# Download backup from S3 (nếu chưa có file local)
# aws s3 cp "s3://data-mesh-backup/iceberg-catalog-backups/$RESTORE_FILE" "$RESTORE_FILE"
# Extract backup
tar -xzf "$RESTORE_FILE" -C /var/restores/
SQL_FILE=$(ls /var/restores/*.sql | head -n 1)
# Restore Database
PGPASSWORD=$CATALOG_PASS psql -U $CATALOG_USER -h postgres -d $CATALOG_DB -f $SQL_FILE
echo "Catalog restored from $RESTORE_FILE"
Verify kết quả: Chạy restore script với file backup vừa tạo. Sau đó truy cập vào Iceberg Catalog (qua Spark hoặc JDBC) và chạy lệnh `SHOW TABLES` để xác nhận các bảng vẫn tồn tại.
Tối ưu hóa chi phí lưu trữ và tính toán trên Kubernetes
Cấu hình Horizontal Pod Autoscaler (HPA) cho dbt
Bạn cần scale out các pod chạy dbt khi có nhiều job cần xử lý đồng thời, và scale in khi không có việc làm.
Tại sao: dbt là workload bursty (chạy xong là nghỉ). Giữ nhiều pod chạy liên tục gây lãng phí tài nguyên.
Kết quả mong đợi: Số lượng pod dbt tăng lên khi CPU usage cao và giảm về 0 hoặc 1 khi idle.
File config: /etc/k8s/dbt-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: dbt-runner-hpa
namespace: data-mesh
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: dbt-runner
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: dbt_queue_depth
target:
type: AverageValue
averageValue: 2
Deploy HPA:
kubectl apply -f /etc/k8s/dbt-hpa.yaml
Verify kết quả: Sử dụng `kubectl top pods -n data-mesh` để xem CPU usage. Giả lập load cao (chạy nhiều job dbt), quan sát số lượng pod tăng lên qua lệnh `kubectl get pods -n data-mesh`.
Sử dụng Spot Instances và Storage Tiering
Tối ưu chi phí bằng cách dùng Spot Instances cho worker node chạy dbt và chuyển metadata Iceberg cũ sang Cold Storage.
Tại sao: dbt có thể bị kill bất cứ lúc nào mà không ảnh hưởng dữ liệu (stateless). Iceberg metadata cũ ít khi truy cập, nên lưu trữ rẻ hơn.
Kết quả mong đợi: Chi phí cluster giảm đáng kể, dữ liệu vẫn an toàn.
Cấu hình Lifecycle Policy cho S3 Bucket chứa Iceberg metadata (ví dụ AWS S3):
aws s3api put-bucket-lifecycle-configuration \
--bucket data-mesh-bucket \
--lifecycle-configuration file://s3-lifecycle-config.json
File config: /etc/s3-lifecycle-config.json
{
"Rules": [
{
"ID": "MoveOldMetadataToGlacier",
"Status": "Enabled",
"Filter": {
"Prefix": "catalog/metadata/"
},
"Transitions": [
{
"Days": 90,
"StorageClass": "GLACIER"
}
],
"Expiration": {
"Days": 365
}
}
]
}
Verify kết quả: Kiểm tra chi phí trên cloud console (AWS Cost Explorer). Quan sát số lượng pod chạy trên Spot Instance. Xác nhận các file metadata cũ hơn 90 ngày đã chuyển sang Glacier.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Mesh phi tập trung với Apache Iceberg, dbt và Kubernetes để chia sẻ dữ liệu an toàn giữa các đơn vị kinh doanh
« Phần 4: Xây dựng Data Product và cơ chế chia sẻ dữ liệu an toàn
Phần 6: Nâng cao hiệu năng, xử lý sự cố và các mẹo triển khai thực tế »