Triển khai Confluent Schema Registry
Schema Registry đóng vai trò là kho lưu trữ tập trung các định nghĩa schema, giúp đảm bảo tính tương thích và hiệu quả khi truyền tải dữ liệu giữa các dịch vụ trong hệ sinh thái Kafka.
Chúng ta sẽ sử dụng Docker để chạy Schema Registry nhằm đơn giản hóa quy trình cài đặt, đồng thời đảm bảo môi trường chạy độc lập với các thành phần khác.
Thực thi lệnh sau để kéo image và khởi động container Schema Registry, lắng nghe cổng 8081:
docker run -d --name schema-registry --rm -p 8081:8081 -e SCHEMA_REGISTRY_HOST_NAME=localhost -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092 confluentinc/cp-schema-registry:7.5.0
Kết quả mong đợi: Container được tạo thành công và trạng thái là "Running". Bạn có thể kiểm tra qua lệnh docker ps.
Để xác nhận Schema Registry đã sẵn sàng, truy cập vào endpoint health check:
curl http://localhost:8081/health
Kết quả mong đợi: Trả về JSON chứa trạng thái "status":"UP", xác nhận dịch vụ đang hoạt động bình thường.
Cấu hình Debezium để xuất dữ liệu dạng Avro
Để Debezium có thể tự động đăng ký schema và xuất dữ liệu dưới định dạng Avro, chúng ta cần điều chỉnh cấu hình của Kafka Connect Worker và Connector.
Cập nhật cấu hình Kafka Connect Worker
Trước tiên, cần khai báo thông tin kết nối đến Schema Registry trong file cấu hình worker. Điều này cho phép worker tự động tạo schema mới khi phát hiện dữ liệu chưa có định nghĩa.
Mở file cấu hình worker (thường là /etc/kafka/connect-distributed.properties hoặc file bạn đang dùng trong Docker) và thêm các dòng sau vào cuối file:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Kết quả mong đợi: File cấu hình được lưu. Lưu ý: Nếu đang chạy Docker, bạn cần restart container Kafka Connect để áp dụng thay đổi này bằng lệnh docker restart kafka-connect.
Cấu hình Connector PostgreSQL
Bây giờ, cập nhật payload khi tạo hoặc cập nhật connector PostgreSQL để sử dụng Avro. Điểm quan trọng là thiết lập value.converter.class và key.converter.class thành AvroConverter.
Thực thi lệnh sau để đăng ký connector với cấu hình Avro:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
--data '{
"name": "postgres-avro-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "postgres-server",
"database.include.list": "postgres",
"table.include.list": "public.users",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot_avro",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": "true"
}
}'
Kết quả mong đợi: API trả về mã 201 Created, xác nhận connector đã được tạo. Kiểm tra trạng thái qua curl http://localhost:8083/connectors/postgres-avro-connector/status, trạng thái phải là RUNNING.
Kiểm tra việc tự động đăng ký Schema
Điểm mạnh nhất của Schema Registry là khả năng tự động phát hiện schema từ dữ liệu đầu vào và lưu trữ nó. Chúng ta sẽ kích hoạt luồng dữ liệu để xác nhận quá trình này.
Thực hiện thao tác chèn một bản ghi mới vào bảng users trong PostgreSQL (giả sử bạn đã có bảng này từ phần trước):
psql -U postgres -d postgres -c "INSERT INTO users (id, name, email) VALUES (101, 'Nguyen Van A', 'nguyena@example.com');"
Kết quả mong đợi: Câu lệnh SQL chạy thành công, trả về INSERT 0 1.
Ngay lập tức, truy cập vào API của Schema Registry để xem danh sách các topic đã được đăng ký schema:
curl http://localhost:8081/subjects
Kết quả mong đợi: Bạn sẽ thấy danh sách các subject (chủ đề schema) xuất hiện, thường có định dạng postgres-server.users-key và postgres-server.users-value.
Để xem chi tiết schema đã được tự động tạo cho trường value (dữ liệu thực), thực thi lệnh:
curl http://localhost:8081/subjects/postgres-server.users-value/versions/1
Kết quả mong đợi: API trả về JSON chứa định nghĩa schema Avro chi tiết, bao gồm các trường id (int32), name (string), email (string) cùng với các trường metadata của Debezium như op, ts_ms.
So sánh cấu trúc message JSON và Avro
Việc hiểu rõ sự khác biệt giữa định dạng JSON và Avro giúp bạn tối ưu hóa băng thông và bảo mật dữ liệu. Avro sử dụng định dạng binary, trong khi JSON là text.
Chúng ta sẽ so sánh bằng cách đọc một message từ topic Kafka sử dụng công cụ kafka-console-consumer với các tùy chọn khác nhau.
Đọc message dạng JSON (mặc định)
Giả sử bạn có một connector cũ hoặc một topic khác xuất dữ liệu JSON. Nếu bạn cố đọc message Avro bằng parser JSON, bạn sẽ thấy dữ liệu bị mã hóa (gibberish) hoặc lỗi nếu không cấu hình đúng parser.
Lệnh đọc message dạng JSON (chỉ hiển thị nội dung text dễ đọc):
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic postgres-server.users \
--from-beginning \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
| head -n 5
Kết quả mong đợi: Nếu dữ liệu là JSON thuần, bạn sẽ thấy chuỗi text rõ ràng như {"id":101, "name":"Nguyen Van A"}. Nếu dữ liệu là Avro, bạn sẽ thấy các ký tự không in được (binary garbage) vì StringDeserializer không hiểu được cấu trúc Avro.
Đọc message dạng Avro
Để đọc đúng message Avro, chúng ta cần sử dụng AvroDeserializer và trỏ về Schema Registry để lấy định nghĩa schema.
Lệnh đọc message dạng Avro (cần đảm bảo thư viện Avro đã có trong classpath của Kafka Connect hoặc môi trường client):
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic postgres-server.users \
--from-beginning \
--property print.key=true \
--property key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--property value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer \
--property key.deserializer.schema.registry.url=http://localhost:8081 \
--property value.deserializer.schema.registry.url=http://localhost:8081 \
| head -n 5
Kết quả mong đợi: Bạn sẽ thấy dữ liệu được hiển thị rõ ràng dưới dạng JSON, nhưng đây là kết quả của việc giải mã (deserialization) từ binary Avro. Cấu trúc sẽ bao gồm đầy đủ các trường và metadata.
Phân tích sự khác biệt
Khi so sánh trực tiếp kích thước và định dạng:
- Định dạng JSON: Dữ liệu lưu trữ dưới dạng text, bao gồm tên trường trong mỗi message. Kích thước lớn hơn, tốn băng thông, nhưng dễ đọc trực tiếp bằng mắt thường hoặc các công cụ text editor.
- Định dạng Avro: Dữ liệu lưu trữ dưới dạng binary. Tên trường chỉ được gửi một lần khi đăng ký schema, các message sau chỉ chứa giá trị. Kích thước nhỏ hơn đáng kể (thường giảm 30-50% so với JSON), tiết kiệm băng thông và lưu trữ, nhưng cần Schema Registry để giải mã.
Để minh chứng sự khác biệt về kích thước, bạn có thể kiểm tra kích thước của file log hoặc sử dụng lệnh hexdump trên dữ liệu thô trong topic (tuy nhiên trong môi trường sản xuất, việc so sánh qua kích thước payload trong JSON trả về của consumer là đủ rõ ràng).
Trong bài này, việc chuyển sang Avro đã giúp giảm tải cho mạng lưới và chuẩn hóa dữ liệu cho các pipeline xử lý dữ liệu lớn (Big Data) như Spark hoặc Flink trong tương lai.
Điều hướng series:
Mục lục: Series: Triển khai Database Change Data Capture với Debezium và Kafka trên Ubuntu 24.04
« Phần 6: Kiểm tra luồng dữ liệu: Chèn, cập nhật và xóa
Phần 8: Troubleshooting, tối ưu hóa và bảo mật nâng cao »