Cấu hình chiến lược Compaction tự động cho bảng MOR
Thiết lập tham số Compaction trong Spark
Bạn cần cấu hình Spark để tự động kích hoạt compaction khi số lượng file delta vượt quá ngưỡng quy định. Điều này ngăn chặn tình trạng "small files" gây quá tải cho NameNode và làm chậm truy vấn.
Tham số hoodie.compaction.inline cần được đặt là true để kích hoạt compaction ngay sau khi commit, nhưng với dữ liệu lớn, ta thường dùng ngưỡng hoodie.compaction.max.num.commits để trì hoãn một chút nhằm cân bằng giữa tốc độ ghi và độ sạch của dữ liệu.
Chạy script Spark với các tham số tối ưu sau đây:
spark-submit \
--class org.apache.hudi.examples.HoodieUpsertExample \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.compaction.inline=true \
--conf hoodie.compaction.max.num.commits=5 \
--conf hoodie.compaction.async.enabled=true \
--conf hoodie.compaction.async.enabled=true \
--conf hoodie.compaction.async.schedule.interval.seconds=300 \
--conf hoodie.compaction.async.cron=0 */5 * * * * \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips \
--table-name trips_mor \
--operation upsert \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Spark sẽ tự động chạy job compaction nền khi bảng đạt 5 commit liên tiếp hoặc cứ 5 phút một lần, đảm bảo số lượng file delta luôn được gom lại thành file base.
Verify kết quả Compaction tự động
Kiểm tra số lượng file trong thư mục partition để xác nhận compaction đã diễn ra. Sử dụng lệnh ls trên thư mục Hudi.
ls -la /data/hudi/trips/day=2023-10-25/ | grep -E '\.(delta|parquet)' | wc -l
So sánh với số lượng commit trong log .commits để đảm bảo không có quá nhiều file delta tồn đọng so với số lần commit.
ls /data/hudi/trips/.commits | wc -l
Kết quả mong đợi: Tỷ lệ file delta/parquet trên tổng số commit phải nhỏ hơn 1, nghĩa là nhiều commit đã được gom lại thành một file parquet duy nhất.
Thực hiện thủ công các tác vụ Compaction và Clustering
Chạy Compaction thủ công
Đôi khi compaction tự động chưa đủ nhanh hoặc bạn muốn ép buộc gom file sau một đợt import dữ liệu khổng lồ. Bạn cần chạy một job Spark riêng biệt chỉ để thực hiện compaction.
Script này sẽ đọc các file delta, gom chúng thành file base mới và xóa các file delta cũ.
spark-submit \
--class org.apache.hudi.QuickstartWrite \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.compaction.inline=false \
--conf hoodie.compaction.async.enabled=false \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips \
--table-name trips_mor \
--operation compact \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Job chạy xong, không báo lỗi, và khi list file trong partition, các file .delta biến mất, thay thế bằng file .parquet mới.
Chạy Clustering thủ công cho bảng COER (Copy-on-Write)
Clustering giúp gom các bản ghi có cùng partition và cùng record key vào cùng một file, giảm độ phân mảnh dữ liệu. Thao tác này thường dùng cho bảng COER hoặc MOR khi cần tối ưu đọc.
Cấu hình chiến lược clustering là BUCKET hoặc TIME_BASED. Ở đây ta dùng BUCKET để gom theo số lượng bản ghi.
spark-submit \
--class org.apache.hudi.QuickstartWrite \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.clustering.inline.enabled=false \
--conf hoodie.clustering.strategy=org.apache.hudi.table.action.compact.strategy.BucketClusteringStrategy \
--conf hoodie.clustering.max.bytes.in.result=1073741824 \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips_cow \
--table-name trips_cow \
--operation cluster \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Dữ liệu được sắp xếp lại theo bucket, số lượng file giảm xuống và kích thước file tăng lên, cải thiện hiệu năng scan.
Verify kết quả Compaction/Clustering
Đếm số lượng file trước và sau khi chạy tác vụ để xác minh hiệu quả gom file.
echo "Số file trước compaction:" && ls /data/hudi/trips/day=2023-10-25/*.parquet /data/hudi/trips/day=2023-10-25/*.delta 2>/dev/null | wc -l
Chạy lại compaction/cluster rồi đếm lại:
echo "Số file sau compaction:" && ls /data/hudi/trips/day=2023-10-25/*.parquet 2>/dev/null | wc -l
Kết quả mong đợi: Số lượng file sau phải nhỏ hơn đáng kể so với trước, chứng tỏ các file nhỏ đã được gom thành file lớn.
Cấu hình các loại Index cho hiệu năng cao
Chuyển đổi sang Global Index (Bloom Filter)
Index mặc định của Hudi là BLOOM (local), chỉ lưu vị trí trong partition. Khi dữ liệu lớn và phân tán nhiều partition, Upsert chậm vì phải scan toàn bộ partition. Chuyển sang GLOBAL_BLOOM hoặc GLOBAL_INDEX giúp tìm kiếm record key trên toàn bảng chỉ với một lần tra cứu.
Tạo bảng mới hoặc chuyển đổi bảng hiện tại bằng cách thay đổi tham số hoodie.index.type khi khởi tạo hoặc update.
spark-submit \
--class org.apache.hudi.examples.HoodieUpsertExample \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.index.type=GLOBAL_BLOOM \
--conf hoodie.bloom.filter.fpp=0.0001 \
--conf hoodie.bloom.filter.num.entries=200000000 \
--conf hoodie.write.key.field=ts \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips_global \
--table-name trips_global \
--operation upsert \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Hudi sẽ tạo file index toàn cục index trong thư mục metadata. Thao tác Upsert sau này sẽ nhanh hơn đáng kể trên dữ liệu lớn.
Chuyển đổi Index cho bảng hiện có (Migration)
Nếu bảng đã tồn tại với local index, bạn cần chạy job migration để chuyển sang Global Index mà không mất dữ liệu.
spark-submit \
--class org.apache.hudi.QuickstartWrite \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.index.type=GLOBAL_BLOOM \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips_mor \
--table-name trips_mor \
--operation migrate \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Job migration chạy xong, file metadata được cập nhật, và các file index mới được tạo.
Verify kết quả Index
Kiểm tra cấu hình metadata của bảng để xác nhận loại index đang được sử dụng.
cat /data/hudi/trips_global/.hoodie/table.properties | grep index.type
Kiểm tra sự tồn tại của thư mục index toàn cục.
ls -la /data/hudi/trips_global/.hoodie/index/
Kết quả mong đợi: Giá trị index.type là GLOBAL_BLOOM và thư mục index chứa các file index toàn cục.
Tối ưu hóa tham số partitioning cho dữ liệu lớn
Chuyển từ Partitioning theo ngày sang Partitioning theo Hash
Với dữ liệu cực lớn, partition theo ngày (day) có thể dẫn đến một partition quá nặng (hot partition) hoặc quá nhẹ. Sử dụng Bucket partitioning giúp phân bổ đều dữ liệu vào các partition dựa trên hash của record key.
Cấu hình hoodie.datasource.write.partition.path.field để sử dụng bucket, hoặc dùng hoodie.datasource.write.bucketindex.type nếu muốn Hudi tự quản lý bucket.
spark-submit \
--class org.apache.hudi.examples.HoodieUpsertExample \
--packages org.apache.hudi:hudi-spark3.5_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.hudi.hive.HoodieHiveCatalog \
--conf hoodie.datasource.write.partition.path.field=day \
--conf hoodie.datasource.write.bucketindex.type=BUCKET \
--conf hoodie.datasource.write.bucketindex.num.buckets=256 \
--conf hoodie.datasource.write.bucketindex.key.field=_ts \
--conf hoodie.write.key.field=_ts \
hudi-spark3.5_2.12_0.14.0.jar \
--base-path /data/hudi/trips_bucket \
--table-name trips_bucket \
--operation upsert \
--record-key-fields _ts \
--partition-path-fields day
Kết quả mong đợi: Dữ liệu trong mỗi partition ngày sẽ được chia nhỏ thành 256 bucket, giúp cân bằng tải khi đọc/ghi song song.
Tối ưu kích thước file sau partitioning
Đảm bảo mỗi file output có kích thước phù hợp (thường 256MB - 512MB) để tận dụng tối đa I/O của HDFS/S3.
spark-submit \
--conf spark.sql.hudi.write.buffer.size.mb=256 \
--conf hoodie.write.buffer.size.mb=256 \
--conf hoodie.compaction.target.max.file.size=536870912 \
... (các tham số khác như trên) ...
Kết quả mong đợi: Các file parquet trong partition có kích thước đồng đều và nằm trong khoảng mục tiêu.
Verify kết quả Partitioning
Kiểm tra cấu trúc thư mục để thấy bucket đã được tạo.
ls -la /data/hudi/trips_bucket/day=2023-10-25/ | head -n 20
Kiểm tra kích thước file trong bucket.
ls -lh /data/hudi/trips_bucket/day=2023-10-25/bucket=0/*.parquet
Kết quả mong đợi: Có các thư mục con dạng bucket=0, bucket=1... và file bên trong có kích thước lớn.
Giám sát và điều chỉnh tài nguyên Spark cho các tác vụ tối ưu
Cấu hình Executor cho Compaction/Clustering
Các tác vụ Compaction và Clustering tiêu tốn nhiều CPU và RAM hơn so với viết đơn thuần. Cần tăng spark.executor.memory và spark.executor.cores.
Tối ưu hóa spark.default.parallelism để đảm bảo số lượng task phù hợp với số lượng file cần xử lý.
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 20 \
--executor-memory 8g \
--executor-cores 4 \
--conf spark.default.parallelism=100 \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.memory.fraction=0.6 \
--conf spark.memory.storageFraction=0.1 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
... (các tham số khác) ...
Kết quả mong đợi: Job chạy nhanh hơn, ít bị OOM (Out Of Memory) hơn, và thời gian compaction giảm đáng kể.
Giám sát qua Spark UI và Hudi Metrics
Truy cập Spark UI (thường ở port 8080 của driver hoặc YARN ResourceManager) để theo dõi tiến độ compaction.
curl -s http://:8080/metrics
Trong Spark UI, xem tab "Stages" để tìm stage có tên "Compaction" hoặc "Clustering". Quan sát thời gian chạy của từng stage.
Đặc biệt chú ý đến metric hoodie.compaction.total.time và hoodie.compaction.files.read.
Điều chỉnh động (Dynamic Allocation)
Cho phép Spark tự động thêm/bớt executor dựa trên tải của job compaction để tiết kiệm tài nguyên khi không cần thiết.
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=2 \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.dynamicAllocation.initialExecutors=5 \
--conf spark.dynamicAllocation.schedulingBackend=SCHEDULER \
--conf spark.dynamicAllocation.executorIdleTimeout=60
Kết quả mong đợi: Khi bắt đầu compaction, số executor tăng lên để xử lý nhanh, sau khi xong sẽ giảm xuống để giải phóng tài nguyên cho các job khác.
Verify hiệu năng tài nguyên
Kiểm tra thời gian thực thi của job compaction so với job không có tối ưu.
time spark-submit --conf spark.dynamicAllocation.enabled=true ... (lệnh compaction)
So sánh với log trước đó. Nếu thời gian giảm 30-50% và không có cảnh báo OOM, thì cấu hình tài nguyên đã thành công.
Điều hướng series:
Mục lục: Series: Triển khai Database Lakehouse với Apache Hudi và Ubuntu 24.04
« Phần 5: Quản lý phiên bản dữ liệu (Versioning) và truy vấn lịch sử
Phần 7: Tích hợp Hudi với các công cụ truy vấn và ETL phổ biến »