1. Hiểu và áp dụng các chế độ Envelope trong Materialize
Envelope xác định cách Materialize đóng gói dữ liệu khi nó đi từ Source vào hệ thống hoặc từ Sink ra ngoài. Việc chọn sai Envelope sẽ dẫn đến mất dữ liệu hoặc tạo ra bản ghi trùng lặp không thể phục hồi.
Materialize hỗ trợ ba chế độ chính: DEBEZIUM (đầy đủ nhất cho CDC), UPSERT (tối ưu cho dữ liệu có khóa chính), và NONE (dữ liệu thô, không có trạng thái).
1.1. Chế độ DEBEZIUM (Change Data Capture)
Chế độ này mô phỏng định dạng của Debezium. Mỗi dòng dữ liệu chứa một envelope JSON bao gồm các trường before (trạng thái cũ), after (trạng thái mới) và op (operation: c, u, d, r).
Đây là chuẩn vàng khi làm việc với Kafka Connect (Debezium) để xử lý Insert, Update, Delete.
Để tạo một Source sử dụng envelope Debezium, bạn khai báo ENVELOPE DEBEZIUM trong câu lệnh CREATE SOURCE:
CREATE SOURCE my_db_source
FROM KAFKA BROKER 'kafka:9092' TOPIC 'orders-changes'
KEY FORMAT BYTES
VALUE FORMAT JSON ENVELOPE DEBEZIUM;
Kết quả mong đợi: Materialize sẽ tự động phân tích JSON, tách các trường before và after thành hai cột riêng biệt trong table metadata, và tạo cột _meta chứa thông tin operation.
1.2. Chế độ UPSERT
Chế độ này dùng cho dữ liệu dạng Key-Value. Nó yêu cầu định dạng giá trị phải là JSON chứa hai trường: key và value.
Với UPSERT, Materialize hiểu rằng mỗi record là một trạng thái cập nhật hoàn chỉnh. Nếu giá trị là NULL, nó coi đó là lệnh Delete.
CREATE SOURCE my_upsert_source
FROM KAFKA BROKER 'kafka:9092' TOPIC 'user-snapshots'
KEY FORMAT BYTES
VALUE FORMAT JSON ENVELOPE UPSERT;
Kết quả mong đợi: Source này chỉ hiển thị trạng thái hiện tại của dữ liệu. Các bản ghi cũ bị ẩn, chỉ còn lại bản ghi mới nhất cho mỗi khóa chính.
1.3. Chế độ NONE
Dùng khi dữ liệu chỉ là dòng chảy sự kiện đơn thuần (Append-only), không có khái niệm Update hay Delete.
CREATE SOURCE raw_events
FROM KAFKA BROKER 'kafka:9092' TOPIC 'clickstream'
KEY FORMAT BYTES
VALUE FORMAT JSON;
Kết quả mong đợi: Dữ liệu được lưu y nguyên như dòng log. Không có cơ chế deduplicate tự động dựa trên khóa chính.
2. Xử lý sự kiện Insert, Update và Delete
Khi dữ liệu đi qua chế độ DEBEZIUM, bạn cần biết cách truy xuất và xử lý từng loại sự kiện trong SQL để xây dựng logic nghiệp vụ chính xác.
2.1. Phân tích cấu trúc dữ liệu DEBEZIUM
Khi tạo Source với ENVELOPE DEBEZIUM, Materialize tự động tạo các cột đặc biệt. Bạn không cần parse JSON thủ công.
Cấu trúc dữ liệu trả về sẽ bao gồm:
- Cột dữ liệu gốc: Các trường nằm trong
after (ví dụ: id, name, amount).
- Cột
before: Chứa giá trị cũ của các trường (NULL nếu là Insert).
- Cột
op: Chứa ký tự đại diện cho operation: 'c' (create), 'u' (update), 'd' (delete), 'r' (read).
Để xem cấu trúc này, thực thi lệnh DESCRIBE trên source vừa tạo:
DESCRIBE my_db_source;
Kết quả mong đợi: Danh sách các cột sẽ hiển thị rõ các trường business data, cùng với các cột hệ thống before, after, op.
2.2. Lọc sự kiện Insert (Create)
Để chỉ lấy dữ liệu mới được thêm vào (ví dụ: đếm đơn hàng mới), bạn lọc theo cột op.
SELECT id, name, amount
FROM my_db_source
WHERE op = 'c';
Kết quả mong đợi: Chỉ trả về các bản ghi mới được tạo, không bao gồm các bản ghi đã được cập nhật hoặc xóa.
2.3. Xử lý sự kiện Update
Khi có sự kiện Update, cột before sẽ chứa giá trị cũ, after chứa giá trị mới.
Bạn có thể so sánh để tìm ra sự thay đổi cụ thể (ví dụ: giá đơn hàng thay đổi bao nhiêu):
SELECT id,
before.amount as old_amount,
after.amount as new_amount,
(after.amount - before.amount) as difference
FROM my_db_source
WHERE op = 'u';
Kết quả mong đợi: Một bảng chỉ chứa các dòng đã bị cập nhật, kèm theo sự chênh lệch giá trị giữa trạng thái cũ và mới.
2.4. Xử lý sự kiện Delete
Khi có sự kiện Delete, cột before chứa giá trị cuối cùng trước khi xóa, còn after là NULL.
Để tạo log xóa hoặc xóa khỏi bảng đích, lọc theo op = 'd':
SELECT id, before.name as deleted_name
FROM my_db_source
WHERE op = 'd';
Kết quả mong đợi: Trả về danh sách các bản ghi đã bị xóa khỏi nguồn gốc, kèm theo thông tin giá trị của chúng trước khi mất.
3. Cấu hình Source để tự động nhận diện thay đổi (CDC)
Để Materialize tự động nhận diện thay đổi, bạn phải cấu hình Source kết nối với Kafka Topic mà Kafka Connect (Debezium) đã xuất ra. Materialize không tự động "lắng nghe" DB, nó lắng nghe Kafka.
Bước này giả định bạn đã có Kafka và Debezium chạy (như Phần 3). Chúng ta sẽ cấu hình Materialize để hiểu đúng định dạng dữ liệu từ Debezium.
3.1. Khai báo Source với CDC metadata
Debezium xuất dữ liệu với định dạng JSON đặc biệt. Materialize cần biết điều này để tách khóa và giá trị.
Cấu hình chuẩn cho Source CDC:
- KEY FORMAT: Thường là
BYTES (khi Debezium dùng key là byte) hoặc JSON nếu key là JSON.
- VALUE FORMAT: Phải là
JSON ENVELOPE DEBEZIUM.
- TOPIC: Tên topic mà Debezium đã publish (ví dụ:
server1.inventory.customers).
Thực thi lệnh tạo Source:
CREATE SOURCE orders_cdc
FROM KAFKA BROKER 'kafka:9092' TOPIC 'orders-changes'
KEY FORMAT BYTES
VALUE FORMAT JSON ENVELOPE DEBEZIUM
WITH (
'scan.startup.mode' = 'earliest',
'scan.timestamp' = '2023-10-01T00:00:00Z'
);
Giải thích tham số: 'scan.startup.mode' = 'earliest' giúp Materialize đọc lại toàn bộ lịch sử từ đầu topic (nếu cần), còn 'scan.timestamp' giúp bắt đầu từ một thời điểm cụ thể.
Kết quả mong đợi: Source orders_cdc được tạo thành công và bắt đầu nhận dữ liệu. Dữ liệu trong source này sẽ tự động có các cột before, after, op.
3.2. Tạo Materialized View để tổng hợp trạng thái
Mục đích của CDC là duy trì một "bản sao" của bảng gốc. Để làm điều này, bạn tạo một Materialized View áp dụng logic Upsert từ các sự kiện CDC.
Sử dụng cú pháp UPSERT trong SQL của Materialize để tự động gộp các sự kiện Insert/Update/Delete:
CREATE MATERIALIZED VIEW orders_state
AS
SELECT id, name, amount
FROM orders_cdc
WHERE op != 'd';
Lưu ý: Với Source có envelope DEBEZIUM, Materialize tự động thực hiện logic "latest state" nếu bạn chọn tất cả các cột dữ liệu. Tuy nhiên, để rõ ràng, bạn có thể lọc bỏ sự kiện delete nếu muốn giữ log, hoặc dùng logic phức tạp hơn.
Tuy nhiên, cách chuẩn nhất để tạo "bảng trạng thái" từ CDC là sử dụng hàm UPSERT ngầm định hoặc khai báo rõ ràng. Materialize tự động xử lý deduplicate dựa trên Primary Key nếu bạn chỉ chọn các cột dữ liệu từ source DEBEZIUM.
Kiểm tra lại bằng cách chọn toàn bộ view:
SELECT * FROM orders_state;
Kết quả mong đợi: Bạn sẽ thấy một bảng dữ liệu sạch, chỉ chứa trạng thái hiện tại của các đơn hàng. Nếu bạn xóa một đơn hàng trong DB gốc, nó sẽ biến mất khỏi view này. Nếu bạn update, giá trị sẽ thay đổi.
4. Verify kết quả và kiểm soát dữ liệu
Sau khi cấu hình, bạn cần kiểm tra xem Envelope và CDC có hoạt động đúng như mong đợi không.
4.1. Kiểm tra sự kiện Update/Delete
Thực hiện một thao tác trên nguồn dữ liệu gốc (PostgreSQL), sau đó quan sát Source Materialize.
1. Update một bản ghi trong DB gốc.
2. Chạy lệnh sau để xem sự kiện update đã được nhận chưa:
SELECT id, op, before.amount, after.amount
FROM orders_cdc
WHERE id = 123
ORDER BY emitted_at DESC
LIMIT 5;
Kết quả mong đợi: Dòng đầu tiên phải có op = 'u', before.amount là giá trị cũ, after.amount là giá trị mới.
4.2. Kiểm tra trạng thái cuối cùng (Final State)
Đảm bảo Materialized View phản ánh đúng trạng thái hiện tại.
SELECT id, name, amount FROM orders_state WHERE id = 123;
Kết quả mong đợi: Giá trị trả về phải khớp 100% với giá trị hiện tại trong PostgreSQL.
4.3. Kiểm tra sự kiện Delete
Xóa bản ghi id = 123 trong PostgreSQL.
Quan sát Source:
SELECT id, op, before.name FROM orders_cdc WHERE id = 123 ORDER BY emitted_at DESC LIMIT 1;
Kết quả mong đợi: Dòng cuối cùng có op = 'd' và before.name chứa tên của đơn hàng đã xóa.
Quan sát View:
SELECT * FROM orders_state WHERE id = 123;
Kết quả mong đợi: Không trả về dòng nào (bản ghi đã biến mất khỏi view).
Điều hướng series:
Mục lục: Series: Triển khai Database Stream-native với Materialize trên Ubuntu 24.04
« Phần 5: Chuyển đổi dữ liệu thời gian thực với View và Materialized View
Phần 7: Giám sát hiệu năng và phân tích lỗi với CLI và Dashboard »