Cấu hình thuộc tính Precombine cho thao tác Upsert
Để Spark thực hiện Upsert (thêm mới hoặc cập nhật) chính xác, bạn cần xác định trường nào dùng để gộp (precombine) các bản ghi trùng nhau trước khi ghi vào Hudi.
Thuộc tính hoodie.datasource.write.precombine.field sẽ chỉ định trường chứa giá trị timestamp hoặc version để quyết định bản ghi nào mới nhất. Nếu không cấu hình này, Hudi có thể ghi đè dữ liệu mới bằng dữ liệu cũ hoặc tạo ra bản ghi trùng lặp.
Tạo file cấu hình Spark cho phiên làm việc này tại đường dẫn /tmp/hudi_upsert_config.conf.
cat > /tmp/hudi_upsert_config.conf
Kết quả mong đợi: File /tmp/hudi_upsert_config.conf được tạo thành công, chứa định nghĩa rõ ràng rằng trường ts là trường dùng để gộp dữ liệu, id là khóa chính, và dt là trường phân vùng.
Thực thi Upsert: Ghi mới và Cập nhật cùng lúc
Sử dụng Spark DataFrame để tạo dữ liệu bao gồm cả bản ghi mới và bản ghi cần cập nhật, sau đó ghi vào Hudi với chế độ Upsert.
Chạy lệnh Spark-submit hoặc vào shell Spark để tạo DataFrame, áp dụng cấu hình đã thiết lập và ghi vào bảng hudi_orders.
spark-shell --conf-file /tmp/hudi_upsert_config.conf
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
// Tạo SparkSession
val spark = SparkSession.builder().appName("HudiUpsertDemo").getOrCreate()
import spark.implicits._
// Tạo dữ liệu Upsert: Chứa ID mới (103) và ID cũ (101) với giá trị mới hơn
val upsertData = Seq(
(101, "UserA", "NewProduct", 150.00, "2023-10-27", "2023-10-27T10:00:00Z"),
(102, "UserB", "OldProduct", 50.00, "2023-10-26", "2023-10-26T09:00:00Z"),
(103, "UserC", "NewProductX", 200.00, "2023-10-27", "2023-10-27T11:00:00Z")
).toDF("id", "customer", "product", "amount", "dt", "ts")
// Ghi dữ liệu vào Hudi (Upsert)
upsertData.write
.format("hudi")
.option("path", "/data/hudi/hudi_orders")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.datasource.write.partitionpath.field", "dt")
.option("hoodie.datasource.write.operation", "upsert")
.mode("append")
.save()
println("Upsert completed.")
spark.stop()
Kết quả mong đợi: Spark thực hiện 2 hành động: Cập nhật bản ghi ID 101 (nếu đã tồn tại) với giá trị mới nhất dựa trên ts, và chèn mới bản ghi ID 102 và 103. Không có lỗi xung đột khóa chính.
Thực hiện xóa dữ liệu có điều kiện
Hudi hỗ trợ xóa dữ liệu logic (soft delete) hoặc vật lý (hard delete) tùy thuộc vào chế độ bảng. Với chế độ COW, xóa sẽ tạo ra commit mới đánh dấu bản ghi đã xóa.
Sử dụng DataFrame để lọc ra các bản ghi cần xóa và ghi vào Hudi với operation là delete.
spark-shell --conf-file /tmp/hudi_upsert_config.conf
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
val spark = SparkSession.builder().appName("HudiDeleteDemo").getOrCreate()
import spark.implicits._
// Đọc dữ liệu hiện tại từ bảng Hudi
val existingData = spark.read.format("hudi").load("/data/hudi/hudi_orders")
// Tạo DataFrame chứa các bản ghi cần xóa (ví dụ: xóa đơn hàng của UserB)
val dataToDelete = existingData.filter($"customer" === "UserB").select("id", "customer", "product", "amount", "dt", "ts")
// Thực hiện xóa
dataToDelete.write
.format("hudi")
.option("path", "/data/hudi/hudi_orders")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.operation", "delete")
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.datasource.write.partitionpath.field", "dt")
.mode("append")
.save()
println("Delete operation completed.")
spark.stop()
Kết quả mong đợi: Bản ghi có customer = "UserB" (ID 102) sẽ bị đánh dấu là đã xóa trong hệ thống phiên bản. Khi truy vấn toàn bộ bảng, bản ghi này sẽ không xuất hiện.
So sánh hiệu năng ghi giữa COW và MOR
Chế độ Copy-On-Write (COW) tái tạo toàn bộ file dữ liệu khi có cập nhật, đảm bảo tính nhất quán tức thời nhưng tốn chi phí I/O cao với dữ liệu lớn. Chế độ Merge-On-Read (MOR) ghi dữ liệu cập nhật vào file log riêng, chỉ hợp nhất khi đọc hoặc compaction, mang lại tốc độ ghi nhanh hơn nhiều.
Thực hiện cùng một thao tác Upsert trên 2 bảng khác nhau để so sánh.
spark-shell --conf-file /tmp/hudi_upsert_config.conf
import org.apache.spark.sql.SparkSession
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.table.action.compact.CompactActionExecutor
val spark = SparkSession.builder().appName("HudiPerfCompare").getOrCreate()
import spark.implicits._
val updateData = Seq(
(101, "UserA", "UpdatedProduct", 160.00, "2023-10-27", "2023-10-27T12:00:00Z"),
(104, "UserD", "NewItem", 300.00, "2023-10-27", "2023-10-27T12:05:00Z")
).toDF("id", "customer", "product", "amount", "dt", "ts")
// 1. Ghi vào bảng COW
println("Starting COW Upsert...")
val startCOW = System.nanoTime()
updateData.write
.format("hudi")
.option("path", "/data/hudi/hudi_orders_cow")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.datasource.write.partitionpath.field", "dt")
.option("hoodie.datasource.write.operation", "upsert")
.mode("append")
.save()
val endCOW = System.nanoTime()
println(s"COW Upsert time: ${(endCOW - startCOW) / 1000000} ms")
// 2. Ghi vào bảng MOR
println("Starting MOR Upsert...")
val startMOR = System.nanoTime()
updateData.write
.format("hudi")
.option("path", "/data/hudi/hudi_orders_mor")
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
.option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.datasource.write.partitionpath.field", "dt")
.option("hoodie.datasource.write.operation", "upsert")
.mode("append")
.save()
val endMOR = System.nanoTime()
println(s"MOR Upsert time: ${(endMOR - startMOR) / 1000000} ms")
spark.stop()
Kết quả mong đợi: Thời gian ghi của bảng MOR thường nhanh hơn đáng kể so với COW (có thể nhanh gấp 3-10 lần tùy kích thước dữ liệu), vì MOR chỉ ghi vào file log nhỏ thay vì tái tạo file Parquet/Hudi lớn.
Verify tính nhất quán của dữ liệu sau Upsert
Để đảm bảo Upsert hoạt động đúng, cần kiểm tra xem dữ liệu mới nhất đã được giữ lại và dữ liệu cũ đã bị thay thế hay không. Đồng thời kiểm tra số lượng bản ghi sau khi xóa.
spark-shell --conf-file /tmp/hudi_upsert_config.conf
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("HudiVerifyDemo").getOrCreate()
import spark.implicits._
// 1. Kiểm tra dữ liệu sau Upsert (Bảng COW)
println("=== Checking Data Consistency (COW) ===")
val cowData = spark.read.format("hudi").load("/data/hudi/hudi_orders")
cowData.show(false)
cowData.groupBy("id").count().show() // Kiểm tra không có trùng ID
// 2. Kiểm tra dữ liệu sau Xóa (Bảng COW)
println("=== Checking Data after Delete ===")
val cowDataAfterDelete = spark.read.format("hudi").load("/data/hudi/hudi_orders")
cowDataAfterDelete.filter($"customer" === "UserB").count() shouldBe 0
println(s"Count of UserB: ${cowDataAfterDelete.filter($"customer" === "UserB").count()}")
// 3. Kiểm tra tính nhất quán trên bảng MOR (cần compaction nếu muốn xem ngay lập tức, nhưng read thường tự merge)
println("=== Checking Data Consistency (MOR) ===")
val morData = spark.read.format("hudi").load("/data/hudi/hudi_orders_mor")
morData.show(false)
spark.stop()
Kết quả mong đợi:
- Bảng COW: ID 101 hiển thị giá trị mới nhất (UpdatedProduct), không có bản ghi nào của UserB.
- Bảng MOR: Dữ liệu hiển thị đúng theo thời gian thực (nếu chưa compaction, đọc vẫn trả về kết quả merge đúng).
- Số lượng bản ghi duy nhất (distinct ID) không bị tăng lên do trùng lặp.
Điều hướng series:
Mục lục: Series: Triển khai Database Lakehouse với Apache Hudi và Ubuntu 24.04
« Phần 3: Khởi tạo kho dữ liệu Hudi: Tạo bảng và nhập dữ liệu ban đầu
Phần 5: Quản lý phiên bản dữ liệu (Versioning) và truy vấn lịch sử »