Thiết kế Pipeline ETL Serverless với Python và DuckDB
Chúng ta sẽ xây dựng một script Python đơn giản nhưng đầy đủ chức năng để thực hiện quy trình ETL: Extract (trích xuất), Transform (biến đổi), và Load (tải) vào Iceberg table.
Mục tiêu là tạo một file script độc lập, không phụ thuộc vào framework nặng nề như Airflow, giúp chạy trực tiếp trên server Ubuntu hoặc container serverless.
Tạo file script etl_pipeline.py trong thư mục làm việc của dự án với nội dung sau:
#!/usr/bin/env python3
import duckdb
import pandas as pd
import sys
from datetime import datetime
# Cấu hình
INPUT_PATH = "s3://my-bucket/raw/sales_2024.csv" # Đường dẫn nguồn (S3 hoặc local file)
CATALOG_PATH = "./iceberg_catalog" # Đường dẫn local catalog
WAREHOUSE_PATH = "./iceberg_warehouse" # Đường dẫn warehouse Iceberg
TABLE_NAME = "sales_transformed" # Tên bảng đích
def run_etl():
# 1. Khởi tạo kết nối DuckDB
con = duckdb.connect(database=":memory:")
# 2. Đăng ký extension Iceberg
try:
con.execute("INSTALL iceberg")
con.execute("LOAD iceberg")
print("[INFO] Iceberg extension loaded successfully.")
except Exception as e:
print(f"[ERROR] Failed to load Iceberg extension: {e}")
sys.exit(1)
# 3. Thiết lập Catalog
con.execute(f"""
CREATE ICEBERG CATALOG my_catalog (
'warehouse' = '{WAREHOUSE_PATH}',
'catalog_impl' = 'io.github.iceberg.rest.catalog.RestCatalog',
'uri' = 'http://localhost:8080'
)
""")
# Lưu ý: Trong môi trường local thực tế không có server REST,
# chúng ta sẽ dùng 'HadoopCatalog' hoặc 'FileSystemCatalog' nếu chạy local thuần.
# Để đơn giản cho bài này, giả sử dùng FileSystemCatalog trực tiếp:
con.execute(f"""
CREATE ICEBERG CATALOG file_catalog (
'warehouse' = '{WAREHOUSE_PATH}',
'catalog_impl' = 'org.apache.iceberg.hadoop.HadoopCatalog'
)
""")
# 4. EXTRACT: Đọc dữ liệu thô
print(f"[INFO] Extracting data from {INPUT_PATH}...")
# DuckDB tự động đọc CSV từ S3 hoặc Local nếu có quyền truy cập
raw_df = con.execute(f"SELECT * FROM read_csv_auto('{INPUT_PATH}')").fetchdf()
print(f"[INFO] Loaded {len(raw_df)} rows.")
# 5. TRANSFORM: Biến đổi dữ liệu
print("[INFO] Applying transformations...")
# Chuyển đổi kiểu dữ liệu
raw_df['amount'] = raw_df['amount'].astype(float)
raw_df['transaction_date'] = pd.to_datetime(raw_df['transaction_date'])
# Lọc dữ liệu (chỉ giữ giao dịch thành công)
transformed_df = raw_df[raw_df['status'] == 'SUCCESS'].copy()
# Thêm cột timestamp thời điểm ETL
transformed_df['etl_timestamp'] = datetime.now()
# Tính toán cột mới (ví dụ: nhóm giao dịch theo ngày)
transformed_df['transaction_day'] = transformed_df['transaction_date'].dt.strftime('%Y-%m-%d')
# 6. LOAD: Viết vào Iceberg Table
print(f"[INFO] Loading {len(transformed_df)} rows to Iceberg table '{TABLE_NAME}'...")
# Tạo bảng nếu chưa tồn tại với partitioning
con.execute(f"""
CREATE TABLE IF NOT EXISTS file_catalog.my_db.{TABLE_NAME} (
id BIGINT,
customer_name VARCHAR,
amount DOUBLE,
transaction_date TIMESTAMP,
status VARCHAR,
etl_timestamp TIMESTAMP,
transaction_day VARCHAR
) PARTITIONED BY (transaction_day)
""")
# Append dữ liệu (DuckDB tự động xử lý merge nếu cần, ở đây dùng INSERT)
con.execute(f"""
INSERT INTO file_catalog.my_db.{TABLE_NAME}
SELECT * FROM '{transformed_df}'
""")
# Commit và đóng kết nối
con.close()
print("[SUCCESS] ETL Pipeline completed successfully.")
if __name__ == "__main__":
run_etl()
Kết quả mong đợi: Script được tạo sẵn với logic đầy đủ từ đọc file CSV, làm sạch dữ liệu, thêm metadata, và ghi vào bảng Iceberg có phân vùng (partitioned by date).
Thiết lập biến môi trường và chạy thử script
Để script chạy được, bạn cần cấp quyền thực thi và đảm bảo thư mục warehouse tồn tại.
Thực hiện các lệnh sau trong terminal Ubuntu:
chmod +x etl_pipeline.py
mkdir -p ./iceberg_warehouse ./iceberg_catalog
Kết quả mong đợi: Không có lỗi, thư mục mới được tạo sẵn sàng cho việc lưu trữ dữ liệu.
Chạy script với dữ liệu mẫu (giả sử bạn đã có file sales_2024.csv ở thư mục local):
python3 etl_pipeline.py
Kết quả mong đợi: Xuất hiện log "[SUCCESS] ETL Pipeline completed successfully." trên terminal.
Verify kết quả ETL trên Iceberg Table
Sau khi script chạy xong, bạn cần xác minh dữ liệu đã được ghi đúng vào Iceberg table và cấu trúc partition đã hoạt động.
Chạy lệnh DuckDB CLI để kiểm tra số lượng dòng và cấu trúc bảng:
duckdb -c "INSTALL iceberg; LOAD iceberg; CREATE ICEBERG CATALOG file_catalog ('warehouse' = './iceberg_warehouse', 'catalog_impl' = 'org.apache.iceberg.hadoop.HadoopCatalog'); SELECT COUNT(*) FROM file_catalog.my_db.sales_transformed;"
Kết quả mong đợi: Trả về số lượng dòng tương ứng với dữ liệu đã lọc (chỉ các dòng status = SUCCESS).
Để kiểm tra cấu trúc partition (metadata), sử dụng lệnh:
duckdb -c "INSTALL iceberg; LOAD iceberg; CREATE ICEBERG CATALOG file_catalog ('warehouse' = './iceberg_warehouse', 'catalog_impl' = 'org.apache.iceberg.hadoop.HadoopCatalog'); SHOW PARTITIONS FROM file_catalog.my_db.sales_transformed;"
Kết quả mong đợi: Liệt kê các giá trị partition (ví dụ: transaction_day = '2024-01-01', '2024-01-02',...).
Tự động hóa quy trình ETL với Cron Job
Để chuyển từ script thủ công sang quy trình tự động (Serverless/On-premise), chúng ta sẽ sử dụng Cron để chạy script định kỳ.
Mục tiêu: Chạy script ETL vào lúc 2:00 sáng mỗi ngày để xử lý dữ liệu mới từ ngày hôm trước.
1. Tạo script wrapper để log và xử lý lỗi
Trực tiếp chạy script trong Cron có thể gặp vấn đề về biến môi trường (PATH). Ta tạo một wrapper script bash để đảm bảo môi trường sạch và ghi log.
Tạo file /usr/local/bin/run_etl.sh:
#!/bin/bash
# Wrapper script cho ETL Pipeline
# Định nghĩa biến môi trường cần thiết
export PATH=/usr/local/bin:/usr/bin:/bin
export HOME=/home/ubuntu
export PROJECT_DIR=/opt/data-lakehouse
# Chuyển sang thư mục dự án
cd $PROJECT_DIR
# Đường dẫn file log
LOG_FILE=$PROJECT_DIR/logs/etl_$(date +\%F).log
# Chạy script Python và ghi log (stdout & stderr)
echo "[$(date)] Starting ETL Pipeline..." >> $LOG_FILE
python3 etl_pipeline.py >> $LOG_FILE 2>&1
EXIT_CODE=$?
# Kiểm tra kết quả
if [ $EXIT_CODE -eq 0 ]; then
echo "[$(date)] ETL Pipeline completed successfully." >> $LOG_FILE
else
echo "[$(date)] ETL Pipeline FAILED with exit code $EXIT_CODE." >> $LOG_FILE
# Ở đây có thể thêm lệnh gửi email cảnh báo
fi
exit $EXIT_CODE
Cấp quyền thực thi cho wrapper script:
chmod +x /usr/local/bin/run_etl.sh
Tạo thư mục log nếu chưa có:
mkdir -p /opt/data-lakehouse/logs
Kết quả mong đợi: Wrapper script sẵn sàng, có khả năng ghi log chi tiết và xử lý lỗi.
2. Cấu hình Cron Job
Mở crontab của user hiện tại để thêm job định kỳ.
crontab -e
Khi file crontab mở ra (thường là trong nano hoặc vi), thêm dòng sau vào cuối file:
0 2 * * * /usr/local/bin/run_etl.sh
Giải thích cấu trúc Cron:
- 0: Phút thứ 0.
- 2: Giờ 2 AM.
- * * *: Mỗi ngày, mỗi tháng, mọi thứ 7 trong tuần.
- /usr/local/bin/run_etl.sh: Đường dẫn đầy đủ đến script wrapper.
Lưu file và thoát (Ctrl+O, Enter, Ctrl+X nếu dùng nano).
Kiểm tra cron job đã được thêm chưa:
crontab -l
Kết quả mong đợi: Hiển thị dòng job vừa thêm trong danh sách.
3. Test Cron Job thủ công
Để đảm bảo Cron hoạt động đúng mà không phải chờ đến 2h sáng, hãy chạy thử wrapper script một lần.
/usr/local/bin/run_etl.sh
Kiểm tra file log được tạo ra:
cat /opt/data-lakehouse/logs/etl_$(date +%F).log
Kết quả mong đợi: File log xuất hiện và chứa nội dung "[SUCCESS] ETL Pipeline completed successfully." hoặc thông báo lỗi rõ ràng nếu có.
4. Kiểm tra log hệ thống Cron
Nếu bạn muốn xem log hệ thống về việc Cron đã kích hoạt job chưa, hãy xem file log của hệ thống.
grep CRON /var/log/syslog | tail -n 20
Kết quả mong đợi: Có dòng log xác nhận Cron đã chạy script vào thời điểm đã định.
Điều hướng series:
Mục lục: Series: Xây dựng Data Lakehouse Serverless với DuckDB, Apache Iceberg và Ubuntu 24.04
« Phần 4: Kết nối DuckDB với Apache Iceberg để truy vấn dữ liệu phân tán
Phần 6: Quản lý phiên bản dữ liệu và Time Travel trong Iceberg »