Thiết lập Source từ Kafka với Schema Registry
Chúng ta sẽ tạo một Source để Materialize tự động đọc dữ liệu từ Kafka topic và parse JSON schema từ Confluent Schema Registry.
Bước này thay thế việc phải viết logic parse thủ công, giúp Source hiểu cấu trúc dữ liệu ngay lập tức.
Trước tiên, hãy mở terminal và kết nối vào shell của container Materialize đã chạy sẵn.
docker exec -it materialize zsh
Kết quả mong đợi: Bạn thấy prompt lệnh của Materialize là materialize> hoặc tương tự.
Định nghĩa Source với Avro Schema
Chúng ta sẽ tạo Source từ topic user-events đã được chuẩn bị ở phần trước, sử dụng định dạng Avro và kết nối tới Schema Registry.
Lệnh này khai báo Materialize cần lấy dữ liệu từ Kafka, biết vị trí của Schema Registry để tự động map các field về kiểu dữ liệu SQL tương ứng.
CREATE SOURCE user_events_source
FROM KAFKA BROKER 'kafka:9092'
TOPIC 'user-events'
FORMAT AVRO USING SCHEMA REGISTRY 'schema-registry:8081';
Kết quả mong đợi: Materialize trả về CREATE SOURCE và không báo lỗi kết nối.
Verify Source đã hoạt động
Kiểm tra xem Source đã được tạo thành công và hiển thị schema tự động sinh ra như thế nào.
SHOW CREATE SOURCE user_events_source;
Kết quả mong đợi: Hiển thị toàn bộ cú pháp tạo Source cùng với định nghĩa các column (id, name, timestamp, v.v.) đã được Materialize tự động suy ra từ Schema Registry.
Xây dựng Sink để xuất dữ liệu biến đổi
Bây giờ chúng ta sẽ tạo Sink để đẩy dữ liệu đã được Materialize xử lý ra ngoài, có thể là về Kafka hoặc PostgreSQL.
Sink hoạt động như một "cửa ra", cho phép bạn gửi dữ liệu stream-time vào các hệ thống downstream.
Tạo Sink xuất ra Kafka với định dạng JSON
Chúng ta sẽ tạo một Sink đẩy dữ liệu từ Source user_events_source vào một topic Kafka mới tên là user-events-processed.
Materialize sẽ tự động serialize lại dữ liệu SQL sang định dạng JSON để Kafka có thể lưu trữ.
CREATE SINK user_events_sink
FROM user_events_source
INTO KAFKA BROKER 'kafka:9092'
TOPIC 'user-events-processed'
FORMAT JSON;
Kết quả mong đợi: Materialize trả về CREATE SINK. Dữ liệu sẽ bắt đầu được đẩy vào topic Kafka mới ngay lập tức nếu có dữ liệu mới vào Source.
Tạo Sink xuất ra PostgreSQL
Trong trường hợp cần lưu trữ dữ liệu vào hệ thống lưu trữ truy vấn (OLAP/OLTP) như PostgreSQL, chúng ta sẽ tạo Sink hướng tới database này.
Lưu ý: PostgreSQL phải đã được chạy và cấu hình ở Phần 2. Ở đây chúng ta giả định PostgreSQL đang chạy và lắng nghe ở host postgres (tên service trong docker-compose).
CREATE SINK user_events_pg
FROM user_events_source
INTO CONNECTION postgres_conn
TABLE user_events_table;
CREATE CONNECTION postgres_conn
TO POSTGRES (
HOST 'postgres',
PORT 5432,
USER 'materialize',
PASSWORD 'materialize',
DATABASE 'materialize'
);
Kết quả mong đợi: Materialize tạo connection object và sink. Dữ liệu từ Kafka sẽ được lưu vào bảng user_events_table trong PostgreSQL dưới dạng snapshot hoặc append tùy thuộc vào cấu hình.
Verify Sink hoạt động
Kiểm tra xem Sink có đang kết nối và đẩy dữ liệu bằng cách truy vấn trực tiếp vào đích đến.
Đối với Sink Kafka, sử dụng công cụ kafka-console-consumer để xem dữ liệu realtime.
kafka-console-consumer --bootstrap-server kafka:9092 --topic user-events-processed --from-beginning
Kết quả mong đợi: Bạn thấy dòng dữ liệu JSON được in ra màn hình tương ứng với dữ liệu đã đẩy vào Source.
Đối với Sink PostgreSQL, truy vấn trực tiếp vào bảng đích.
psql -h postgres -U materialize -d materialize -c "SELECT * FROM user_events_table LIMIT 5;"
Kết quả mong đợi: Bảng dữ liệu được hiển thị với các cột tương ứng với Source, xác nhận dữ liệu đã được lưu thành công vào PostgreSQL.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream-native với Materialize trên Ubuntu 24.04
« Phần 3: Cấu hình nguồn dữ liệu Kafka và Confluent Schema Registry
Phần 5: Chuyển đổi dữ liệu thời gian thực với View và Materialized View »