1. Soạn thảo file cấu hình Connector PostgreSQL
Tạo file cấu hình JSON chứa các thông số kết nối và logic xử lý của Debezium cho PostgreSQL.
File này định nghĩa cách Kafka Connect kết nối với database, topic nào sẽ được tạo, và định dạng dữ liệu đầu ra (JSON hoặc Avro).
Đường dẫn file: /etc/kafka/debezium-postgres-connector.json
Đoạn code dưới đây cấu hình connector để bắt sự kiện thay đổi từ database demo_db của PostgreSQL, ghi vào topic dbserver1.public.inventory và sử dụng định dạng JSON.
cat > /etc/kafka/debezium-postgres-connector.json
Kết quả mong đợi: File được tạo thành công tại đường dẫn chỉ định. Không có lỗi cú pháp khi chạy lệnh cat. Các trường như database.password và slot.name đã được điền đúng theo cấu hình PostgreSQL ở phần 4.
2. Gửi request tạo Connector qua REST API
Sử dụng lệnh curl để gửi file cấu hình vừa tạo vào endpoint quản lý của Kafka Connect.
Kafka Connect hoạt động như một service nhận HTTP request để đăng ký và quản lý các connector. Chúng ta sẽ gửi POST request tới port 8083 (mặc định của Connect).
Thực thi lệnh dưới đây để kích hoạt connector. Đảm bảo dịch vụ Kafka Connect đang chạy và lắng nghe port 8083.
curl -i -X POST -H "Content-Type: application/json" --data @/etc/kafka/debezium-postgres-connector.json http://localhost:8083/connectors
Kết quả mong đợi: Nhận phản hồi HTTP 201 Created. Nội dung JSON trả về chứa thông tin name là "postgres-connector" và trạng thái state ban đầu có thể là CREATED hoặc RUNNING.
3. Kiểm tra trạng thái Connector
Query lại Kafka Connect để xác nhận connector đã chuyển sang trạng thái RUNNING và không có lỗi khởi động.
Trạng thái RUNNING nghĩa là connector đã kết nối thành công với PostgreSQL, tạo được replication slot (nếu cần) và sẵn sàng lắng nghe WAL log.
Thực hiện lệnh sau để lấy chi tiết trạng thái của connector "postgres-connector".
curl -s http://localhost:8083/connectors/postgres-connector/status | jq .
Kết quả mong đợi: Trong output JSON, trường state phải là RUNNING. Trường worker_id phải có giá trị (không null). Nếu thấy FAILED, cần kiểm tra phần trace hoặc log để tìm nguyên nhân.
4. Phân tích log khởi động và xử lý lỗi thường gặp
Kiểm tra log của Kafka Connect để xác nhận quá trình kết nối với PostgreSQL diễn ra suôn sẻ.
Log chứa thông tin chi tiết về việc handshake với database, việc tạo topic tự động, và việc bắt đầu lắng nghe dòng thay đổi dữ liệu (WAL).
Giả sử Kafka Connect đang chạy bằng systemd hoặc script shell, log thường nằm tại /var/log/kafka/connect.log hoặc stdout của container. Dưới đây là lệnh tail log thời gian thực.
tail -f /var/log/kafka/connect.log | grep -E "(postgres-connector|INFO|ERROR|WARNING)"
Kết quả mong đợi: Thấy dòng log chứa Connector 'postgres-connector' is starting, sau đó là Successfully opened database connection và Successfully created slot (nếu chưa tồn tại). Không xuất hiện dòng ERROR liên quan đến authentication hay replication.
Nếu gặp lỗi Authentication failed: Kiểm tra lại database.password trong file JSON và file pg_hba.conf của PostgreSQL.
Nếu gặp lỗi slot does not exist: Đảm bảo đã chạy lệnh SQL tạo slot trong phần 4, hoặc bật tùy chọn slot.autocreate.mode trong config nếu muốn tự động tạo.
5. Xác minh kết quả cuối cùng
Thực hiện các bước kiểm tra nhanh để đảm bảo connector đã sẵn sàng cho phần 6.
- Chạy lệnh
kafka-topics.sh --bootstrap-server localhost:9092 --list để kiểm tra xem topic dbserver1.public.inventory đã được tự động tạo chưa.
- Chạy lệnh
curl http://localhost:8083/connectors/postgres-connector/config để xác nhận cấu hình đã được lưu trữ đúng trong Kafka Connect.
- Quan sát log một lần nữa để đảm bảo không có cảnh báo heartbeat timeout hay mất kết nối.
Khi thấy topic đã xuất hiện trong danh sách và trạng thái connector là RUNNING ổn định trong 30 giây, việc cấu hình hoàn tất. Dữ liệu sẽ bắt đầu được đẩy vào topic ngay khi có sự thay đổi trên bảng inventory.
Điều hướng series:
Mục lục: Series: Triển khai Database Change Data Capture với Debezium và Kafka trên Ubuntu 24.04
« Phần 4: Cấu hình PostgreSQL để kích hoạt Change Data Capture
Phần 6: Kiểm tra luồng dữ liệu: Chèn, cập nhật và xóa »