1. Xây dựng Pipeline ETL với PySpark để nạp dữ liệu vào Iceberg
Bước đầu tiên là tạo script Python để đọc dữ liệu thô từ file JSON hoặc CSV và ghi trực tiếp vào bảng Iceberg trên S3 hoặc Local FileSystem đã cấu hình ở các phần trước.
Mục đích của bước này là thiết lập luồng dữ liệu cơ bản, đảm bảo Spark có thể nhận diện catalog của Iceberg và ghi dữ liệu theo định dạng Iceberg thay vì Parquet thông thường.
Kết quả mong đợi là một bảng Iceberg mới được tạo tự động trong catalog, chứa dữ liệu từ file nguồn, có thể truy vấn ngay lập tức.
Trước tiên, hãy tạo file Python script tên là etl_pipeline.py tại thư mục /opt/data-lakehouse/scripts/ với nội dung hoàn chỉnh sau:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
# Khởi tạo SparkSession với Iceberg
spark = (
SparkSession.builder
.appName("IcebergETLPipeline")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "file:///var/iceberg/warehouse")
.config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
)
# Đọc dữ liệu nguồn (giả sử file JSON đã có sẵn)
input_path = "/var/data-lakehouse/raw/sales_data.json"
df = spark.read.json(input_path)
# Chuẩn hóa dữ liệu (ví dụ: thêm cột timestamp, đổi tên cột)
df_transformed = df.withColumn("ingestion_time", current_timestamp()) \
.withColumnRenamed("sale_id", "id") \
.withColumnRenamed("amount", "total_amount")
# Ghi vào bảng Iceberg (sẽ tự động tạo bảng nếu chưa tồn tại)
table_name = "default.sales_history"
df_transformed.writeTo(table_name).createOrReplace()
print(f"ETL Pipeline completed. Data written to table: {table_name}")
spark.stop()
Chạy script trên server bằng lệnh dưới đây để kích hoạt pipeline:
cd /opt/data-lakehouse/scripts
spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 etl_pipeline.py
Để xác minh kết quả, hãy truy vấn bảng vừa tạo thông qua Spark SQL hoặc Trino:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "SELECT COUNT(*) FROM default.sales_history"
Bạn sẽ thấy số lượng dòng dữ liệu trả về tương ứng với số dòng trong file JSON nguồn, xác nhận dữ liệu đã được nạp thành công vào Iceberg.
2. Áp dụng Schema Evolution (Thay đổi cấu trúc bảng)
Iceberg cho phép thay đổi schema của bảng mà không cần ghi lại toàn bộ dữ liệu (rewrite). Đây là tính năng cốt lõi giúp Data Lakehouse linh hoạt hơn Data Lake truyền thống.
Bạn cần thực hiện việc thêm cột mới hoặc sửa kiểu dữ liệu của cột hiện có để Iceberg tự động cập nhật metadata file của bảng.
Kết quả mong đợi là bảng Iceberg chấp nhận các truy vấn trên cột mới hoặc cột đã sửa kiểu mà không bị lỗi, và dữ liệu cũ vẫn hiển thị đúng (hoặc null nếu là cột mới).
Trước khi chạy lệnh, hãy tạo một file JSON mới có thêm cột discount_percent để mô phỏng dữ liệu mới với schema khác:
cat > /var/data-lakehouse/raw/sales_data_v2.json
Sử dụng Spark để cập nhật schema của bảng sales_history bằng cách thêm cột discount_percent vào bảng hiện có:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "ALTER TABLE default.sales_history ADD COLUMN discount_percent DOUBLE"
Tiếp theo, hãy chạy lại pipeline ETL với file dữ liệu mới (v2) để ghi thêm dữ liệu có cột mới vào cùng một bảng:
sed -i 's|/var/data-lakehouse/raw/sales_data.json|/var/data-lakehouse/raw/sales_data_v2.json|' /opt/data-lakehouse/scripts/etl_pipeline.py
cd /opt/data-lakehouse/scripts && spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0 etl_pipeline.py
Để verify kết quả, hãy truy vấn bảng và kiểm tra xem dữ liệu cũ (không có discount) và dữ liệu mới (có discount) có cùng tồn tại và hiển thị chính xác không:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "SELECT id, total_amount, discount_percent FROM default.sales_history"
Bạn sẽ thấy các dòng dữ liệu từ batch đầu tiên có giá trị NULL ở cột discount_percent, trong khi các dòng từ batch thứ hai có giá trị số thực tế, chứng tỏ Schema Evolution hoạt động đúng.
3. Quản lý Version dữ liệu thông qua Iceberg Snapshots
Mỗi lần ghi dữ liệu (write) vào bảng Iceberg sẽ tạo ra một Snapshot mới. Snapshot là một bản sao của trạng thái bảng tại một thời điểm cụ thể, bao gồm danh sách các file data và metadata.
Mục đích của việc này là duy trì lịch sử các phiên bản của dữ liệu, cho phép bạn nhìn lại cấu trúc hoặc nội dung dữ liệu ở quá khứ bất kỳ.
Kết quả mong đợi là bạn có thể liệt kê được các snapshot đã tạo ra và nhìn thấy thời gian (timestamp) cùng ID của từng phiên bản.
Sử dụng lệnh Spark SQL để liệt kê lịch sử snapshots của bảng sales_history:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "CALL spark_catalog.system.list_snapshots('default', 'sales_history')"
Hoặc nếu bạn muốn xem chi tiết hơn về số lượng file và kích thước của mỗi snapshot, hãy sử dụng hàm history (tùy thuộc vào phiên bản Iceberg, nhưng list_snapshots là chuẩn nhất cho Spark):
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "SELECT snapshot_id, schema_id, parent_snapshot_id, summary FROM spark_catalog.default.sales_history.snapshots"
Bạn sẽ nhận được một bảng kết quả chứa danh sách các snapshot_id, timestamp và operation (append, overwrite, delete). Lưu lại snapshot_id của phiên bản trước khi bạn thêm cột discount_percent để dùng trong bước tiếp theo.
4. Thực hiện Time Travel để truy vấn dữ liệu theo thời điểm cũ
Time Travel là tính năng cho phép bạn truy vấn dữ liệu của bảng như thể nó ở tại một thời điểm hoặc snapshot cụ thể trong quá khứ. Điều này cực kỳ hữu ích để rollback dữ liệu sai hoặc phân tích xu hướng theo thời gian.
Bạn sẽ sử dụng cú pháp FOR VERSION AS OF hoặc FOR SYSTEM_TIME AS OF trong câu lệnh SELECT để đưa bảng về trạng thái cũ.
Kết quả mong đợi là khi truy vấn bảng ở snapshot cũ, các cột được thêm mới (như discount_percent) sẽ không xuất hiện hoặc dữ liệu mới thêm sẽ không hiển thị.
Giả sử bạn muốn xem dữ liệu trước khi thêm cột discount_percent, hãy lấy snapshot_id từ kết quả của phần 3 (giả sử là 1001) và chạy lệnh truy vấn sau:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "SELECT * FROM default.sales_history FOR VERSION AS OF 1001"
Hoặc nếu bạn muốn truy vấn theo thời gian cụ thể (ví dụ: dữ liệu tại thời điểm 10 phút trước), hãy sử dụng cú pháp timestamp:
spark-sql --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.spark_catalog.type=hadoop" --conf "spark.sql.catalog.spark_catalog.warehouse=file:///var/iceberg/warehouse" --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" -e "SELECT * FROM default.sales_history FOR SYSTEM_TIME AS OF '2024-01-20 10:30:00'"
Khi chạy lệnh trên, bạn sẽ thấy kết quả chỉ bao gồm các dòng dữ liệu và các cột tồn tại tại thời điểm đó. Cột discount_percent sẽ biến mất hoàn toàn trong kết quả trả về, xác nhận tính năng Time Travel hoạt động chính xác.
Điều hướng series:
Mục lục: Series: Xây dựng Data Lakehouse với Apache Iceberg, Trino và Ubuntu 24.04
« Phần 3: Triển khai Trino (PrestoSQL) để truy vấn Data Lake
Phần 5: Tối ưu hóa hiệu năng và bảo mật cho Lakehouse »