1. Cấu hình và tích hợp Delta Lake Connector vào Spark trên Kubernetes
Trước khi có thể tạo bảng Delta, chúng ta cần đảm bảo Spark executor và driver có thể tải được thư viện Delta Lake. Trong môi trường Kubernetes, cách chuẩn nhất là thêm JAR vào biến môi trường của Spark hoặc cấu hình qua Spark Submit.
Định nghĩa biến môi trường để Spark tự động tải JAR Delta Lake phiên bản 3.x (tương thích với Spark 3.3+). Chúng ta sẽ sử dụng phiên bản 3.1.0 cho tính ổn định cao.
export SPARK_EXTRA_JARS="https://repo1.maven.org/maven2/io/delta/delta-core_2.12/3.1.0/delta-core_2.12-3.1.0.jar"
Kết quả: Biến môi trường được đặt. Khi chạy lệnh spark-submit hoặc spark-shell, Spark sẽ tự động tải file JAR này vào classpath.
Kiểm tra bằng cách khởi động Spark Shell và list các package đã load.
spark-shell --packages io.delta:delta-core_2.12:3.1.0
Kết quả: Bạn thấy thông báo "Resolving..." và "Downloaded delta-core_2.12-3.1.0" trong log. Dòng lệnh trở về prompt "scala>".
Cấu hình Spark Session để bật Delta Log và đảm bảo các tính năng nâng cao được kích hoạt.
val spark = SparkSession.builder
.appName("DeltaLakeDemo")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
Kết quả: Spark Session được khởi tạo với extension Delta. Bạn có thể chạy lệnh `spark.catalog.listTables()` mà không gặp lỗi về catalog.
Verify kết quả: Chạy lệnh sau trong Spark Shell để đảm bảo Delta Catalog đã sẵn sàng.
spark.sql("SELECT * FROM delta.`s3a://your-bucket-name/delta-table-path`")
*(Lưu ý: Thay `s3a://your-bucket-name/delta-table-path` bằng đường dẫn thực tế của bạn. Nếu chưa có bảng, lệnh này sẽ báo lỗi "Table or view does not exist", điều này là bình thường, quan trọng là không báo lỗi "Delta not found".)*
2. Tạo bảng Delta với Partition và Clustering (Data Skipping)
Bây giờ chúng ta sẽ tạo bảng thực tế. Delta Lake hỗ trợ cả Partitioning (cắt chia dữ liệu theo thư mục) và Clustering (sắp xếp dữ liệu trong file để tối ưu việc skip). Clustering là điểm mạnh của Delta so với Iceberg ở giai đoạn ban đầu.
Tạo bảng Delta mới với 2 partition: `country` (string) và `year` (int). Sử dụng syntax `USING delta` và `PARTITIONED BY`.
spark.sql("""
CREATE TABLE IF NOT EXISTS sales_transactions
USING delta
LOCATION 's3a://your-bucket-name/delta-tables/sales_transactions'
PARTITIONED BY (country, year)
""")
Kết quả: Bảng được tạo trong thư mục S3. Bạn sẽ thấy thư mục `delta-tables/sales_transactions/` xuất hiện, bên trong có file `_delta_log/` chứa metadata.
Để kích hoạt Data Skipping hiệu quả, chúng ta cần định nghĩa Clustering Key. Điều này giúp Spark sắp xếp dữ liệu theo các cột này ngay khi ghi (write), giúp tìm kiếm nhanh hơn mà không cần quét toàn bộ partition.
spark.sql("""
ALTER TABLE sales_transactions
SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.dataRetentionDuration' = 'interval 30 days',
'delta.columnMapping.mode' = 'name',
'delta.clusteringColumns' = '["amount", "transaction_date"]'
)
""")
Kết quả: Thuộc tính bảng được cập nhật. Khi thực hiện lệnh `INSERT` hoặc `CREATE TABLE AS SELECT` (CTAS) sau này, Spark sẽ tự động sắp xếp dữ liệu theo `amount` và `transaction_date`.
Verify kết quả: Kiểm tra metadata của bảng.
spark.sql("DESCRIBE EXTENDED sales_transactions").show()
Kết quả: Trong output, bạn thấy dòng `delta.columnMapping.mode` là `name` và `delta.clusteringColumns` chứa danh sách cột đã định nghĩa.
3. Tối ưu hóa lưu trữ: OPTIMIZE, VACUUM và ZORDER
Khi dữ liệu được ghi vào Delta nhiều lần, các file nhỏ (small files) sẽ tích tụ, gây ra "small file problem" làm chậm đọc. Chúng ta cần áp dụng các lệnh tối ưu.
Thực hiện lệnh `OPTIMIZE` để merge các file nhỏ thành file lớn hơn (thường là 1GB hoặc 4GB tùy cấu hình). Lệnh này chạy compaction trên các partition đã chỉ định.
spark.sql("OPTIMIZE sales_transactions WHERE year = 2023")
Kết quả: Delta đọc log, xác định các file nhỏ trong partition `year=2023`, hợp nhất chúng và ghi lại metadata mới. Số lượng file trong thư mục sẽ giảm đi đáng kể.
Để tối ưu hơn nữa việc tìm kiếm trong các file lớn, chúng ta áp dụng `ZORDER`. ZORDER sắp xếp dữ liệu theo nhiều cột cùng lúc (multidimensional) để tăng hiệu quả Data Skipping.
spark.sql("OPTIMIZE sales_transactions ZORDER BY (amount, country)")
Kết quả: Các file Parquet được viết lại với cấu trúc dữ liệu được sắp xếp theo Z-order. Khi truy vấn lọc theo `amount` hoặc `country`, Spark sẽ skip được nhiều file hơn.
Cảnh báo: Lệnh `VACUUM` sẽ xóa vĩnh viễn các file dữ liệu cũ (đã bị version cũ hơn) không còn được tham chiếu bởi Delta Log. Đây là bước quan trọng để giải phóng dung lượng S3, nhưng phải cẩn thận với `retention period`.
spark.sql("VACUUM sales_transactions RETAIN 24 HOURS")
Kết quả: Các file `.parquet` cũ hơn 24 giờ so với version mới nhất của bảng bị xóa khỏi S3. Log file cũ hơn cũng bị xóa nếu không nằm trong khoảng retention.
Verify kết quả: Kiểm tra số lượng file và kích thước sau khi optimize.
spark.sql("DESCRIBE EXTENDED sales_transactions").show()
Kiểm tra trong thư mục S3: Số lượng file `.parquet` giảm đi, và các file có timestamp cũ đã biến mất.
4. Quản lý phiên bản và Time Travel để Rollback
Delta Lake lưu giữ lịch sử của mọi thay đổi. Mỗi lần `WRITE` hoặc `ALTER`, một version mới được tạo. Chúng ta có thể truy cập dữ liệu ở quá khứ (Time Travel) để phục hồi hoặc so sánh.
Xác định version hiện tại của bảng để làm mốc tham chiếu.
spark.sql("DESCRIBE HISTORY sales_transactions").show()
Kết quả: Bảng hiện thị lịch sử các version (version 0, 1, 2...). Cột `version` tăng dần theo thời gian. Cột `operation` cho biết loại thao tác (Write, Delete, Update, Merge).
Thực hiện Time Travel để đọc dữ liệu tại version cụ thể (ví dụ version 5) mà không cần copy dữ liệu.
val historicalData = spark.table("sales_transactions").versionAsOf(5)
historicalData.show(5)
Kết quả: Bạn thấy dữ liệu đúng như trạng thái của bảng tại version 5, ngay cả khi version hiện tại đã xóa hoặc cập nhật các dòng đó.
Thực hiện Time Travel theo thời gian cụ thể (ví dụ: dữ liệu cách đây 2 giờ).
val timeTravelData = spark.table("sales_transactions").timestampAsOf("2023-10-25T14:30:00Z")
timeTravelData.show(5)
Kết quả: Spark tự động tìm version gần nhất trước thời điểm `2023-10-25T14:30:00Z` và trả về dữ liệu tương ứng.
Rollback toàn bộ bảng về version cũ để khôi phục sau lỗi (ví dụ: xóa nhầm toàn bộ dữ liệu).
spark.sql("""
RESTORE TABLE sales_transactions
TO VERSION AS OF 5
""")
Kết quả: Bảng `sales_transactions` được khôi phục hoàn toàn về trạng thái của version 5. Tất cả các version sau đó (6, 7, 8...) vẫn nằm trong log history nhưng không còn là version "hiện tại" (current version).
Verify kết quả: Chạy lại `DESCRIBE HISTORY`.
spark.sql("DESCRIBE HISTORY sales_transactions").show()
Kết quả: Dòng mới nhất trong history sẽ có `operation` là `RESTORE` và `version` mới được tạo ra, trỏ về nội dung của version 5. Dữ liệu đã quay lại trạng thái cũ.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 3: Xây dựng bảng Iceberg đầu tiên và thao tác cơ bản với Spark
Phần 5: Xử lý dữ liệu thực tế: ETL pipeline với Spark trên Kubernetes »