Chuẩn bị môi trường Kafka để làm Source và Sink
Để xây dựng luồng dữ liệu thời gian thực, chúng ta cần một hệ thống message queue để đóng vai trò là nguồn dữ liệu (Source) và đích dữ liệu (Sink). Trong phần này, chúng ta sẽ khởi động Kafka và Zookeeper qua Docker Compose bên cạnh RisingWave đã chạy.
Cấu hình Docker Compose mở rộng
Bạn cần sửa file docker-compose.yml trong thư mục dự án để thêm service Kafka và Zookeeper. Đảm bảo các service này chia sẻ network với RisingWave để có thể kết nối qua hostname.
Thay đổi file: ~/risingwave/docker-compose.yml
version: '3.8'
services:
# Giữ nguyên các service RisingWave cũ (Frontend, Compute, Compaction, Meta, Storage)
# ... (copy từ phần 2) ...
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: risingwave-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- risingwave-net
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: risingwave-kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
networks:
- risingwave-net
networks:
risingwave-net:
name: risingwave-net
driver: bridge
Kết quả mong đợi: File cấu hình đã chứa thêm 2 service mới. Các service Kafka và Zookeeper sẽ chạy trong cùng mạng nội bộ với RisingWave.
Khởi động Kafka và tạo Topic
Khởi động lại toàn bộ stack để đưa Kafka vào hoạt động, sau đó tạo các topic cần thiết cho luồng dữ liệu.
cd ~/risingwave && docker compose up -d zookeeper kafka
Kết quả mong đợi: 2 container mới được tạo và trạng thái là "healthy".
Tạo topic source để nhập dữ liệu và topic sink để xuất kết quả.
docker exec -it risingwave-kafka kafka-topics --create --topic raw_events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
docker exec -it risingwave-kafka kafka-topics --create --topic processed_events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
Kết quả mong đợi: Không có lỗi báo về, terminal trả về thông báo topic đã được tạo thành công.
Xây dựng Source từ Kafka
Bây giờ chúng ta sẽ định nghĩa một bảng ảo trong RisingWave để đọc dữ liệu trực tiếp từ Kafka topic "raw_events". Đây là bước tạo "Source".
Kết nối vào RisingWave
Sử dụng psql để kết nối vào RisingWave đang chạy.
docker exec -it risingwave-frontend psql -U root -d dev
Kết quả mong đợi: Giao diện dòng lệnh SQL của RisingWave hiện ra với prompt dev=#.
Tạo bảng Source với Kafka
Chúng ta sẽ tạo một bảng raw_events để nhận JSON từ Kafka. RisingWave sẽ tự động phân tích cú pháp JSON và ánh xạ các field thành cột.
Yêu cầu: Dữ liệu gửi vào Kafka phải là chuỗi JSON. Cấu hình Source cần chỉ rõ address của Kafka broker (sử dụng hostname trong Docker network là "kafka").
CREATE TABLE raw_events (
event_id BIGINT,
user_id BIGINT,
event_type VARCHAR,
timestamp TIMESTAMP,
payload JSONB
)
WITH (
connector = 'kafka',
topic = 'raw_events',
properties.bootstrap.server = 'kafka:9092',
properties.group.id = 'risingwave-raw-group',
scan.startup.mode = 'earliest'
);
Kết quả mong đợi: RisingWave trả về CREATE TABLE. Bạn có thể kiểm tra bằng lệnh \d raw_events để xem cấu trúc bảng.
Verify Source hoạt động
Gửi một mẫu dữ liệu vào Kafka và kiểm tra xem RisingWave có nhận được không.
echo '{"event_id": 101, "user_id": 50, "event_type": "login", "timestamp": "2024-05-20 10:00:00", "payload": {"ip": "192.168.1.1"}}' | docker exec -i risingwave-kafka kafka-console-producer --broker-list kafka:9092 --topic raw_events
Kiểm tra dữ liệu trong RisingWave:
SELECT * FROM raw_events;
Kết quả mong đợi: Lệnh SELECT trả về đúng 1 dòng dữ liệu bạn vừa gửi. RisingWave đã tự động parse JSON và điền vào các cột tương ứng.
Xây dựng Sink để đẩy dữ liệu ra Kafka
Sau khi đã có dữ liệu vào (Source), chúng ta cần tạo một bảng đích (Sink) để đẩy kết quả xử lý ra ngoài. Ở đây chúng ta sẽ đẩy sang Kafka topic "processed_events".
Tạo View để xử lý dữ liệu (Intermediate Step)
Trước khi tạo Sink, chúng ta cần định nghĩa logic xử lý. Chúng ta sẽ tạo một View để lọc và biến đổi dữ liệu từ raw_events.
CREATE VIEW active_logins AS
SELECT
event_id,
user_id,
event_type,
timestamp,
payload->>'ip' as user_ip
FROM raw_events
WHERE event_type = 'login';
Kết quả mong đợi: View active_logins được tạo. Dữ liệu trong view này sẽ luôn là dữ liệu realtime từ source.
Tạo Sink từ View
Định nghĩa Sink để đẩy nội dung của View active_logins ra Kafka. RisingWave sẽ tự động serialize dữ liệu thành JSON khi đẩy ra Kafka.
CREATE SINK login_sink
FROM active_logins
WITH (
connector = 'kafka',
topic = 'processed_events',
properties.bootstrap.server = 'kafka:9092',
format = 'json'
);
Kết quả mong đợi: RisingWave trả về CREATE SINK. Luồng dữ liệu đã được thiết lập: Kafka -> Source -> View (Logic) -> Sink -> Kafka.
Verify Sink hoạt động
Gửi thêm dữ liệu login mới vào source và đọc trực tiếp từ topic sink để xác nhận.
echo '{"event_id": 102, "user_id": 55, "event_type": "login", "timestamp": "2024-05-20 10:05:00", "payload": {"ip": "192.168.1.2"}}' | docker exec -i risingwave-kafka kafka-console-producer --broker-list kafka:9092 --topic raw_events
Đọc dữ liệu từ topic đích (processed_events):
docker exec -it risingwave-kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic processed_events --from-beginning --max-messages 2
Kết quả mong đợi: Terminal hiển thị JSON đã được xử lý (chứa trường user_ip được trích xuất từ payload). Lưu ý: Dữ liệu có thể xuất hiện dưới dạng chuỗi JSON đơn giản hoặc tùy thuộc vào cấu hình format.
Ví dụ nâng cao: Streaming UDF và Aggregation
Chúng ta sẽ mở rộng luồng dữ liệu bằng cách sử dụng hàm SQL để tính toán tổng số lần login theo từng user trong một khoảng thời gian (window aggregation) và đẩy kết quả này ra S3 hoặc Kafka.
Tạo Sink với Aggregation và Window
Thay vì đẩy từng dòng, chúng ta sẽ tổng hợp (COUNT) số lần login của mỗi user trong cửa sổ 5 phút và đẩy kết quả vào topic "aggregated_stats".
CREATE SINK login_stats_sink
AS
SELECT
user_id,
TUMBLE_END(event_time) AS window_end,
COUNT(*) AS login_count
FROM raw_events
WHERE event_type = 'login'
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '5 minute')
WITH (
connector = 'kafka',
topic = 'aggregated_stats',
properties.bootstrap.server = 'kafka:9092',
format = 'json'
);
Kết quả mong đợi: RisingWave tạo sink mới. Luồng này sẽ tự động gom nhóm dữ liệu theo thời gian thực. Khi 5 phút trôi qua (hoặc khi dữ liệu mới đến đủ để đóng window), kết quả sẽ được đẩy ra Kafka.
Verify kết quả Aggregation
Gửi nhiều sự kiện login liên tiếp cho cùng một user để kích hoạt window aggregation.
for i in {1..3}; do echo "{\"event_id\": $i, \"user_id\": 99, \"event_type\": \"login\", \"timestamp\": \"2024-05-20 11:00:00\", \"payload\": {}}"; sleep 1; done | docker exec -i risingwave-kafka kafka-console-producer --broker-list kafka:9092 --topic raw_events
Đọc kết quả từ topic aggregated_stats (có thể cần đợi một chút để RisingWave tính toán window):
docker exec -it risingwave-kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic aggregated_stats --from-beginning
Kết quả mong đợi: Xuất hiện một dòng JSON chứa user_id: 99 và login_count: 3 (hoặc số lượng sự kiện đã gửi).
Quản lý và Debug các luồng dữ liệu
Trong quá trình vận hành, bạn cần biết cách kiểm tra trạng thái của Source và Sink, cũng như cách xử lý khi luồng bị lỗi.
Liệt kê các Materialized View và Sink
Sử dụng lệnh SQL để xem danh sách các bảng vật lý (Materialized Table) và Sink đang chạy.
\d+
Kết quả mong đợi: Danh sách các bảng, trong đó có raw_events (Source), active_logins (View), và các Sink đã tạo.
Reset dữ liệu và trạng thái Source/Sink
Nếu bạn muốn xóa dữ liệu đang tích lũy trong Source hoặc Sink mà không xóa bảng, bạn có thể sử dụng lệnh DROP và CREATE lại, hoặc sử dụng tính năng reset offset (tùy thuộc vào connector). Cách đơn nhất nhất là xóa Sink và tạo lại để reset logic.
DROP SINK login_stats_sink;
Kết quả mong đợi: Sink bị xóa, luồng đẩy dữ liệu dừng lại. Bạn có thể chạy lại lệnh CREATE SINK để thiết lập lại.
Monitor trạng thái qua Dashboard (RisingWave Dashboard)
Mặc dù không yêu cầu cài đặt thêm, nhưng bạn có thể truy cập RisingWave Dashboard để xem trực quan các luồng dữ liệu (Dataflow).
curl -X GET http://localhost:4567/dashboard
Kết quả mong đợi: Mở trình duyệt web, bạn sẽ thấy giao diện quản trị. Vào tab "Pipelines" hoặc "Dataflow" để xem sơ đồ kết nối giữa Source, Compute Node và Sink. Màu xanh lá cây nghĩa là healthy, màu đỏ nghĩa là lỗi.
Điều hướng series:
Mục lục: Series: Triển khai Database Serverless với RisingWave trên Ubuntu 24.04
« Phần 4: Kết nối client và thực thi các truy vấn cơ bản
Phần 6: Tối ưu hiệu năng và khắc phục sự cố thường gặp »