Cấu hình môi trường Spark để ghi dữ liệu vào Iceberg
Bước đầu tiên là chuẩn bị môi trường Spark Executor có thể tương tác trực tiếp với Iceberg Catalog. Chúng ta sẽ sử dụng Spark 3.5+ để tận dụng native support cho Iceberg format.
Mục đích: Cài đặt các dependency cần thiết (Iceberg Spark Runtime và REST Catalog) vào thư mục Spark để Spark có thể đọc/viết bảng Iceberg qua REST Catalog đã triển khai ở Phần 2.
Kết quả mong đợi: Spark Submit có thể nhận tham số `--conf` để chỉ định catalog và thực hiện ghi dữ liệu mà không cần compile lại code.
Trước tiên, tải các JAR file cần thiết vào thư mục lib của Spark (thường là `/opt/spark/jars` hoặc `$SPARK_HOME/jars`).
cd /opt/spark/jars
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.4.0/iceberg-spark-runtime-3.5_2.12-1.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-rest-1.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-api-1.4.0.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-parquet-1.4.0.jar
Verify: Kiểm tra file tồn tại và size hợp lệ.
ls -lh /opt/spark/jars | grep iceberg
Tiếp theo, tạo file cấu hình Spark (`spark-defaults.conf`) để định nghĩa catalog mặc định, tránh phải gõ dài dòng trong command line.
cat > /opt/spark/conf/spark-defaults.conf
Kết quả mong đợi: File config được tạo, Spark sẽ tự động load catalog `iceberg_catalog` khi khởi động.
Verify kết quả cấu hình Spark
Chạy Spark shell để kiểm tra catalog đã được register chưa.
spark-shell --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Trong Spark Shell, thực hiện lệnh:
spark.catalog.listTables("iceberg_catalog")
Output sẽ hiển thị danh sách catalog hoặc empty list nếu chưa có bảng nào, nhưng không báo lỗi catalog not found.
Ghi dữ liệu vào Iceberg Table bằng Spark
Sử dụng Spark DataFrame API để tạo bảng mới và ghi dữ liệu ban đầu. Chúng ta sẽ tạo một bảng đơn giản chứa thông tin giao dịch (transactions).
Mục đích: Thiết lập quy trình ETL cơ bản: tạo bảng (Create Table), chèn dữ liệu (Insert), và cập nhật dữ liệu (Update).
Kết quả mong đợi: Bảng `transactions` được tạo trong Iceberg Catalog và chứa dữ liệu Parquet.
Tạo script Spark Python (`write_iceberg.py`) để thực hiện ghi dữ liệu.
cat > /home/ubuntu/write_iceberg.py
Chạy script để thực hiện ghi dữ liệu.
spark-submit --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions /home/ubuntu/write_iceberg.py
Kết quả mong đợi: Script chạy xong, in ra các thông báo thành công, không có lỗi exception.
Verify kết quả ghi dữ liệu
Sử dụng Trino CLI (đã deploy ở Phần 3) để kiểm tra dữ liệu đã ghi.
trino --catalog iceberg_catalog --schema transactions -c "SELECT * FROM transactions;"
Output phải hiển thị 6 dòng dữ liệu, trong đó dòng `transaction_id = 1002` có `status` là `COMPLETED` (đã được update).
Thực hiện Upsert và Merge vào Iceberg
Iceberg hỗ trợ thao tác `MERGE` (Upsert) để đồng bộ dữ liệu từ source vào table đích: nếu key tồn tại thì update, nếu không tồn tại thì insert.
Mục đích: Giải quyết bài toán CDC (Change Data Capture) hoặc đồng bộ data từ hệ thống source vào Data Lake.
Kết quả mong đợi: Dữ liệu mới được thêm vào, dữ liệu cũ có key trùng được cập nhật, dữ liệu không khớp key được thêm mới.
Tạo script Spark (`merge_iceberg.py`) để thực hiện thao tác MERGE. Lưu ý: Thao tác này yêu cầu chỉ định Primary Key (Unique Key) để xác định record nào cần update.
cat > /home/ubuntu/merge_iceberg.py 160
(1007, "CUST_004", 999.99, "2023-10-01 12:00:00", "PENDING") # Insert mới
]
source_df = spark.createDataFrame(source_data, source_schema)
# Thực hiện Merge
# Syntax: df.merge(target_df, on="key").whenMatchedUpdate(...).whenNotMatchedInsert(...)
source_df.merge(spark.table("iceberg_catalog.transactions"),
on="transaction_id",
how="inner") \
.whenMatchedUpdate(
{
"amount": "_source.amount",
"status": "_source.status"
}
) \
.whenNotMatchedInsert(
{
"transaction_id": "_source.transaction_id",
"customer_id": "_source.customer_id",
"amount": "_source.amount",
"created_at": "_source.created_at",
"status": "_source.status"
}
) \
.execute()
print("Merge operation completed successfully.")
spark.stop()
EOF
Chạy script merge.
spark-submit --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions /home/ubuntu/merge_iceberg.py
Kết quả mong đợi: Không có lỗi, script in thông báo thành công.
Verify kết quả Merge
Kiểm tra dữ liệu sau merge qua Trino.
trino --catalog iceberg_catalog --schema transactions -c "SELECT transaction_id, amount, status FROM transactions ORDER BY transaction_id;"
Output kiểm tra:
Dòng `1001`: `amount` phải là `160.00` (đã update).
Dòng `1007`: Phải xuất hiện với `amount` `999.99` (đã insert).
Dòng `1002`: Vẫn giữ nguyên `amount` `200.00` và `status` `COMPLETED` (không bị xóa hay thay đổi).
Truy vấn ACID và quản lý Snapshot trong Iceberg
Iceberg sử dụng cơ chế Snapshot để đảm bảo tính ACID. Mỗi lần ghi (Write) sẽ tạo ra một Snapshot mới. Chúng ta sẽ kiểm tra lịch sử các phiên bản này.
Mục đích: Hiểu cách Iceberg quản lý lịch sử dữ liệu và truy vấn thông tin metadata của các snapshot.
Kết quả mong đợi: Xem được danh sách các snapshot, thời điểm tạo và số lượng record trong mỗi snapshot.
Sử dụng Trino để truy vấn thông tin snapshot của bảng.
trino --catalog iceberg_catalog --schema transactions -c "SELECT * FROM iceberg_catalog.information_schema.iceberg_snapshots WHERE table_name = 'transactions' ORDER BY committed_at DESC;"
Output sẽ hiển thị danh sách các snapshot ID, thời gian commit (`committed_at`), và số lượng file/record.
Để xem chi tiết hơn về lịch sử thay đổi (history) của bảng, bao gồm tác giả (operation type: append, update, merge, delete).
trino --catalog iceberg_catalog --schema transactions -c "SELECT * FROM iceberg_catalog.information_schema.iceberg_history WHERE table_name = 'transactions' ORDER BY committed_at DESC;"
Kết quả mong đợi: Thấy rõ các dòng ghi nhận operation `append` (từ bước ghi dữ liệu đầu), `update` (từ bước update status), và `merge` (từ bước upsert).
Verify kết quả quản lý Snapshot
Đếm số snapshot hiện tại.
trino --catalog iceberg_catalog --schema transactions -c "SELECT COUNT(*) FROM iceberg_catalog.information_schema.iceberg_snapshots WHERE table_name = 'transactions';"
Output phải lớn hơn 1 (ít nhất là 3 snapshots tương ứng với 3 thao tác ghi ở trên).
Thực hiện Time Travel để xem dữ liệu quá khứ
Tính năng mạnh mẽ nhất của Iceberg là Time Travel: khả năng truy vấn dữ liệu tại một thời điểm trong quá khứ hoặc tại một snapshot ID cụ thể.
Mục đích: Kiểm tra dữ liệu trước khi các thay đổi (update/merge) xảy ra, phục vụ cho việc debug hoặc khôi phục dữ liệu.
Kết quả mong đợi: Truy vấn trả về dữ liệu ở trạng thái cũ, khác với dữ liệu hiện tại.
Phương pháp 1: Time Travel theo thời gian (Timestamp).
Giả sử thao tác `UPDATE` (thay đổi status của ID 1002) xảy ra vào `2023-10-01 10:15:00`. Nếu chúng ta query vào thời điểm `2023-10-01 10:12:00`, ta sẽ thấy status của ID 1002 vẫn là `PENDING`.
trino --catalog iceberg_catalog --schema transactions -c "SELECT transaction_id, status FROM transactions AS OF TIMESTAMP '2023-10-01 10:12:00' WHERE transaction_id = 1002;"
Output mong đợi: `status` phải là `PENDING` (trạng thái trước khi update).
Phương pháp 2: Time Travel theo Snapshot ID.
Đầu tiên, lấy Snapshot ID của trạng thái trước khi update từ query history ở phần trên (giả sử snapshot ID là 100).
trino --catalog iceberg_catalog --schema transactions -c "SELECT transaction_id, status FROM transactions AS OF SNAPSHOT 100 WHERE transaction_id = 1002;"
Output mong đợi: Tương tự như trên, trả về trạng thái cũ.
Verify kết quả Time Travel
So sánh dữ liệu hiện tại và dữ liệu quá khứ của cùng một record.
trino --catalog iceberg_catalog --schema transactions -c "
SELECT 'CURRENT' as version, transaction_id, status FROM iceberg_catalog.transactions WHERE transaction_id = 1002
UNION ALL
SELECT 'PAST' as version, transaction_id, status FROM iceberg_catalog.transactions AS OF TIMESTAMP '2023-10-01 10:12:00' WHERE transaction_id = 1002;
"
Output phải hiển thị 2 dòng:
Hàng 1: `version='CURRENT'`, `status='COMPLETED'`.
Hàng 2: `version='PAST'`, `status='PENDING'`.
Điều này chứng tỏ tính năng Time Travel hoạt động chính xác.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Fabric hiện đại với Apache Iceberg, Trino và Kubernetes
« Phần 3: Triển khai Trino trên Kubernetes để truy vấn dữ liệu Iceberg
Phần 4: Quản lý dữ liệu: Ghi, đọc và cập nhật dữ liệu Iceberg »