Chuẩn bị môi trường và tạo bảng Iceberg mẫu
Cài đặt và cấu hình Spark trên Kubernetes
Để ghi dữ liệu vào Iceberg, chúng ta cần một engine xử lý phân tán. Trong môi trường Kubernetes đã được thiết lập ở phần trước, chúng ta sẽ sử dụng Spark 3.4+ với Iceberg Spark Runtime.
Trước tiên, cần đảm bảo JAR của Iceberg Spark Runtime đã được copy vào thư mục driver và executor của Spark. Nếu chưa có, hãy tạo file JAR hoặc thêm vào classpath.
Để đơn giản hóa việc chạy, chúng ta sẽ sử dụng `spark-submit` hoặc Spark Shell với các tham số cần thiết để kết nối với REST Catalog đã triển khai ở Phần 2.
Tạo file cấu hình Spark (`spark-defaults.conf`) tại `/etc/spark/conf/spark-defaults.conf` trên node Spark (hoặc trong container nếu chạy qua Spark Operator).
# /etc/spark/conf/spark-defaults.conf
spark.sql.catalog.iceberg.catalog.impl org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.iceberg.catalog.uri http://iceberg-rest-catalog.default.svc.cluster.local:8181
spark.sql.catalog.iceberg.catalog.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg.catalog.warehouse s3://my-iceberg-warehouse
spark.sql.catalog.iceberg.catalog.credentials.access-key your-access-key
spark.sql.catalog.iceberg.catalog.credentials.secret-key your-secret-key
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Kết quả mong đợi: File cấu hình được ghi, Spark biết cách kết nối tới Catalog và S3 để lưu trữ dữ liệu.
Tạo bảng Iceberg từ Spark
Bây giờ, hãy mở Spark Shell để tạo bảng mẫu. Chúng ta sẽ tạo một bảng có tên `customer_data` với các cột cơ bản và sử dụng Iceberg như một catalog.
spark-shell --conf spark.sql.catalog.iceberg.catalog.impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.iceberg.catalog.uri=http://iceberg-rest-catalog.default.svc.cluster.local:8181 \
--conf spark.sql.catalog.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.iceberg.catalog.warehouse=s3://my-iceberg-warehouse \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg.catalog.credentials.access-key=your-access-key \
--conf spark.sql.catalog.iceberg.catalog.credentials.secret-key=your-secret-key
Trong Spark Shell, thực hiện lệnh SQL sau để tạo bảng:
CREATE TABLE iceberg.customer_data (
customer_id BIGINT,
name STRING,
email STRING,
purchase_date TIMESTAMP,
amount DOUBLE
) USING iceberg
TBLPROPERTIES (
"write.delete.default" = "true"
);
Kết quả mong đợi: Bảng `customer_data` được tạo thành công trong namespace `iceberg` của catalog REST. Không có lỗi về schema hoặc connection.
Ghi và Cập nhật dữ liệu (Upsert/Merge)
Ghi dữ liệu ban đầu vào Iceberg
Sử dụng cú pháp `INSERT INTO` của Spark để ghi dữ liệu mẫu vào bảng vừa tạo. Iceberg sẽ tự động tạo Snapshot đầu tiên.
INSERT INTO iceberg.customer_data VALUES
(1001, 'Nguyen Van A', 'a@example.com', TIMESTAMP '2023-10-01 10:00:00', 1500000.00),
(1002, 'Tran Thi B', 'b@example.com', TIMESTAMP '2023-10-02 11:30:00', 2500000.00),
(1003, 'Le Van C', 'c@example.com', TIMESTAMP '2023-10-03 14:45:00', 750000.00);
Kết quả mong đợi: Spark báo thành công ghi 3 dòng. Iceberg tạo một file data Parquet mới và cập nhật metadata (manifest list) trong snapshot.
Thực hiện Upsert và Merge vào Iceberg
Iceberg hỗ trợ Upsert (Update or Insert) thông qua cú pháp `MERGE`. Đây là tính năng quan trọng cho các use-case CDC (Change Data Capture) hoặc ETL lặp lại.
Giả sử chúng ta có một DataFrame `updates` chứa dữ liệu mới cần đồng bộ. Chúng ta sẽ thực hiện Merge dựa trên khóa chính `customer_id`.
val updates = Seq(
(1001, "Nguyen Van A (Updated)", "a_new@example.com", "2023-10-05 09:00:00", 3000000.00),
(1004, "Hoang Thi D", "d@example.com", "2023-10-04 16:00:00", 500000.00)
).toDF("customer_id", "name", "email", "purchase_date", "amount")
updates.as("src").merge(
spark.table("iceberg.customer_data").as("tgt"),
"src.customer_id = tgt.customer_id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
Giải thích: `whenMatchedUpdateAll()` cập nhật toàn bộ cột nếu khóa chính trùng khớp. `whenNotMatchedInsertAll()` chèn dòng mới nếu khóa chính chưa tồn tại. Dòng 1001 sẽ được cập nhật, dòng 1004 sẽ được chèn mới.
Kết quả mong đợi: Snapshot mới được tạo. Dòng 1001 trong file Parquet cũ bị đánh dấu là "deleted" (logical delete) và dòng mới được ghi vào file mới. File cũ vẫn tồn tại cho đến khi chạy `expire_snapshots`.
Verify kết quả ghi và cập nhật
Chạy lệnh `SELECT` để kiểm tra dữ liệu hiện tại:
SELECT * FROM iceberg.customer_data;
Kết quả mong đợi: Thấy 4 dòng dữ liệu. Dòng của `1001` có thông tin mới (Updated), dòng `1004` xuất hiện mới.
Truy vấn dữ liệu qua Trino với tính năng ACID
Cấu hình Trino Catalog cho Iceberg
Để Trino truy vấn Iceberg, cần cấu hình catalog trong file `iceberg.properties` của Trino Coordinator và Worker.
Đường dẫn file: `/etc/trino/catalog/iceberg.properties` (trên container Trino).
connector.name=iceberg
hive.s3.path-style-access=true
hive.s3.endpoint=http://minio.default.svc.cluster.local:9000
hive.s3.aws-access-key=your-access-key
hive.s3.aws-secret-key=your-secret-key
hive.metastore-catalog-name=iceberg_catalog
catalog.name=iceberg
catalog.type=rest
catalog.uri=http://iceberg-rest-catalog.default.svc.cluster.local:8181
catalog.warehouse=s3://my-iceberg-warehouse
Kết quả mong đợi: Trino restart và nhận diện được catalog `iceberg` mới. Có thể thấy catalog này trong CLI Trino.
Truy vấn và kiểm chứng tính ACID
Tính năng ACID của Iceberg đảm bảo rằng khi một transaction đang ghi (write) dữ liệu, các query đọc (read) sẽ không bị block và luôn nhìn thấy dữ liệu nhất quán (consistent snapshot).
Mở Trino CLI và thực hiện truy vấn:
presto> SELECT * FROM iceberg.customer_data;
Đồng thời, ở một session khác (hoặc terminal khác), chạy lại lệnh `MERGE` của Spark để cập nhật thêm dữ liệu.
-- Chạy trong Spark Shell song song
val moreUpdates = Seq((1005, "User E", "e@example.com", "2023-10-06 12:00:00", 1000000.00)).toDF(...)
moreUpdates.as("src").merge(...).execute()
Kết quả mong đợi: Query trên Trino sẽ không bị lỗi "Table is locked" hay "Transaction conflict". Khi Spark commit transaction xong, Trino sẽ tự động nhìn thấy snapshot mới nhất mà không cần refresh thủ công (tùy vào cấu hình cache của Trino, nhưng Iceberg đảm bảo consistency).
Verify kết quả truy vấn
Chạy lại query trên Trino để đảm bảo dữ liệu mới từ Spark đã hiển thị:
presto> SELECT customer_id, name FROM iceberg.customer_data WHERE customer_id = 1005;
Kết quả mong đợi: Trả về 1 dòng dữ liệu của user 1005.
Quản lý Snapshot và Lịch sử thay đổi
Xem lịch sử Snapshot trong Spark
Iceberg lưu trữ lịch sử thay đổi dưới dạng Snapshots. Mỗi lần ghi (insert, update, merge, delete) đều tạo một snapshot mới.
Sử dụng lệnh `DESCRIBE HISTORY` trong Spark để xem log:
DESCRIBE HISTORY iceberg.customer_data;
Kết quả mong đợi: Xuất ra bảng hiển thị các cột: `timestamp`, `version`, `operation` (append, overwrite, delete), `summary` (thông tin chi tiết như number of rows added/removed).
Xem lịch sử Snapshot trong Trino
Trino cũng hỗ trợ xem lịch sử này thông qua bảng hệ thống `INFORMATION_SCHEMA` hoặc hệ thống catalog của Iceberg.
presto> SELECT * FROM iceberg.information_schema.table_partitions WHERE table_name = 'customer_data';
-- Hoặc xem snapshot history nếu connector hỗ trợ:
presto> SELECT * FROM iceberg.information_schema.table_snapshots WHERE table_name = 'customer_data';
Tùy vào phiên bản Trino, bạn có thể dùng lệnh:
presto> SHOW CREATE TABLE iceberg.customer_data;
Để xem thông tin metadata cơ bản, nhưng để xem chi tiết lịch sử, Spark `DESCRIBE HISTORY` là mạnh nhất.
Xóa Snapshot cũ (Expire Snapshots)
Để quản lý chi phí lưu trữ và giảm độ phức tạp của metadata, cần xóa các snapshot cũ không còn cần thiết. Iceberg sẽ xóa các file data Parquet không được tham chiếu bởi bất kỳ snapshot nào còn lại.
Thực hiện trong Spark Shell:
CALL iceberg.system.expire_snapshots(
'customer_data',
TIMESTAMP '2023-01-01 00:00:00'
);
Lệnh này giữ lại snapshot gần nhất và xóa tất cả snapshot trước ngày 01/01/2023. Lưu ý: Điều này sẽ xóa dữ liệu lịch sử (Time Travel không còn khả thi cho các thời điểm trước đó).
Kết quả mong đợi: Spark báo số lượng snapshot bị xóa và số lượng file Parquet đã được xóa khỏi S3. Metadata của bảng được cập nhật nhẹ hơn.
Truy vấn thời gian (Time Travel)
Khái niệm Time Travel trong Iceberg
Time Travel cho phép truy vấn dữ liệu tại một thời điểm trong quá khứ hoặc tại một snapshot cụ thể. Điều này cực kỳ hữu ích để debug dữ liệu bị sai lệch hoặc phục hồi dữ liệu.
Truy vấn theo thời điểm (Timestamp)
Sử dụng cú pháp `TIMESTAMP AS OF` trong Spark hoặc Trino.
Trong Spark:
SELECT * FROM iceberg.customer_data TIMESTAMP AS OF '2023-10-01 10:00:00';
Lệnh này sẽ đọc dữ liệu của bảng `customer_data` như nó tồn tại vào lúc 10:00 ngày 01/10/2023. Dữ liệu cập nhật sau thời điểm này (ví dụ dòng 1001 được update thành "Updated") sẽ không hiển thị, và dòng mới (1004, 1005) cũng chưa xuất hiện.
Kết quả mong đợi: Chỉ hiển thị dữ liệu gốc ban đầu (3 dòng đầu tiên với thông tin chưa update).
Truy vấn theo Snapshot ID
Thay vì dùng thời gian, có thể dùng ID snapshot chính xác đã lấy được từ `DESCRIBE HISTORY`.
SELECT * FROM iceberg.customer_data SNAPSHOT AS OF 1234567890123456789;
Thay `1234567890123456789` bằng snapshot ID thực tế từ bước trước.
Kết quả mong đợi: Trả về dữ liệu chính xác tại snapshot đó.
Time Travel trong Trino
Trino hỗ trợ Time Travel thông qua tham số `FOR TIMESTAMP AS OF` hoặc `FOR VERSION AS OF`.
presto> SELECT * FROM iceberg.customer_data FOR TIMESTAMP AS OF '2023-10-01 10:00:00';
Kết quả mong đợi: Tương tự như Spark, trả về dữ liệu tại thời điểm quá khứ.
Verify kết quả Time Travel
So sánh kết quả của query hiện tại và query time travel:
-- Query hiện tại (có dữ liệu mới)
SELECT customer_id, name FROM iceberg.customer_data WHERE customer_id = 1001;
-- Query quá khứ (dữ liệu cũ)
SELECT customer_id, name FROM iceberg.customer_data TIMESTAMP AS OF '2023-10-01 10:00:00' WHERE customer_id = 1001;
Kết quả mong đợi: Dòng đầu tiên trả về tên "Nguyen Van A (Updated)", dòng thứ hai trả về tên "Nguyen Van A" (chưa update). Điều này chứng minh Time Travel hoạt động chính xác.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Fabric hiện đại với Apache Iceberg, Trino và Kubernetes
« Phần 4: Quản lý dữ liệu: Ghi, đọc và cập nhật dữ liệu Iceberg
Phần 5: Tối ưu hóa hiệu năng với Partitioning và Indexing trong Iceberg »