Cấu hình Schema và đảm bảo tính nhất quán dữ liệu
1. Định nghĩa Schema với các trường Dimensions và Metrics
Để đảm bảo tính nhất quán, bước đầu tiên là xác định rõ ràng ranh giới giữa các trường chiều (dimensions) dùng để lọc và các trường số (metrics) dùng để tính toán tổng hợp.
Tạo file định nghĩa schema cho bảng dữ liệu bán hàng, bao gồm các trường timestamp, dimension khách hàng và metric doanh thu.
cat > /tmp/acid_schema.json
Kết quả mong đợi: File JSON được tạo thành công tại đường dẫn /tmp/acid_schema.json, sẵn sàng để sử dụng trong lệnh tạo bảng.
2. Tạo bảng Real-time với Transaction Log (Atomicity & Durability)
Sử dụng API của Pinot Controller để tạo bảng Real-time. Để đảm bảo Atomicity (tính nguyên tử) và Durability (tính bền vững), ta phải bật tính năng Stream Ingestion và cấu hình Transaction Log (WAL).
Chạy lệnh HTTP để gửi schema đã định nghĩa lên Controller, đồng thời bật tham số `enableStreamIngestion` và cấu hình `transactionLogConfig`.
curl -X POST -H "Content-Type: application/json" \
-d @/tmp/acid_schema.json \
-d '{
"tableType": "REALTIME",
"streams": ["sales_topic"],
"tableConfig": {
"enableStreamIngestion": true,
"streamConfig": {
"type": "KAFKA",
"streams": ["sales_topic"],
"streamOffset": "LATEST"
},
"transactionLogConfig": {
"enable": true,
"type": "KAFKA",
"streams": ["sales_topic"],
"streamOffset": "LATEST",
"retentionTimeInMinutes": 60
},
"upsertKeys": ["transaction_id"],
"ingestionConfig": {
"type": "stream",
"stream": "sales_topic"
}
}
}' \
http://localhost:9000/tables/sales_acid_table
Kết quả mong đợi: API trả về mã trạng thái 200 OK. Pinot sẽ tự động tạo các segment mới và ghi nhận giao dịch vào transaction log của Kafka để đảm bảo dữ liệu không mất khi hệ thống restart.
3. Cấu hình tham số ACID và Retention Policy
Cấu hình chính sách giữ dữ liệu (retention policy) và các tham số tối ưu hóa để đảm bảo Consistency (tính nhất quán) trong quá trình nhập liệu liên tục.
Tạo file cấu hình table config bổ sung để áp dụng các chính sách xóa dữ liệu cũ và thiết lập thời gian chờ commit giao dịch.
cat > /tmp/table_config_acid.json
Áp dụng cấu hình này vào bảng đã tạo thông qua API PUT để cập nhật các chính sách ACID.
curl -X PUT -H "Content-Type: application/json" \
-d @/tmp/table_config_acid.json \
http://localhost:9000/tables/sales_acid_table/tableConfig
Kết quả mong đợi: Pinot cập nhật cấu hình bảng. Các segment cũ hơn 24 giờ sẽ bị xóa tự động, và các bản ghi trùng lặp dựa trên `transaction_id` sẽ được cập nhật (upsert) thay vì tạo bản ghi mới, đảm bảo tính nhất quán dữ liệu.
4. Xác minh tính Consistency sau khi Commit
Để xác minh tính Consistency, ta cần kiểm tra xem dữ liệu đã được commit vào transaction log và phản ánh đúng trong bảng chưa, ngay cả khi có các bản ghi trùng lặp (upsert).
Thực hiện truy vấn SQL trên bảng để kiểm tra tổng doanh thu theo danh mục sản phẩm, đảm bảo các giá trị đã được aggregate chính xác.
curl -X GET "http://localhost:9000/query/sql?query=SELECT product_category, SUM(amount) as total_revenue FROM sales_acid_table WHERE timestamp > 1700000000000 GROUP BY product_category&format=json"
Kết quả mong đợi: Trả về danh sách JSON chứa các nhóm `product_category` và tổng `total_revenue`. Nếu có giao dịch bị trùng `transaction_id` được ghi lại, giá trị `total_revenue` phải phản ánh giá trị mới nhất của giao dịch đó (do cơ chế upsert), không bị cộng dồn sai lệch.
Để kiểm tra sâu hơn về trạng thái transaction, truy vấn trực tiếp vào metadata của table để xem số lượng segment và trạng thái commit.
curl -X GET "http://localhost:9000/tables/sales_acid_table/segments?state=ONLINE&format=json"
Kết quả mong đợi: Danh sách các segment đang ở trạng thái ONLINE. Số lượng segment tăng lên khi có dữ liệu mới được commit thành công từ Kafka qua transaction log.
Điều hướng series:
Mục lục: Series: Triển khai Database ACID với Apache Pinot trên Ubuntu 24.04
« Phần 4: Cấu hình Pinot Broker và thiết lập giao diện quản trị
Phần 6: Nhập liệu dữ liệu từ Kafka và kiểm tra tính đúng đắn »