Hiểu cơ chế Commit và Versioning của Apache Hudi
Apache Hudi lưu trữ lịch sử thay đổi dưới dạng các commit files (commit files) trong thư mục .hoodie của bảng. Mỗi lần thực hiện Upsert, Insert, hoặc Delete, Hudi sẽ tạo một timestamp commit mới theo định dạng 20240520103000000.
Cơ chế này cho phép Hudi duy trì một timeline (danh sách thời gian) đầy đủ về mọi thay đổi, giúp bạn có thể quay lại bất kỳ trạng thái nào trong quá khứ mà không cần backup riêng biệt.
1. Kiểm tra Timeline hiện tại
Để xem danh sách các commit đã xảy ra trên bảng, ta sử dụng lệnh DESCRIBE trong Spark SQL hoặc Hive.
Lệnh này liệt kê tất cả các commit theo thứ tự thời gian, bao gồm cả commit hiện tại và các commit lịch sử.
DESCRIBE FORMATTED hudi_trip_data;
Kết quả mong đợi: Bạn sẽ thấy một bảng mô tả chi tiết, trong đó phần Commit Metadata hoặc Timeline hiển thị danh sách các commit ID (ví dụ: 20240520103000000, 20240520114500000).
2. Xem chi tiết từng Commit
Để hiểu rõ hơn về từng phiên bản, ta cần xem nội dung file timeline trực tiếp trong hệ thống tệp tin hoặc dùng Spark để đọc metadata.
Trong môi trường Spark, ta có thể truy vấn bảng metadata để xem loại thao tác (insert, update, delete) trong mỗi commit.
SELECT commit_time, action FROM hudi_trip_data._hoodie.timeline ORDER BY commit_time;
Kết quả mong đợi: Danh sách các dòng chứa thời gian commit và loại hành động (INS, UPD, DEL) tương ứng với từng phiên bản.
Truy vấn dữ liệu tại điểm thời gian cụ thể (As of Version)
Đây là tính năng cốt lõi của Time Travel trong Hudi. Bạn có thể truy vấn dữ liệu như thể nó tồn tại tại một thời điểm hoặc một commit version cụ thể trong quá khứ.
Cú pháp chuẩn là sử dụng AS OF TIMESTAMP hoặc AS OF VERSION ngay sau tên bảng trong câu lệnh SELECT.
1. Truy vấn theo Timestamp
Giả sử bạn muốn xem dữ liệu chuyến đi (trip) vào lúc 10:30 sáng ngày 20/05/2024, trước khi có các bản cập nhật mới nhất.
Câu lệnh này sẽ tự động tìm commit gần nhất trước thời điểm được chỉ định và trả về snapshot dữ liệu tại thời điểm đó.
SELECT * FROM hudi_trip_data AS OF TIMESTAMP '2024-05-20 10:30:00' WHERE trip_id = 'trip_123';
Kết quả mong đợi: Trả về dữ liệu của trip_123 đúng như trạng thái tại 10:30, bao gồm cả các giá trị đã bị thay đổi hoặc xóa đi trong các commit sau đó.
2. Truy vấn theo Commit Version
Thay vì dùng thời gian, bạn có thể dùng chính Commit ID (version) nếu đã biết từ bước kiểm tra timeline.
Phương pháp này chính xác tuyệt đối về mặt phiên bản, không phụ thuộc vào sự lệch múi giờ của hệ thống.
SELECT * FROM hudi_trip_data AS OF VERSION '20240520103000000' WHERE trip_id = 'trip_123';
Kết quả mong đợi: Trả về dữ liệu chính xác tại commit 20240520103000000, đảm bảo tính nhất quán với metadata của commit đó.
Verify kết quả
Để xác minh tính chính xác, hãy so sánh kết quả của truy vấn AS OF VERSION với dữ liệu hiện tại.
Thực hiện lệnh sau để xem sự khác biệt giữa dữ liệu hiện tại và dữ liệu lịch sử:
SELECT * FROM hudi_trip_data WHERE trip_id = 'trip_123' EXCEPT SELECT * FROM hudi_trip_data AS OF VERSION '20240520103000000' WHERE trip_id = 'trip_123';
Kết quả mong đợi: Nếu có thay đổi, câu lệnh sẽ trả về các dòng khác biệt. Nếu không có thay đổi, kết quả sẽ rỗng.
Tách biệt Snapshot và Incremental Data
Hudi hỗ trợ hai chế độ đọc chính: Snapshot (đọc toàn bộ dữ liệu tại một thời điểm) và Incremental (đọc chỉ những thay đổi trong một khoảng thời gian).
Chế độ Incremental rất quan trọng cho các pipeline ETL, giúp chỉ xử lý dữ liệu mới hoặc đã thay đổi thay vì quét toàn bộ bảng.
1. Đọc dữ liệu Incremental
Sử dụng tham số hoodie.table.type hoặc chế độ đọc INCREMENTAL trong Spark để chỉ lấy các bản ghi đã thay đổi từ commit A đến commit B.
Cú pháp này thường được dùng trong Spark DataFrame API hoặc Spark SQL với các tham số đọc cụ thể.
val df = spark.read
.format("org.apache.hudi")
.option("hoodie.table.type", "COPY_ON_WRITE") // Hoặc MERGE_ON_WRITE
.option("hoodie.start.version", "20240520103000000")
.option("hoodie.end.version", "20240520120000000")
.load("/data/hudi_trip_data")
df.show(10)
Kết quả mong đợi: DataFrame chỉ hiển thị các dòng dữ liệu đã được Insert, Update hoặc Delete trong khoảng thời gian từ 10:30 đến 12:00. Các dòng không thay đổi sẽ bị lược bỏ.
2. Đọc dữ liệu Snapshot (Toàn bộ)
Để đọc toàn bộ dữ liệu hiện có (snapshot), ta không cần chỉ định khoảng thời gian, Hudi sẽ tự động hợp nhất (merge) tất cả các commit trước đó.
Đây là cách đọc mặc định khi bạn không dùng tham số version hay start/end version.
val snapshotDf = spark.read
.format("org.apache.hudi")
.load("/data/hudi_trip_data")
snapshotDf.count()
Kết quả mong đợi: Trả về tổng số lượng dòng dữ liệu hiện hành trong bảng, bao gồm tất cả lịch sử đã được hợp nhất.
Verify kết quả
So sánh số lượng dòng giữa chế độ Incremental và Snapshot để đảm bảo logic tách biệt hoạt động đúng.
val incrementalCount = spark.read
.format("org.apache.hudi")
.option("hoodie.start.version", "20240520103000000")
.option("hoodie.end.version", "20240520120000000")
.load("/data/hudi_trip_data").count()
val snapshotCount = spark.read
.format("org.apache.hudi")
.load("/data/hudi_trip_data").count()
println(s"Incremental rows: $incrementalCount, Snapshot rows: $snapshotCount")
Kết quả mong đợi: Incremental rows phải nhỏ hơn hoặc bằng Snapshot rows. Nếu bằng nhau, nghĩa là toàn bộ dữ liệu trong bảng đều được tạo mới trong khoảng thời gian đó.
Tái tạo dữ liệu từ phiên bản cũ để phục hồi
Khi xảy ra sự cố do thao tác sai (ví dụ: xóa nhầm toàn bộ dữ liệu hoặc cập nhật sai hàng loạt), bạn có thể sử dụng cơ chế versioning để khôi phục dữ liệu về trạng thái trước đó.
Quá trình này liên quan đến việc đọc dữ liệu từ commit cũ và ghi đè (write) lại vào bảng hiện tại.
1. Xác định Commit phục hồi
Trước tiên, xác định Commit ID an toàn nhất trước khi sự cố xảy ra bằng cách xem lại timeline.
DESCRIBE FORMATTED hudi_trip_data;
Kết quả mong đợi: Xác định được một Commit ID (ví dụ: 20240520090000000) mà tại đó dữ liệu vẫn còn nguyên vẹn.
2. Thực hiện khôi phục (Restore)
Cách đơn giản nhất là đọc dữ liệu từ commit cũ và Upsert lại vào cùng một bảng. Hudi sẽ tự động xử lý logic merge để đưa bảng về trạng thái cũ.
Trong Spark SQL, ta dùng lệnh INSERT OVERWRITE hoặc INSERT INTO kết hợp với AS OF VERSION.
INSERT OVERWRITE TABLE hudi_trip_data
SELECT * FROM hudi_trip_data AS OF VERSION '20240520090000000';
Kết quả mong đợi: Bảng hudi_trip_data sẽ được cập nhật với một commit mới, nhưng nội dung dữ liệu bên trong sẽ giống hệt trạng thái tại commit 20240520090000000. Các thay đổi sau đó sẽ bị thay thế.
3. Khôi phục bằng Spark DataFrame (Tùy chọn)
Nếu bạn cần kiểm soát nhiều hơn (ví dụ: khôi phục vào một bảng khác trước khi rename), hãy dùng DataFrame API.
val restoreDf = spark.read
.format("org.apache.hudi")
.option("hoodie.start.version", "20240520090000000")
.load("/data/hudi_trip_data")
restoreDf.write
.option("hoodie.datasource.write.precombine.field", "timestamp")
.option("hoodie.datasource.write.recordkey.field", "trip_id")
.mode("overwrite")
.format("org.apache.hudi")
.save("/data/hudi_trip_data")
Kết quả mong đợi: Dữ liệu đã được ghi đè hoàn toàn vào đường dẫn lưu trữ, tạo ra một snapshot mới phản ánh trạng thái cũ.
Verify kết quả
Đảm bảo dữ liệu đã được khôi phục đúng bằng cách so sánh với một bản sao lưu (backup) hoặc kiểm tra lại một số dòng cụ thể mà bạn biết chắc chắn tồn tại trước sự cố.
SELECT * FROM hudi_trip_data WHERE trip_id IN ('trip_001', 'trip_002', 'trip_003') ORDER BY trip_id;
Kết quả mong đợi: Các dòng dữ liệu này phải khớp hoàn toàn với dữ liệu bạn ghi nhớ hoặc đã lưu trữ trước khi xảy ra sự cố. Số lượng dòng tổng thể cũng phải trở lại mức ban đầu.
Điều hướng series:
Mục lục: Series: Triển khai Database Lakehouse với Apache Hudi và Ubuntu 24.04
« Phần 4: Thực thi các thao tác Upsert, Cập nhật và Xóa dữ liệu
Phần 6: Tối ưu hóa hiệu năng: Compaction, Clustering và Indexing »