Cài đặt và khởi động Confluent Schema Registry
Tải và cài đặt Schema Registry từ kho Confluent để quản lý định dạng dữ liệu (Avro/Protobuf/JSON) cho Kafka.
Schema Registry hoạt động như một dịch vụ độc lập, lưu trữ schema và cho phép Producer/Consumer tra cứu. Nó đảm bảo tính tương thích giữa các phiên bản dữ liệu.
Cài đặt gói confluent-platform hoặc chỉ schema-registry từ kho.
sudo apt update
sudo apt install -y confluent-schema-registry
Khởi động thành công dịch vụ Schema Registry trên cổng mặc định 8081.
Cấu hình file để kết nối với Kafka đã cài đặt ở Phần 2. Đường dẫn file cấu hình là /etc/schema-registry/schema-registry.properties.
Tạo file cấu hình hoàn chỉnh với thông số kết nối Kafka và cổng HTTP.
cat > /etc/schema-registry/schema-registry.properties
File cấu hình đã được ghi vào /etc/schema-registry/schema-registry.properties với thông số kết nối Kafka ở localhost:9092.
Khởi động dịch vụ Schema Registry dưới dạng service systemd để chạy bền vững.
sudo systemctl daemon-reload
sudo systemctl enable schema-registry
sudo systemctl start schema-registry
Dịch vụ Schema Registry đang chạy (active running) và sẵn sàng nhận yêu cầu.
Verify: Kiểm tra trạng thái service và gọi API health check.
systemctl status schema-registry
curl http://localhost:8081/
Output của curl trả về JSON {"status":"OK"} hoặc tương tự, xác nhận API đang hoạt động.
Định nghĩa Avro Schema cho Domain Event
Tạo file schema định dạng Avro cho sự kiện "UserCreated" làm mẫu cho Event Sourcing.
Avro Schema mô tả cấu trúc dữ liệu chặt chẽ, bao gồm tên trường, kiểu dữ liệu và giá trị mặc định, giúp Schema Registry kiểm tra tính tương thích.
Tạo thư mục chứa schema và file định nghĩa.
mkdir -p /opt/schemas
cat > /opt/schemas/UserCreated.avsc
File UserCreated.avsc đã được tạo với các trường id, email, username, timestamp và version.
Đăng ký schema này vào Schema Registry dưới tên chủ đề (topic) "user-events".
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "$(cat /opt/schemas/UserCreated.avsc)"}' \
http://localhost:8081/subjects/user-events-value/versions
API trả về mã 200 và JSON chứa id schema (ví dụ: {"id": 1}) và version (1).
Verify: Lấy lại danh sách các schema đã đăng ký để xác nhận.
curl http://localhost:8081/subjects
Output trả về danh sách chứa "user-events-value", xác nhận chủ đề đã được tạo.
Cấu hình Kafka Producer tích hợp Schema Registry
Cấu hình Producer Client (Java hoặc Python) để tự động đăng ký và tra cứu schema khi gửi dữ liệu.
Producer cần biết địa chỉ Schema Registry để serialize đối tượng thành byte stream theo định dạng Avro trước khi gửi lên Kafka.
Sử dụng thư viện confluent-kafka-python (hoặc Java client tương đương) để minh họa cấu hình.
Đầu tiên, cài đặt thư viện cần thiết.
pip install confluent-kafka fastavro
Thư viện confluent-kafka và fastavro đã được cài đặt vào môi trường Python.
Viết script Python cấu hình Producer với tham số "schema.registry.url".
cat > /opt/scripts/producer_test.py
Script producer_test.py đã được tạo với cấu hình kết nối Schema Registry và Kafka.
Chạy script để gửi sự kiện mẫu vào Kafka.
python3 /opt/scripts/producer_test.py
Output hiển thị "Sự kiện gửi thành công" kèm thông tin Topic, Partition và Offset.
Verify: Kiểm tra dữ liệu trong Kafka bằng console consumer hoặc công cụ để đảm bảo dữ liệu đã được encode đúng (dạng binary Avro).
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user-events --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --max-messages 1
Console consumer sẽ hiển thị dữ liệu dạng binary (không đọc được trực tiếp bằng mắt thường nếu không có deserializer Avro), chứng tỏ Producer đã serialize đúng theo Schema.
Xử lý phiên bản Schema và Backward Compatibility
Cấu hình chính sách tương thích (Compatibility Policy) để quản lý việc cập nhật schema mà không làm vỡ Consumer cũ.
Backward Compatibility cho phép Schema mới đọc dữ liệu cũ, nhưng không bắt buộc Schema cũ đọc được dữ liệu mới. Đây là chuẩn cho Event Sourcing để đảm bảo tính nhất quán khi replay.
Thiết lập chính sách Backward cho chủ đề "user-events-value".
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/user-events-value
API trả về JSON {"compatibility": "BACKWARD"} xác nhận chính sách đã được áp dụng.
Định nghĩa phiên bản schema mới (Version 2) để thử nghiệm tính tương thích.
Thêm trường mới "created_at" vào schema, đây là phép biến đổi Backward Compatible (Consumer cũ vẫn đọc được dữ liệu cũ vì trường này có default value).
cat > /opt/schemas/UserCreated_v2.avsc
File schema version 2 đã được tạo với trường bổ sung "created_at" có giá trị mặc định.
Đăng ký schema mới vào Schema Registry để kích hoạt kiểm tra tương thích.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "$(cat /opt/schemas/UserCreated_v2.avsc)"}' \
http://localhost:8081/subjects/user-events-value/versions
API trả về version mới (2) và id schema mới, xác nhận Schema Registry chấp nhận cập nhật vì tuân thủ Backward Compatibility.
Thử nghiệm trường hợp không tương thích: Xóa trường "id" (thành phần bắt buộc) trong schema v3 để xem Schema Registry từ chối.
cat > /opt/schemas/UserCreated_v3_invalid.avsc
File schema v3 (bị lỗi) đã được tạo, thiếu trường "id" bắt buộc.
Thử đăng ký schema v3 để xem lỗi từ Schema Registry.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "$(cat /opt/schemas/UserCreated_v3_invalid.avsc)"}' \
http://localhost:8081/subjects/user-events-value/versions
API trả về lỗi 422 Unprocessable Entity với thông báo "Incompatible schema", xác nhận cơ chế bảo vệ dữ liệu hoạt động.
Verify: Kiểm tra danh sách version của chủ đề để thấy chỉ có v1 và v2.
curl http://localhost:8081/subjects/user-events-value/versions
Output trả về mảng chứa số 1 và 2, không có số 3, xác nhận schema không tương thích đã bị từ chối.
Điều hướng series:
Mục lục: Series: Triển khai Database Event Sourcing với Apache Kafka và Ubuntu 24.04
« Phần 2: Cài đặt và cấu hình Apache Kafka trên Ubuntu
Phần 4: Xây dựng ứng dụng Producer với mô hình Event Sourcing »