Cấu hình Stream Config để liên kết Kafka Topic với Pinot Table
Để Pinot có thể nhận dữ liệu thời gian thực từ Kafka, ta cần định nghĩa một Stream Config. File này mô tả source dữ liệu (Kafka topic), các tham số kết nối (Zookeeper, Broker) và cơ chế xử lý sự cố.
File này sẽ được lưu trong thư mục cấu hình của Controller hoặc tạo trực tiếp qua API. Ở đây ta tạo file JSON thủ công để đảm bảo tính minh bạch.
Tạo file /etc/pinot/stream-configs/ride_data_stream.json với nội dung hoàn chỉnh sau:
{
"streamName": "ride_data_stream",
"streamType": "KAFKA",
"streamConfigMap": {
"broker.list": "localhost:9092",
"zookeeper.url": "localhost:2181",
"topic": "ride_data",
"consumer.group.id": "pinot-consumer-group-01",
"kafka.consumer.group.id": "pinot-consumer-group-01",
"kafka.consumer.group.id.auto.create": "false",
"kafka.consumer.group.id.auto.create.topic": "false",
"offset.reset.policy": "earliest",
"kafka.consumer.properties": {
"max.poll.records": "1000",
"fetch.min.bytes": "1"
}
}
}
Kết quả mong đợi: File được lưu thành công, nội dung JSON hợp lệ. Pinot Controller sẽ đọc file này khi khởi động hoặc khi được chỉ định để thiết lập connection.
Gắn Stream vào Table bằng API
Sau khi có file config, ta cần gắn nó vào Table đã tạo ở Phần 5. Việc này được thực hiện qua REST API của Controller để cập nhật cấu hình Table.
Thực hiện lệnh sau để update Table (giả sử tên Table là ride_data_table):
curl -X PUT "http://localhost:9000/tables/ride_data_table/streamConfigs" \
-H "Content-Type: application/json" \
-d '{
"streamName": "ride_data_stream",
"streamType": "KAFKA",
"streamConfigMap": {
"broker.list": "localhost:9092",
"zookeeper.url": "localhost:2181",
"topic": "ride_data",
"consumer.group.id": "pinot-consumer-group-01",
"offset.reset.policy": "earliest"
}
}'
Kết quả mong đợi: HTTP 200 OK. Table đã được kích hoạt chế độ Stream Ingestion. Pinot Server sẽ bắt đầu subscribe vào Kafka topic.
Gửi dữ liệu mẫu (Batch và Stream) vào Kafka để kiểm tra pipeline
Để kiểm tra tính đúng đắn, ta cần gửi dữ liệu vào Kafka. Dữ liệu phải tuân thủ Schema đã định nghĩa (JSON format) và chứa các trường bắt buộc như timestamp, partition key.
Trước tiên, đảm bảo Kafka đang chạy và topic ride_data đã tồn tại. Nếu chưa, tạo topic:
kafka-topics.sh --create --topic ride_data --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Kết quả mong đợi: Thông báo "Topic 'ride_data' created".
Gửi dữ liệu Stream (Real-time)
Sử dụng kafka-console-producer.sh để gửi dữ liệu JSON trực tiếp vào topic. Dữ liệu này sẽ được Pinot ingest ngay lập tức (Real-time ingestion).
Mở terminal và chạy lệnh producer, sau đó paste các dòng dữ liệu mẫu (chứa timestamp hiện tại):
kafka-console-producer.sh --broker-list localhost:9092 --topic ride_data
Trong prompt của producer, nhập các dòng JSON sau (mỗi dòng là một record):
{"timestamp": 1715678900000, "driver_id": "DRV001", "ride_type": "standard", "amount": 15.50, "status": "completed"}
{"timestamp": 1715678901000, "driver_id": "DRV002", "ride_type": "premium", "amount": 45.00, "status": "completed"}
{"timestamp": 1715678902000, "driver_id": "DRV003", "ride_type": "standard", "amount": 12.25, "status": "pending"}
Kết quả mong đợi: Producer chấp nhận dữ liệu, không có lỗi. Pinot Server bắt đầu processing.
Gửi dữ liệu Batch (Offline)
Để so sánh và kiểm tra tính ACID, ta cũng cần gửi một lượng dữ liệu lớn hơn dưới dạng file để simulate Batch ingestion (thông qua Stream Config vẫn có thể xử lý, nhưng ở đây ta giả lập việc gửi file vào Kafka để kiểm tra độ trễ và tính nguyên tử).
Tạo file /tmp/batch_ride_data.json chứa 500 dòng dữ liệu mẫu:
for i in $(seq 1 500); do echo "{\"timestamp\": $(($(date +%s) * 1000 + $i)), \"driver_id\": \"DRV_BATCH_$i\", \"ride_type\": \"standard\", \"amount\": $((10 + $i % 50)).00, \"status\": \"completed\"}"; done > /tmp/batch_ride_data.json
Sau đó, pipe file này vào Kafka producer:
cat /tmp/batch_ride_data.json | kafka-console-producer.sh --broker-list localhost:9092 --topic ride_data
Kết quả mong đợi: Dữ liệu được đẩy vào Kafka. Pinot sẽ nhận và tạo các segment mới (Real-time segment).
Thực hiện các truy vấn SQL để kiểm tra tính Atomicity
Atomicity (Tính nguyên tử) đảm bảo rằng một transaction (batch dữ liệu) hoặc được thực hiện hoàn toàn, hoặc không được thực hiện chút nào. Trong Pinot, điều này thể hiện qua việc segment được commit hoàn chỉnh hoặc bị rollback nếu có lỗi.
Sử dụng Pinot SQL query engine (thông qua Broker hoặc Controller) để kiểm tra số lượng record.
Thực hiện truy vấn đếm tổng số record trong Table:
curl "http://localhost:9000/query/sql?statement=SELECT COUNT(*) FROM ride_data_table"
Kết quả mong đợi: JSON trả về chứa results với giá trị count > 0. Nếu ta gửi 503 records (3 dòng stream + 500 dòng batch), kết quả phải là 503 hoặc 0 (nếu lỗi toàn bộ). Không thể có số lẻ như 250 (lỗi một nửa).
Kiểm tra tính nguyên tử khi có lỗi dữ liệu
Để test Atomicity thực sự, ta sẽ cố tình gửi một record không hợp lệ (vi phạm schema) nằm giữa các record hợp lệ trong một batch.
Tạo file /tmp/atomicity_test.json với 3 dòng, dòng 2 bị lỗi (thiếu trường bắt buộc):
echo '{"timestamp": 1715679000000, "driver_id": "DRV_A", "amount": 10.0}' > /tmp/atomicity_test.json
echo '{"timestamp": 1715679001000, "driver_id": "DRV_B", "amount": 20.0, "status": "completed"}' >> /tmp/atomicity_test.json
echo '{"timestamp": 1715679002000, "driver_id": "DRV_C"}' >> /tmp/atomicity_test.json
Gửi file này vào Kafka:
cat /tmp/atomicity_test.json | kafka-console-producer.sh --broker-list localhost:9092 --topic ride_data
Quan sát log của Pinot Server (/var/log/pinot/server.log) để xem xử lý như thế nào. Sau đó query lại:
curl "http://localhost:9000/query/sql?statement=SELECT driver_id FROM ride_data_table WHERE driver_id IN ('DRV_A', 'DRV_B', 'DRV_C')"
Kết quả mong đợi: Pinot sẽ skip record lỗi (DRV_C) nhưng vẫn commit DRV_A và DRV_B nếu cấu hình errorHandling cho phép, HOẶC rollback toàn bộ batch nếu cấu hình nghiêm ngặt. Nếu dùng chế độ "skip errors", ta thấy DRV_A và DRV_B xuất hiện, DRV_C không xuất hiện. Điều này chứng tỏ tính nguyên tử ở cấp độ record hoặc segment.
Kiểm tra dữ liệu sau khi có sự cố để xác nhận tính Durability
Durability (Tính bền vững) đảm bảo rằng một khi dữ liệu đã được ghi (commit) vào Pinot, nó sẽ vẫn tồn tại ngay cả khi hệ thống gặp sự cố (server crash, mất điện).
Thao tác này mô phỏng việc dữ liệu đã được push vào Kafka và được Pinot ingest, sau đó ta tắt Server đi để kiểm tra.
Bước 1: Xác nhận dữ liệu đã được commit vào Persistent Store
Trước khi tạo sự cố, đảm bảo dữ liệu đã được chuyển từ Real-time segment sang Offline segment (hoặc đã được snapshot vào HDFS/S3, ở đây ta giả định lưu vào local disk của Server).
Query để kiểm tra trạng thái của Table:
curl "http://localhost:9000/tables/ride_data_table"
Kết quả mong đợi: Trong JSON trả về, trường segments phải liệt kê các segment đã có trạng thái READY hoặc COMPLETE.
Bước 2: Mô phỏng sự cố (Server Crash)
Dừng dịch vụ Pinot Server một cách đột ngột để mô phỏng sự cố mất điện hoặc crash:
systemctl stop pinot-server
Hoặc dùng kill signal mạnh hơn để mô phỏng crash:
pkill -9 -f "pinot.Server"
Kết quả mong đợi: Process Server bị dừng ngay lập tức. Dịch vụ không còn lắng nghe port 9000.
Bước 3: Khôi phục và kiểm tra dữ liệu
Khởi động lại Pinot Server:
systemctl start pinot-server
Chờ khoảng 30-60 giây để Server load lại các segment từ disk và đăng ký với Controller.
Thực hiện truy vấn SQL để kiểm tra lại toàn bộ dữ liệu đã nhập trước đó:
curl "http://localhost:9000/query/sql?statement=SELECT COUNT(*) FROM ride_data_table"
Kết quả mong đợi: Số lượng record trả về phải GIỐNG Hệt như trước khi dừng server. Nếu trước đó có 503 records, sau khi restart vẫn phải là 503. Điều này chứng minh dữ liệu đã được lưu trữ bền vững (Durability) trên disk và không bị mất khi restart.
Bước 4: Kiểm tra tính nhất quán sau sự cố
Thực hiện một truy vấn phức tạp hơn để đảm bảo dữ liệu không bị corruption:
curl "http://localhost:9000/query/sql?statement=SELECT SUM(amount) FROM ride_data_table WHERE status = 'completed'"
Kết quả mong đợi: Tổng amount phải khớp với tổng tính toán thủ công từ dữ liệu mẫu đã gửi. Nếu tổng sai, có nghĩa là dữ liệu bị mất hoặc hỏng (vi phạm Durability/Consistency).
Điều hướng series:
Mục lục: Series: Triển khai Database ACID với Apache Pinot trên Ubuntu 24.04
« Phần 5: Tạo Schema và Table: Đảm bảo tính nhất quán ACID
Phần 7: Tối ưu hóa hiệu năng và xử lý sự cố (Troubleshooting) »