Cấu hình Spark Session với Apache Iceberg
Bước đầu tiên là tích hợp thư viện Apache Iceberg vào môi trường Spark đang chạy trên Kubernetes. Chúng ta cần thêm dependency vào file cấu hình hoặc truyền vào lúc khởi tạo session để Spark có thể nhận diện các bảng Iceberg.
Trong môi trường Kubernetes, cách phổ biến nhất là sử dụng tham số `--packages` khi chạy `spark-submit` hoặc cấu hình trong `spark-defaults.conf` của Spark Operator. Chúng ta sẽ sử dụng phiên bản stable hiện tại của Iceberg Spark Runtime.
Tạo file cấu hình `spark-defaults.conf` trong thư mục cấu hình của Spark Driver (thường là `/opt/spark/conf/` trên container) với nội dung sau để tự động load thư viện Iceberg:
spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.0
spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.catalog-impl=org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.iceberg_catalog.uri=http://iceberg-rest-server:8181
spark.sql.catalog.iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.iceberg_catalog.s3.endpoint=http://minio:9000
File này chỉ định Spark tải runtime Iceberg phiên bản 1.4.0 và cấu hình catalog REST để kết nối với service Iceberg REST Server đã được deploy trong các phần trước. Tham số `io-impl` cấu hình để đọc ghi trực tiếp lên S3 (MinIO trong môi trường dev).
Để verify kết quả, hãy chạy một lệnh kiểm tra version trong Spark SQL:
spark-sql -e "SELECT current_catalog();"
Kết quả mong đợi là output hiển thị tên catalog `iceberg_catalog` hoặc `spark_catalog` nếu chưa set default, chứng tỏ engine Spark đã load thành công driver Iceberg.
Tạo bảng Iceberg với Partitioning
Sau khi đã có Spark Session hỗ trợ Iceberg, chúng ta tiến hành tạo bảng đầu tiên. Iceberg hỗ trợ nhiều loại partitioning, trong đó `Identity Partition` (partition theo giá trị cột) và `Truncate Partition` (partition theo prefix) là phổ biến nhất để cân bằng giữa số lượng file và hiệu năng query.
Chúng ta sẽ tạo một bảng dữ liệu đơn hàng (`orders`) với 2 trường partition: `order_date` dùng Identity và `region` dùng Truncate để giảm số lượng partition files.
Thực thi lệnh SQL trong Spark SQL để tạo bảng và định nghĩa schema:
CREATE TABLE iceberg_catalog.default.orders (
order_id BIGINT,
customer_id BIGINT,
amount DOUBLE,
order_date DATE,
region STRING,
status STRING
)
PARTITIONED BY (
order_date,
truncate(region, 1)
)
USING iceberg;
Lệnh này tạo bảng trong catalog `iceberg_catalog`, schema database `default`. Cột `order_date` sẽ tạo một partition file cho mỗi ngày, còn `region` sẽ nhóm các giá trị bắt đầu bằng ký tự đầu tiên (ví dụ: "North America" và "North West" sẽ cùng một partition nếu truncate 1 ký tự).
Để verify kết quả, kiểm tra thông tin bảng vừa tạo:
DESCRIBE EXTENDED iceberg_catalog.default.orders;
Kết quả mong đợi hiển thị phần `Partitioning` với các trường `order_date` và `region` (truncated), cùng với `Current Snapshot ID` là `None` (vì bảng mới tạo chưa có dữ liệu).
Thực hiện thao tác DML cơ bản
Điểm mạnh nhất của Iceberg so với các định dạng file truyền thống (Parquet/ORC) là khả năng hỗ trợ ACID transactions và các thao tác DML (Update, Delete, Merge) trực tiếp trên data lake mà không cần viết lại toàn bộ partition (Full Rewrite).
Đầu tiên, chúng ta chèn dữ liệu mẫu vào bảng `orders` để có dữ liệu để thao tác:
INSERT INTO iceberg_catalog.default.orders VALUES
(101, 5001, 150.00, '2023-10-01', 'North America', 'pending'),
(102, 5002, 200.00, '2023-10-01', 'Europe', 'shipped'),
(103, 5003, 75.50, '2023-10-02', 'North America', 'pending');
Lệnh INSERT này sẽ tạo một snapshot mới (Snapshot 1) và ghi dữ liệu vào các manifest file tương ứng. Dữ liệu được lưu dưới dạng file Parquet trong S3.
Tiếp theo, thực hiện thao tác UPDATE để thay đổi trạng thái đơn hàng 101 từ "pending" sang "shipped":
UPDATE iceberg_catalog.default.orders
SET status = 'shipped'
WHERE order_id = 101;
Iceberg sẽ không ghi đè file Parquet gốc. Thay vào đó, nó tạo một file Parquet mới chứa bản ghi đã cập nhật và cập nhật manifest file để chỉ trỏ vào file mới, đồng thời đánh dấu bản ghi cũ là đã xóa trong metadata. Đây là cơ chế Write-Ahead Log (WAL) và Snapshot Isolation.
Thực hiện thao tác DELETE để xóa đơn hàng 103:
DELETE FROM iceberg_catalog.default.orders
WHERE order_id = 103;
Lệnh DELETE cũng tạo snapshot mới (Snapshot 3) và cập nhật manifest để loại bỏ tham chiếu đến file chứa đơn hàng 103. File Parquet gốc vẫn còn nằm trên S3 nhưng sẽ bị xóa sau khi chạy quá trình compaction hoặc expire snapshots.
Thực hiện thao tác MERGE (Upsert) để chèn mới hoặc cập nhật nếu tồn tại:
MERGE INTO iceberg_catalog.default.orders AS target
USING (SELECT 104, 5004, 300.00, '2023-10-03', 'Asia', 'pending') AS source (order_id, customer_id, amount, order_date, region, status)
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET status = 'shipped'
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, amount, order_date, region, status);
Lệnh MERGE kiểm tra khóa chính `order_id`. Nếu tồn tại thì update trường `status`, nếu không tồn tại thì insert toàn bộ record mới. Thao tác này đảm bảo tính nhất quán dữ liệu (Data Consistency) trong môi trường phân tán.
Để verify kết quả của tất cả các thao tác DML trên:
SELECT * FROM iceberg_catalog.default.orders ORDER BY order_id;
Kết quả mong đợi: Hiển thị 3 dòng dữ liệu (ID 101, 102, 104). ID 101 có status là "shipped", ID 103 đã biến mất, ID 104 mới được thêm vào với status "pending".
Hiểu cơ chế Metadata và Manifest File
Để Iceberg hỗ trợ các thao tác DML hiệu quả, nó sử dụng một cấu trúc metadata riêng biệt nằm trong thư mục của bảng (thường là thư mục `.metadata/` trong S3 bucket). Đây là nơi lưu trữ lịch sử snapshots và chỉ mục của các file data.
Cấu trúc này bao gồm các thành phần chính:
- Metadata File: Chứa thông tin schema, partition spec, và danh sách các snapshot ID.
- Snapshot File: Mỗi lần thực hiện write (Insert/Update/Delete), một snapshot mới được tạo. File này lưu trạng thái của bảng tại thời điểm đó.
- Manifest File: Là một file Parquet nhỏ chứa danh sách các file data (data files) thuộc về snapshot đó. Nó lưu trữ thông tin về range giá trị (min/max) của các cột trong file data để Spark có thể loại bỏ (prune) các file không cần thiết khi query.
- Data File: File Parquet thực tế chứa dữ liệu người dùng.
Để xem trực tiếp cấu trúc metadata trong Spark, chúng ta truy cập vào table metadata thông qua command `DESCRIBE HISTORY`:
DESCRIBE HISTORY iceberg_catalog.default.orders;
Lệnh này trả về bảng lịch sử các snapshots. Bạn sẽ thấy các cột: `operation` (insert, update, delete, merge), `snapshot_id`, `summary` (số lượng row added/removed), và `timestamp`.
Kết quả mong đợi hiển thị 4 dòng lịch sử tương ứng với 4 thao tác DML chúng ta đã thực hiện (Insert, Update, Delete, Merge). Cột `summary` sẽ hiển thị chi tiết như `added-data-files=3`, `removed-data-files=1`.
Để hiểu sâu hơn về Manifest List, bạn có thể query trực tiếp vào hệ thống catalog (nếu dùng REST Catalog) hoặc inspect file `.metadata/` trong S3 bằng AWS CLI hoặc MinIO Browser. Tuy nhiên, trong Spark, cách kiểm tra số lượng file data hiện tại là:
SELECT COUNT(*) FROM iceberg_catalog.default.orders;
-- Để xem chi tiết file data (chỉ hoạt động với một số catalog implementation cụ thể)
-- SELECT * FROM iceberg_catalog.default.orders.files;
Trong Spark SQL chuẩn, lệnh `SELECT * FROM table.files` (tùy thuộc vào version Iceberg và catalog) sẽ trả về danh sách các data files và manifest files đang được quản lý bởi snapshot hiện tại, giúp bạn hình dung rõ cơ chế phân mảnh và tổ chức dữ liệu của Iceberg.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 2: Giới thiệu kiến trúc Data Lakehouse: Apache Iceberg vs Delta Lake
Phần 4: Triển khai Delta Lake: Tạo bảng và tối ưu hóa lưu trữ »