Chuẩn bị môi trường kiểm thử PostgreSQL
Mở terminal để kết nối trực tiếp với instance PostgreSQL đã cấu hình trong các phần trước. Bạn cần truy cập vào database đích để thực hiện các thao tác tạo bảng và dữ liệu mẫu.
Mục đích: Đảm bảo kết nối ổn định và có sẵn bảng để ghi dữ liệu. Nếu chưa có bảng, bước này sẽ tạo bảng mẫu với các trường dữ liệu cơ bản.
Kết quả mong đợi: Bạn thấy prompt lệnh SQL (postgres=#) và bảng users được tạo thành công.
psql -U postgres -d debezium_db -c "DROP TABLE IF EXISTS users; CREATE TABLE users (id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, email VARCHAR(100), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP); INSERT INTO users (name, email) VALUES ('Nguyen Van A', 'a@example.com'), ('Tran Thi B', 'b@example.com');"
Chạy lệnh này xong, bạn đã có 2 bản ghi ban đầu trong bảng. Debezium sẽ bắt đầu capture các sự kiện xảy ra từ thời điểm này trở đi.
Thực hiện thao tác INSERT và quan sát luồng dữ liệu
Thực hiện lệnh chèn một bản ghi mới vào bảng users. Hành động này sẽ kích hoạt cơ chế Write-Ahead Log (WAL) của PostgreSQL, từ đó Debezium Connector sẽ đọc và đẩy sự kiện vào Kafka Topic.
Mục đích: Xác minh rằng pipeline đã hoạt động và sự kiện "Create" được truyền tải chính xác từ nguồn đến đích.
Kết quả mong đợi: Một bản ghi mới được thêm vào PostgreSQL và một message mới xuất hiện trong Kafka Topic tương ứng.
psql -U postgres -d debezium_db -c "INSERT INTO users (name, email) VALUES ('Le Van C', 'c@example.com');"
Sau khi thực hiện lệnh INSERT, hãy mở một terminal khác để chạy Kafka Console Consumer. Bạn cần chỉ định đúng tên topic mà Connector đã đăng ký (thường là server1.debezium_db.public.users hoặc tương tự tùy cấu hình).
Mục đích: Đọc trực tiếp payload JSON từ Kafka để kiểm tra cấu trúc sự kiện, đặc biệt là trường op (operation) và before/after.
Kết quả mong đợi: Console hiển thị một dòng JSON với "op": "c" (create), trường "before" là null, và trường "after" chứa dữ liệu đầy đủ của bản ghi mới.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic server1.debezium_db.public.users --from-beginning --property print.key=true --property key.separator=" | "
Chú ý: Nếu bạn chạy consumer ngay sau khi INSERT, bạn sẽ thấy message hiện ra. Nếu không thấy, hãy chạy lại lệnh INSERT rồi xem lại consumer. Trường op là "c" xác nhận đây là sự kiện chèn mới.
Thực hiện thao tác UPDATE và kiểm tra payload
Chạy lệnh UPDATE để thay đổi một trường dữ liệu của bản ghi đã tồn tại. Ví dụ: cập nhật email của người dùng có ID = 1.
Mục đích: Kiểm tra khả năng ghi nhận sự kiện thay đổi của Debezium. Đây là trường hợp quan trọng nhất trong Change Data Capture (CDC) vì nó chứa cả trạng thái cũ và trạng thái mới.
Kết quả mong đợi: PostgreSQL cập nhật dữ liệu thành công và Kafka nhận được một sự kiện với op là "u".
psql -U postgres -d debezium_db -c "UPDATE users SET email = 'updated_a@example.com' WHERE id = 1;"
Quay lại terminal đang chạy Kafka Console Consumer. Quan sát message mới xuất hiện tương ứng với lệnh UPDATE vừa thực hiện.
Mục đích: Phân tích cấu trúc JSON để hiểu cách Debezium lưu trữ lịch sử thay đổi. Bạn sẽ thấy sự khác biệt rõ rệt so với sự kiện INSERT.
Kết quả mong đợi: JSON message có "op": "u" (update). Quan trọng nhất, nó chứa cả hai trường: "before" (dữ liệu cũ trước khi update) và "after" (dữ liệu mới sau khi update).
# Không cần chạy lệnh mới, chỉ cần quan sát output của lệnh consumer đang chạy ở phần trên
Khi inspect kỹ payload, bạn sẽ thấy trong "before" có "email": "a@example.com" và trong "after" có "email": "updated_a@example.com". Điều này chứng tỏ Debezium đang ghi nhận đầy đủ sự thay đổi.
Thực hiện thao tác DELETE và xác nhận Tombstone
Thực hiện lệnh DELETE để xóa một bản ghi khỏi bảng PostgreSQL. Ví dụ: xóa người dùng có ID = 2.
Mục đích: Xác minh cơ chế "Tombstone" của Kafka. Khi một record bị xóa trong nguồn, Kafka cần biết để xóa record đó khỏi các partition của topic đích.
Kết quả mong đợi: Bản ghi bị xóa khỏi PostgreSQL và một sự kiện đặc biệt xuất hiện trong Kafka.
psql -U postgres -d debezium_db -c "DELETE FROM users WHERE id = 2;"
Quan sát output của Kafka Console Consumer. Sự kiện cho thao tác DELETE sẽ có cấu trúc rất đặc biệt, khác biệt hoàn toàn so với INSERT và UPDATE.
Mục đích: Hiểu về cơ chế Tombstone (gravestone) trong Kafka. Tombstone là một message có Key giống với record cần xóa nhưng Value (payload) là null.
Kết quả mong đợi: Console hiển thị message với "op": "d" (delete). Trường "before" chứa dữ liệu của record đã xóa, nhưng trường "after" là null.
# Quan sát output từ console consumer đang chạy
Trong JSON bạn sẽ thấy "before": { "id": 2, ... } và "after": null. Khi Kafka Consumer (hoặc các stream processing engine như Flink, Spark) đọc được message này, chúng sẽ biết phải xóa key tương ứng khỏi state store hoặc sink đích.
Tổng hợp và kiểm chứng luồng dữ liệu
Để đảm bảo toàn bộ pipeline hoạt động chính xác, hãy thực hiện một vòng kiểm thử nhanh bằng cách tạo một script nhỏ hoặc chạy thủ công các lệnh trên với các giá trị khác nhau.
Mục đích: Xác nhận tính nhất quán của dữ liệu giữa PostgreSQL và Kafka Topic. Đảm bảo không có sự kiện bị mất (lost) hoặc bị sai thứ tự (out of order).
Kết quả mong đợi: Mỗi thao tác SQL (INSERT, UPDATE, DELETE) đều sinh ra đúng một message trong Kafka với cấu trúc payload chuẩn xác.
psql -U postgres -d debezium_db -c "INSERT INTO users (name, email) VALUES ('Test User', 'test@test.com'); UPDATE users SET name = 'Modified Test' WHERE id = 4; DELETE FROM users WHERE id = 4;"
Quay lại console consumer và kiểm tra xem bạn có thấy đúng 3 sự kiện theo thứ tự: Create (op=c) -> Update (op=u) -> Delete (op=d). Việc kiểm tra này là bước cuối cùng để xác nhận phần 6 đã hoàn tất thành công trước khi chuyển sang tích hợp Schema Registry.
Nếu bạn thấy đầy đủ 3 sự kiện với cấu trúc before/after chính xác, thì luồng dữ liệu CDC của bạn đã hoạt động hoàn hảo.
Điều hướng series:
Mục lục: Series: Triển khai Database Change Data Capture với Debezium và Kafka trên Ubuntu 24.04
« Phần 5: Cấu hình và khởi động Debezium Connector
Phần 7: Tích hợp Schema Registry và xử lý định dạng Avro/JSON »