Triển khai Apache Airflow trên Kubernetes
Cài đặt Airflow bằng Helm Chart
Chúng ta sẽ sử dụng Helm Chart chính thức của Apache Airflow để triển khai các thành phần Core (Webserver, Scheduler, Worker) lên Kubernetes cluster đã chuẩn bị từ phần 1.
Mục tiêu là có một môi trường Airflow sẵn sàng với cơ chế scaling tự động (KubernetesExecutor) để xử lý các tác vụ ETL.
Trước tiên, thêm repository của Apache Airflow vào Helm.
helm repo add apache-airflow https://airflow.apache.org
helm repo update
Hệ thống sẽ trả về thông báo repository đã được cập nhật thành công.
Tiếp theo, tạo namespace riêng cho Airflow để cách ly tài nguyên.
kubectl create namespace airflow
Namespace 'airflow' được tạo mới.
Sử dụng file values.yaml tùy chỉnh để cấu hình Airflow. Nội dung file này định nghĩa số lượng replica, tài nguyên CPU/RAM và cơ chế executor.
Đường dẫn file: ./airflow-values.yaml
executor: KubernetesExecutor
airflowHome: /opt/airflow
webserver:
replicas: 1
resources:
requests:
cpu: 200m
memory: 512Mi
limits:
cpu: 500m
memory: 1Gi
ingress:
enabled: true
className: nginx
hosts:
- host: airflow.example.com
paths:
- path: /
pathType: Prefix
scheduler:
replicas: 1
resources:
requests:
cpu: 200m
memory: 512Mi
limits:
cpu: 500m
memory: 1Gi
worker:
replicas: 0
resources:
requests:
cpu: 200m
memory: 512Mi
limits:
cpu: 500m
memory: 1Gi
podConcurrencyLimit: 16
redis:
enabled: true
flower:
enabled: false
postgres:
enabled: true
auth:
password: "airflow-secret-pwd"
username: "airflow"
database: "airflow"
kubernetes:
clusterName: kubernetes-cluster
kubeConfigSecretName: ""
useServiceAccount: true
serviceAccountName: "airflow-worker"
namespace: "airflow"
podTemplateFilePath: ""
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- preference:
matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
weight: 1
File cấu hình đã sẵn sàng, định nghĩa Airflow chạy trên KubernetesExecutor, sử dụng PostgreSQL làm metadata database và Redis làm message broker.
Triển khai Airflow vào cluster.
helm install airflow apache-airflow/airflow -f ./airflow-values.yaml -n airflow
Helm sẽ bắt đầu quá trình deploy, bạn sẽ thấy trạng thái các Pod chuyển dần từ Pending sang Running.
Verify kết quả triển khai
Kiểm tra trạng thái các Pod trong namespace airflow.
kubectl get pods -n airflow
Bạn phải thấy ít nhất các Pod sau đang ở trạng thái Running: airflow-scheduler, airflow-webserver, airflow-redis-master, airflow-postgres.
Kiểm tra dịch vụ Webserver để truy cập giao diện quản trị.
kubectl port-forward svc/airflow-webserver 8080:80 -n airflow
Truy cập http://localhost:8080 trên máy client. Đăng nhập với username airflow và password airflow (hoặc password đã cấu hình trong values.yaml).
Cấu hình Provider trong Airflow để làm việc với Iceberg và Trino
Cài đặt các gói Python cần thiết
Airflow cần các thư viện kết nối (connectors) để giao tiếp với Apache Iceberg và Trino. Chúng ta sẽ cài đặt các gói này vào môi trường Python của Airflow.
Các gói cần thiết: apache-airflow-providers-iceberg và apache-airflow-providers-trino.
Sửa file requirements của Airflow hoặc cài đặt trực tiếp vào container nếu dùng Docker image custom. Ở đây ta giả định dùng image chính thức và cài đặt vào environment.
Đường dẫn file: ./requirements.txt (nằm trong thư mục chứa DAGs hoặc trong image builder).
apache-airflow-providers-iceberg==3.1.0
apache-airflow-providers-trino==4.0.0
pyiceberg==0.6.0
trino==0.320.0
File requirements.txt chứa các phiên bản tương thích cho việc tích hợp.
Để áp dụng ngay lập tức vào container đang chạy (chỉ cho test/demo, trong production nên rebuild image), ta chèn command vào container webserver để cài đặt.
kubectl exec -it airflow-webserver-0 -n airflow -- pip install -r /opt/airflow/requirements.txt
Trình quản lý gói pip sẽ tải và cài đặt các thư viện vào môi trường Python của Airflow.
Cấu hình Connections trong Airflow UI
Sau khi cài đặt provider, bạn cần định nghĩa kết nối (Connections) trong Airflow UI để DAGs có thể sử dụng.
Vào menu Admin > Connections trong Airflow UI, nhấn + Add a Connection.
Tạo kết nối cho Trino:
- Connection Type: Trino
- Connection Id: trino_catalog
- Host: trino-trino (tên service Trino trong cluster)
- Port: 8080
- Schema: iceberg_catalog (tên catalog đã tạo ở phần 2)
- Extra:
{"cert": "/path/to/ca.crt", "http_scheme": "https"} (nếu có bảo mật SSL, nếu không thì bỏ trống)
Tạo kết nối cho Iceberg REST Catalog (dùng để quản lý metadata nếu cần thao tác trực tiếp với Catalog API):
- Connection Type: HTTP (hoặc dùng custom nếu provider hỗ trợ Iceberg Catalog riêng, thường dùng Trino connector để tương tác với Iceberg Table)
- Connection Id: iceberg_rest
- Host: iceberg-rest-catalog
- Port: 8080
- Extra:
{"auth": "basic", "username": "admin", "password": "secret"} (nếu cần auth)
Kết quả mong đợi: Hai kết nối mới xuất hiện trong danh sách Connections với trạng thái kiểm tra kết nối thành công.
Xây dựng DAG mẫu để nạp dữ liệu vào Iceberg Table
Cấu trúc thư mục DAGs
Airflow trên Kubernetes cần một Persistent Volume (PVC) để lưu trữ các file DAG. Nếu bạn đã cấu hình PVC trong phần values.yaml, các file sẽ được mount vào /opt/airflow/dags.
Đường dẫn file DAG mẫu: ./dags/iceberg_etl_pipeline.py
Viết DAG tích hợp Trino và Iceberg
DAG này sẽ thực hiện 3 bước chính: (1) Lấy dữ liệu từ nguồn (ví dụ CSV trên S3/GCS), (2) Chạy lệnh SQL qua Trino để chuyển đổi và ghi vào Iceberg Table, (3) Kích hoạt quá trình Optimize.
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from airflow.providers.iceberg.operators.iceberg import IcebergOperator
from datetime import datetime, timedelta
import os
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'iceberg_etl_refresh',
default_args=default_args,
description='ETL Pipeline để nạp dữ liệu vào Apache Iceberg qua Trino',
schedule_interval='@daily',
start_date=datetime(2023, 10, 1),
catchup=False,
tags=['iceberg', 'trino', 'etl']
)
# Task 1: Chuẩn bị dữ liệu nguồn (Giả sử dữ liệu đã nằm trên S3)
# Trong thực tế, bạn có thể dùng S3Hook để tải file lên hoặc mount volume
prep_data_task = TrinoOperator(
task_id='prep_source_data',
trino_conn_id='trino_catalog',
sql="""
CREATE TABLE IF NOT EXISTS iceberg_catalog.staging.sales_staging (
transaction_id VARCHAR,
amount DOUBLE,
created_at TIMESTAMP
) WITH (
format = 'PARQUET',
location = 's3://my-bucket/staging/sales/'
);
""",
dag=dag
)
# Task 2: Chuyển đổi và ghi vào Iceberg Table chính thức
# Sử dụng Trino để insert data từ staging vào Iceberg Table
write_iceberg_task = TrinoOperator(
task_id='write_to_iceberg',
trino_conn_id='trino_catalog',
sql="""
INSERT INTO iceberg_catalog.production.sales_history
SELECT
transaction_id,
amount,
created_at
FROM iceberg_catalog.staging.sales_staging;
""",
dag=dag
)
# Task 3: Tối ưu hóa Iceberg Table (Expire Snapshots & Rewrite Data Files)
# Iceberg tự động quản lý snapshots, nhưng ta có thể trigger optimize thủ công nếu cần
optimize_task = TrinoOperator(
task_id='optimize_iceberg_table',
trino_conn_id='trino_catalog',
sql="""
OPTIMIZE iceberg_catalog.production.sales_history
WHERE true
USING ZORDER (transaction_id);
""",
dag=dag
)
# Task 4: Dọn dẹp dữ liệu staging
cleanup_task = TrinoOperator(
task_id='cleanup_staging',
trino_conn_id='trino_catalog',
sql="""
DROP TABLE IF EXISTS iceberg_catalog.staging.sales_staging;
""",
dag=dag
)
prep_data_task >> write_iceberg_task >> optimize_task >> cleanup_task
File DAG được lưu vào thư mục dags. Airflow Scheduler sẽ tự động phát hiện và parse file này.
Verify kết quả DAG
Vào Airflow UI, chọn DAG iceberg_etl_refresh.
Nhấn nút Play (Run DAG) để kích hoạt chạy thủ công lần đầu.
Kiểm tra tab Grid hoặc Graph để xem trạng thái các task chuyển sang Success (màu xanh lá).
Chạy lệnh kiểm tra snapshot mới trên Trino CLI để xác nhận dữ liệu đã được ghi:
trino-cli --catalog iceberg_catalog --query "SELECT count(*) FROM production.sales_history"
Truy vấn trả về số lượng dòng dữ liệu > 0.
Lên lịch chạy các tác vụ Refresh Snapshot và Optimize
Điều chỉnh Schedule và Backfill
Để tự động hóa quá trình, ta cần cấu hình lịch chạy (Schedule Interval) và xử lý dữ liệu quá khứ (Backfill).
Sửa tham số schedule_interval trong file DAG để chạy theo lịch mong muốn, ví dụ mỗi giờ:
schedule_interval='0 * * * *'
DAG sẽ tự động trigger vào đầu giờ.
Để chạy lại lịch sử cho các ngày trước đó (Backfill), sử dụng command CLI của Airflow:
airflow dags backfill -s 2023-10-01 -e 2023-10-07 iceberg_etl_refresh
Command này sẽ kích hoạt các run DAG cho khoảng thời gian từ 1/10 đến 7/10.
Tích hợp tác vụ Expire Snapshots tự động
Để ngăn Iceberg Table bị phình to do quá nhiều snapshots cũ, ta thêm task Expire Snapshots vào DAG hoặc tạo DAG riêng chạy định kỳ (ví dụ hàng tuần).
Thêm task vào file DAG cũ hoặc tạo file mới ./dags/iceberg_maintenance.py:
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from datetime import datetime, timedelta
dag_maintenance = DAG(
'iceberg_snapshot_maintenance',
schedule_interval='0 0 * * 0', # Chạy vào Chủ Nhật hàng tuần
start_date=datetime(2023, 10, 1),
catchup=False
)
expire_snapshots_task = TrinoOperator(
task_id='expire_old_snapshots',
trino_conn_id='trino_catalog',
sql="""
EXPIRE SNAPSHOT iceberg_catalog.production.sales_history
RETAIN 3;
""",
dag=dag_maintenance
)
# Task này giữ lại 3 snapshots gần nhất, xóa các snapshot cũ hơn
expire_snapshots_task
DAG maintenance sẽ tự động chạy vào Chủ Nhật để dọn dẹp các snapshot không cần thiết, giúp tiết kiệm dung lượng lưu trữ và cải thiện hiệu năng truy vấn.
Verify kết quả lịch trình
Truy cập Airflow UI, vào tab Calendar của DAG iceberg_snapshot_maintenance.
Bạn sẽ thấy các ngày Chủ Nhật được đánh dấu là các run đã lên lịch.
Kiểm tra số lượng snapshot của bảng sau khi task chạy bằng lệnh:
trino-cli --catalog iceberg_catalog --query "SELECT count(*) FROM iceberg_catalog.information_schema.snapshots WHERE table_name = 'sales_history'"
Kết quả trả về số lượng snapshot <= 3.
Giám sát trạng thái pipeline và xử lý lỗi tự động
Cấu hình Alert và Logging
Để đảm bảo pipeline hoạt động ổn định, ta cần cấu hình gửi cảnh báo khi task thất bại và tập trung log để debug.
Trong Airflow UI, vào Admin > Emails để cấu hình SMTP server gửi email cảnh báo.
Sửa file ./airflow-values.yaml để bật thông báo email khi lỗi:
airflowConfigValues:
core:
email_on_failure: 'True'
email_on_retry: 'False'
emails:
smtp_host: 'smtp.example.com'
smtp_user: 'airflow-alerts'
smtp_port: '587'
smtp_ssl: 'True'
smtp_starttls: 'True'
smtp_password: 'secret-pwd'
smtp_mail_from: 'airflow-alerts@example.com'
Khởi động lại Airflow để áp dụng cấu hình mới:
helm upgrade airflow apache-airflow/airflow -f ./airflow-values.yaml -n airflow
Hệ thống sẽ gửi email cho admin khi có task bị lỗi.
Xử lý lỗi tự động (Retry & On Failure Callback)
Để xử lý lỗi tạm thời (ví dụ mất mạng, Trino treo), ta cấu hình retry trong DAG. Nếu retry hết số lần vẫn lỗi, ta có thể kích hoạt một task "On Failure" để ghi log vào bảng lỗi hoặc gửi thông báo Slack.
Cấu hình retry trong DAG đã viết ở phần trước:
'retries': 3,
'retry_delay': timedelta(minutes=2),
'retry_exponential_backoff': True,
Thêm callback để xử lý khi lỗi không thể khắc phục:
def on_failure_callback(context):
task_instance = context['task_instance']
print(f"Task {task_instance.task_id} failed. Error: {context['exception']}")
# Logic gửi thông báo Slack hoặc ghi log vào DB
default_args = {
...
'on_failure_callback': on_failure_callback,
}
Khi task bị lỗi sau 3 lần retry, hàm on_failure_callback sẽ được gọi để thực hiện các hành động dọn dẹp hoặc cảnh báo nâng cao.
Giám sát qua Flower và Prometheus
Sử dụng Flower để giám sát thời gian thực các task đang chạy.
Bật Flower trong airflow-values.yaml:
flower:
enabled: true
servicePort: 5555
basicAuth:
username: "admin"
password: "flower-pwd"
Expose Flower service:
kubectl port-forward svc/airflow-flower 5555:5555 -n airflow
Truy cập http://localhost:5555 để xem biểu đồ task đang chạy, thời gian chạy và trạng thái queue.
Để tích hợp với Prometheus (nếu có monitoring stack), Airflow đã có exporter sẵn.
Thêm vào airflow-values.yaml:
metrics:
enabled: true
port: 9102
Prometheus sẽ scrape metrics tại port 9102 của pod scheduler và webserver để vẽ biểu đồ hiệu năng pipeline.
Verify kết quả giám sát
Tạo một task giả bị lỗi (ví dụ: chạy lệnh exit 1 trong TrinoOperator) để test cơ chế retry và email.
sql="SELECT 1/0"
Kiểm tra:
- Task trong Airflow UI hiển thị trạng thái Failed sau 3 lần retry.
- Email cảnh báo được gửi đến hộp thư đã cấu hình.
- Flower hiển thị task đó đang ở trạng thái "Failed" hoặc "Retry" với thời gian chờ.
- Log trong tab Logs của task hiển thị chi tiết lỗi.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Fabric hiện đại với Apache Iceberg, Trino và Kubernetes
« Phần 6: Bảo mật và kiểm soát truy cập cho nền tảng Data Fabric
Phần 8: Giám sát, Logging và Troubleshooting nâng cao »