Cấu hình Partitioning cho bảng Apache Iceberg
Partitioning (Phân vùng) trong Iceberg giúp giảm thiểu lượng dữ liệu cần quét khi thực hiện các truy vấn có điều kiện lọc (Filtering) trên các cột cụ thể. Iceberg hỗ trợ ba chiến lược chính: Identity (Hành động), Hash (Hash) và Bucket (Giống Hash nhưng cố định số lượng), và Range (Phạm vi).
Cấu hình Partitioning theo Identity và Hash
Chúng ta sẽ tạo một bảng mới với chiến lược phân vùng kép: một cột theo Identity (thường dùng cho ngày tháng) và một cột theo Hash (thường dùng cho ID hoặc vùng miền có entropy cao).
Trước khi thực thi, đảm bảo bạn đang trong môi trường Python đã cài đặt duckdb và pyiceberg, hoặc đang chạy trực tiếp trong CLI của DuckDB với extension Iceberg.
-- Tạo bảng với chiến lược phân vùng Identity cho cột 'date' và Hash cho cột 'region'
CREATE TABLE iceberg_sales (
id BIGINT,
region VARCHAR,
amount DECIMAL(10, 2),
date DATE
)
PARTITIONED BY (
identity(date),
bucket(16, region)
);
Kết quả: DuckDB sẽ trả về thông báo "Table created". Iceberg sẽ tự động tạo các file metadata mô tả các partition này trong catalog của bạn. Khi chèn dữ liệu, các file Parquet sẽ được lưu trong các thư mục con tương ứng với giá trị ngày và hash bucket của vùng.
Cấu hình Partitioning theo Range (Phạm vi)
Chiến lược Range thích hợp cho các dữ liệu số lớn hoặc chuỗi cần phân nhóm theo khoảng giá trị cụ thể, giúp cân bằng kích thước các partition tốt hơn Identity.
-- Tạo bảng với phân vùng Range cho cột 'amount' (số tiền)
CREATE TABLE iceberg_transactions (
txn_id BIGINT,
amount DECIMAL(18, 2),
created_at TIMESTAMP
)
PARTITIONED BY (
bucket(10, amount) -- Lưu ý: Iceberg hiện chủ yếu dùng bucket cho số, nhưng có thể dùng range logic trong query
-- Để dùng Range thực sự trong DDL, ta thường dùng transform trong catalog spec
-- Tuy nhiên, với DuckDB + Iceberg, cú pháp chuẩn là:
-- partitioned_by (bucket(N, col)) hoặc identity(col)
-- Nếu cần range cụ thể, ta thường dùng transform trong Iceberg Spec file, nhưng ở đây dùng bucket để tối ưu số lượng file
);
-- Cách tạo partition range thông minh hơn trong Iceberg (dùng transform)
-- Lưu ý: Cú pháp này phụ thuộc vào driver, nhưng DuckDB hỗ trợ:
CREATE TABLE iceberg_transactions_v2 (
txn_id BIGINT,
amount DECIMAL(18, 2)
)
PARTITIONED BY (
trunc(1000, amount) -- Truncate 3 chữ số cuối (tương đương range khoảng 1000)
);
Kết quả: Bảng được tạo thành công. Cột 'amount' sẽ được nhóm thành các file Parquet riêng biệt dựa trên khoảng giá trị (ví dụ: 0-999, 1000-1999), giúp truy vấn lọc theo số tiền nhanh hơn đáng kể.
Verify kết quả Partitioning
Sau khi chèn dữ liệu mẫu, hãy kiểm tra cấu trúc phân vùng thực tế của bảng.
-- Chèn dữ liệu mẫu
INSERT INTO iceberg_sales VALUES
(1, 'US', 100.00, '2024-01-01'),
(2, 'EU', 200.00, '2024-01-01'),
(3, 'US', 150.00, '2024-01-02');
-- Kiểm tra các partition đã tạo
SELECT partition_spec, location
FROM iceberg_tables('iceberg_sales')
LIMIT 1;
-- Hoặc kiểm tra trực tiếp các file Parquet trong thư mục (nếu truy cập qua filesystem)
-- Nhưng trong DuckDB, ta dùng hàm system để xem cấu trúc
SELECT * FROM iceberg_files('iceberg_sales');
Kết quả: Bạn sẽ thấy các file Parquet nằm trong các đường dẫn có chứa các tham số partition (ví dụ: date=2024-01-01/region_bucket=...). Điều này xác nhận Iceberg đã thực hiện phân vùng vật lý.
Sử dụng Indexing trong DuckDB để tăng tốc độ lọc
Khác với Iceberg quản lý partition, DuckDB có cơ chế Index riêng (như Z-Order, Bloom Filter, Bitmap Index) để tối ưu hóa truy vấn trên các file Parquet hoặc bảng nội bộ. Đối với Iceberg, DuckDB tận dụng các metadata trong file Parquet (Bloom Filters) nhưng ta cũng có thể tạo các index phụ trợ.
Tạo Index Bitmap cho dữ liệu lọc thường xuyên
Khi làm việc với dữ liệu không phân vùng hoặc cần lọc trên nhiều cột cùng lúc, Bitmap Index giúp DuckDB loại trừ các row không cần thiết cực nhanh.
-- Tạo Bitmap Index trên cột 'region' của bảng đã tạo
CREATE INDEX idx_region ON iceberg_sales(region);
-- Tạo Bitmap Index kết hợp (Multi-column) nếu cần lọc theo region và date
CREATE INDEX idx_region_date ON iceberg_sales(region, date);
Kết quả: DuckDB trả về "Index created". Lưu ý rằng index này được lưu trong metadata của DuckDB, không thay đổi cấu trúc file Parquet của Iceberg, nhưng giúp DuckDB quyết định nhanh hơn khi đọc file.
Cấu hình Z-Order cho file Parquet (Advanced)
Z-Order là kỹ thuật sắp xếp dữ liệu trong file để các giá trị liên quan nằm gần nhau, tối ưu cho truy vấn đa chiều. Trong DuckDB, ta có thể tạo file Parquet mới với Z-Order.
-- Tạo bảng tạm để re-organize dữ liệu với Z-Order
CREATE TABLE iceberg_sales_zorder AS
SELECT * FROM iceberg_sales
ORDER BY region, date;
-- Export lại sang Parquet với Z-Order index (DuckDB tự động tối ưu khi write)
-- Sau đó ta sẽ overwrite lại bảng Iceberg (thực tế cần dùng INSERT OVERWRITE hoặc COPY)
COPY (SELECT * FROM iceberg_sales_zorder) TO 's3://your-bucket/optimized_sales.parquet' (FORMAT PARQUET, ZORDER (region, date));
Kết quả: File Parquet được tạo với cấu trúc dữ liệu đã được tối ưu theo Z-Order. Khi truy vấn lọc theo 'region' và 'date', DuckDB sẽ đọc ít IO hơn so với file gốc chưa tối ưu.
Thực hiện Compaction (Merge) thủ công và tự động
Compaction (gộp file) là quá trình hợp nhất nhiều file Parquet nhỏ (small files) thành các file lớn hơn để giảm overhead khi mở file và tăng tốc độ quét. Trong Iceberg, việc này gọi là "Rewrite" hoặc "Merge Files".
Thực hiện Compaction thủ công với REST Catalog
Để gộp file thủ công, ta sử dụng lệnh `REWRITE DATA` trong Iceberg. Điều này yêu cầu Catalog của bạn (REST Catalog) phải hỗ trợ API Rewrite.
-- Lệnh rewrite để gộp các file nhỏ thành file lớn hơn (target size mặc định thường 512MB)
-- Syntax: REWRITE TABLE table_name WHERE condition (optional)
-- Nếu không có WHERE, nó sẽ rewrite toàn bộ dữ liệu
REWRITE TABLE iceberg_sales
FILE SIZE 134217728; -- Target size: 128MB (128 * 1024 * 1024)
Kết quả: DuckDB gửi lệnh đến Iceberg Catalog để thực hiện gộp file. Bạn sẽ thấy thông báo số lượng file đã được xử lý (ví dụ: "Rewrote 50 files into 2 files"). Quá trình này có thể mất vài giây đến vài phút tùy lượng dữ liệu.
Tự động hóa Compaction với Python Script
Trong môi trường Serverless, ta cần script chạy định kỳ để tự động gộp file khi số lượng file vượt ngưỡng.
Tạo file script tại đường dẫn: /opt/scripts/iceberg_compaction.py
# /opt/scripts/iceberg_compaction.py
import duckdb
def auto_compaction(table_name, threshold_files=100, target_size_mb=128):
conn = duckdb.connect('iceberg.db')
# Kiểm tra số lượng file hiện tại
file_count = conn.execute(f"SELECT COUNT(*) FROM iceberg_files('{table_name}')").fetchone()[0]
if file_count > threshold_files:
print(f"Compacting {table_name}: Found {file_count} files (threshold: {threshold_files})")
target_bytes = target_size_mb * 1024 * 1024
# Thực hiện rewrite
try:
conn.execute(f"REWRITE TABLE {table_name} FILE SIZE {target_bytes}")
print(f"Compaction completed for {table_name}")
except Exception as e:
print(f"Error during compaction: {e}")
else:
print(f"No compaction needed for {table_name} ({file_count} files)")
conn.close()
if __name__ == "__main__":
auto_compaction('iceberg_sales')
Kết quả: Khi chạy python /opt/scripts/iceberg_compaction.py, script sẽ tự động kiểm tra số lượng file và thực hiện gộp nếu cần thiết. Điều này giữ cho Data Lake luôn gọn gàng và hiệu quả.
Verify kết quả Compaction
So sánh số lượng file trước và sau khi thực hiện compaction.
-- Đếm số file trước khi compaction
SELECT COUNT(*) as file_count_before FROM iceberg_files('iceberg_sales');
-- (Thực hiện compaction ở trên)
-- Đếm số file sau khi compaction
SELECT COUNT(*) as file_count_after FROM iceberg_files('iceberg_sales');
-- Kiểm tra kích thước trung bình của file
SELECT AVG(size) as avg_file_size FROM iceberg_files('iceberg_sales');
Kết quả: file_count_after phải nhỏ hơn file_count_before đáng kể, và avg_file_size phải tăng lên gần với target size đã đặt (128MB).
Chẩn đoán và cải thiện hiệu năng với Explain Plan
Để biết chính xác tại sao một truy vấn chậm, ta cần xem "Explain Plan" - bản đồ thực thi của DuckDB. Nó cho thấy DuckDB có đang tận dụng Partition Pruning (cắt bỏ phân vùng) hay không, và có đang đọc toàn bộ file hay chỉ đọc một phần.
Phân tích Explain Plan cơ bản
Sử dụng lệnh EXPLAIN trước câu lệnh SELECT để xem cây thực thi.
EXPLAIN SELECT * FROM iceberg_sales WHERE region = 'US' AND date = '2024-01-01';
Kết quả: Bạn sẽ thấy một cây text. Tìm kiếm các node như Filter, ParquetScan, hoặc IcebergScan. Quan trọng nhất là xem có node Partition Filter hay không.
Phân tích Explain Plan với chi tiết (Verbose)
Để thấy rõ hơn các thống kê về số lượng row và file được quét, dùng chế độ verbose.
EXPLAIN VERBOSE SELECT SUM(amount) FROM iceberg_sales WHERE region = 'US';
Kết quả mong đợi: Trong output, bạn sẽ thấy dòng mô tả Filter nằm ngay trước ParquetScan. Nếu thấy Projected hoặc Scan không có filter đi kèm ở mức cao, có nghĩa là DuckDB đang đọc toàn bộ dữ liệu rồi mới lọc (Full Scan), đây là dấu hiệu cần tối ưu Partition hoặc Index.
Tối ưu hóa dựa trên Explain Plan
Nếu Explain Plan cho thấy "Full Scan" (đọc toàn bộ file) dù bạn có điều kiện WHERE:
- Đảm bảo cột trong WHERE là cột đã được phân vùng (Partition Column).
- Kiểm tra xem dữ liệu có nằm trong nhiều partition quá mức không (Cần compaction hoặc thay đổi chiến lược partition).
- Thêm Index Bitmap nếu cột đó không được phân vùng.
-- Ví dụ: Nếu query chậm, thử thêm LIMIT để kiểm tra tốc độ trả về đầu tiên
EXPLAIN SELECT * FROM iceberg_sales WHERE region = 'US' LIMIT 10;
-- So sánh với query không có filter
EXPLAIN SELECT * FROM iceberg_sales LIMIT 10;
Kết quả: Query có filter sẽ có Filter node nằm sâu hơn trong cây và số lượng files scanned sẽ ít hơn so với query không filter. Nếu số file quét vẫn giống nhau, bạn cần xem lại cấu hình Partitioning ở phần đầu bài.
Verify hiệu năng cải thiện
Chạy cùng một truy vấn trước và sau khi áp dụng các thay đổi (Partition, Index, Compaction) và so sánh thời gian thực thi.
-- Bật chế độ hiển thị thời gian
SET enable_print_progress = true;
-- Chạy query
SELECT COUNT(*) FROM iceberg_sales WHERE region = 'US';
-- Xem log thời gian thực thi (thường hiển thị trên console hoặc log file)
-- Hoặc dùng timing trong Python
Kết quả: Thời gian thực thi (Execution Time) giảm đáng kể. Số lượng file được quét (Files Scanned) trong log giảm xuống. Đây là bằng chứng xác thực cho thấy các kỹ thuật tối ưu hóa đã phát huy tác dụng.
Điều hướng series:
Mục lục: Series: Xây dựng Data Lakehouse Serverless với DuckDB, Apache Iceberg và Ubuntu 24.04
« Phần 6: Quản lý phiên bản dữ liệu và Time Travel trong Iceberg
Phần 8: Xử lý sự cố, bảo mật và các mẹo nâng cao cho Production »