Khởi động Kafka Broker và Zookeeper trong môi trường Container
Chúng ta sẽ sử dụng Docker Compose để khởi tạo cụm Kafka gồm 1 Zookeeper và 1 Kafka Broker. Đây là bước nền tảng để Materialize kết nối và đọc dữ liệu stream.
Trước tiên, hãy mở file docker-compose.yml trong thư mục dự án của bạn (thư mục đã chứa file từ Phần 2) và thêm các dịch vụ Kafka vào.
Việc sử dụng biến môi trường (Environment Variables) giúp quản lý cấu hình linh hoạt và tránh hardcode các thông số nhạy cảm như password hoặc port.
version: '3.8'
services:
# Service từ Phần 2 (giữ nguyên để Materialize hoạt động)
materialize:
image: materialize/materialized:v0.100.0
ports:
- "6875:6875"
- "6876:6876"
command:
- --listen-addr
- 0.0.0.0:6876
- --materialize-addr
- 0.0.0.0:6875
- --data-directory
- /data/materialized
volumes:
- mz-data:/data/materialized
environment:
- MZ_HOST=materialize
- MZ_PORT=6876
- MZ_KAFKA_BROKER=kafka:9092
depends_on:
- postgres
- kafka
postgres:
image: postgres:16
environment:
- POSTGRES_USER=materialize
- POSTGRES_PASSWORD=materialize
volumes:
- pg-data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U materialize"]
interval: 5s
timeout: 5s
retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
depends_on:
- zookeeper
volumes:
mz-data:
pg-data:
Kết quả mong đợi: File docker-compose.yml đã được cập nhật với 2 service mới là zookeeper và kafka. Service materialize đã được cấu hình để biết địa chỉ broker Kafka.
Bây giờ, hãy thực thi lệnh khởi động lại toàn bộ stack để áp dụng các thay đổi.
docker compose up -d
Kiểm tra trạng thái của các container để đảm bảo cả Zookeeper và Kafka đều đang chạy (healthy).
docker compose ps
Kết quả mong đợi: Bạn sẽ thấy danh sách các container với trạng thái Up hoặc Healthy. Đặc biệt chú ý service kafka phải đã khởi động xong để các bước sau hoạt động.
Để chắc chắn Kafka đang lắng nghe đúng port, hãy kiểm tra log của Kafka Broker.
docker compose logs kafka --tail 20
Kết quả mong đợi: Trong log xuất hiện dòng [KafkaServer id=1] started và không có lỗi ERROR liên quan đến kết nối Zookeeper.
Cài đặt và cấu hình Confluent Schema Registry
Materialize cần Schema Registry để hiểu cấu trúc dữ liệu Avro của Kafka. Chúng ta sẽ thêm một container Schema Registry vào Docker Compose.
Schema Registry đóng vai trò là nơi lưu trữ và quản lý các định dạng schema (Avro, JSON Schema, Protobuf) cho các topic Kafka.
Chỉnh sửa lại file docker-compose.yml để thêm service schema-registry.
version: '3.8'
services:
materialize:
image: materialize/materialized:v0.100.0
ports:
- "6875:6875"
- "6876:6876"
command:
- --listen-addr
- 0.0.0.0:6876
- --materialize-addr
- 0.0.0.0:6875
- --data-directory
- /data/materialized
volumes:
- mz-data:/data/materialized
environment:
- MZ_HOST=materialize
- MZ_PORT=6876
- MZ_KAFKA_BROKER=kafka:9092
depends_on:
- postgres
- kafka
- schema-registry
postgres:
image: postgres:16
environment:
- POSTGRES_USER=materialize
- POSTGRES_PASSWORD=materialize
volumes:
- pg-data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U materialize"]
interval: 5s
timeout: 5s
retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
hostname: schema-registry
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
depends_on:
- kafka
volumes:
mz-data:
pg-data:
Kết quả mong đợi: File cấu hình đã bao gồm thêm service schema-registry lắng nghe ở port 8081 và kết nối đến Kafka Store qua Zookeeper.
Khởi động lại toàn bộ stack để áp dụng service Schema Registry mới.
docker compose up -d
Đợi khoảng 10-15 giây để Schema Registry khởi động xong, sau đó kiểm tra trạng thái.
docker compose ps schema-registry
Kết quả mong đợi: Container schema-registry hiển thị trạng thái Up.
Verify bằng cách gọi API Health Check của Schema Registry từ terminal.
curl http://localhost:8081/
Kết quả mong đợi: Nhận lại JSON phản hồi chứa thông tin version và status UP (ví dụ: {"version":"7.6.0","status":"UP"}).
Tạo các Topic Kafka mẫu với Schema Avro
Bây giờ chúng ta cần tạo các Topic thực tế và đăng ký Schema Avro tương ứng. Đây là bước bắt buộc để Materialize có thể tạo Source từ Kafka.
Chúng ta sẽ tạo 2 topic: user-signups (dữ liệu đăng ký người dùng) và page-views (dữ liệu xem trang).
Trước tiên, hãy tạo file chứa Schema Avro cho topic user-signups. Tạo file tại đường dẫn ./schemas/user-signups-value.avsc.
mkdir -p schemas
cat > schemas/user-signups-value.avsc
Kết quả mong đợi: File schemas/user-signups-value.avsc được tạo thành công với định nghĩa schema Avro chuẩn.
Đăng ký Schema này vào Schema Registry. Lưu ý, key trong Schema Registry thường là tên topic + "-value".
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "$(cat schemas/user-signups-value.avsc)"}' \
http://localhost:8081/subjects/user-signups-value/versions
Kết quả mong đợi: Nhận lại JSON chứa "id": 1 (hoặc số lớn hơn nếu có schema khác) xác nhận schema đã được lưu.
Tiếp theo, tạo file Schema cho topic page-views tại ./schemas/page-views-value.avsc.
cat > schemas/page-views-value.avsc
Đăng ký Schema page-views vào Schema Registry.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "$(cat schemas/page-views-value.avsc)"}' \
http://localhost:8081/subjects/page-views-value/versions
Kết quả mong đợi: Nhận lại JSON chứa "id" mới (thường là 2) xác nhận schema thứ 2 đã được lưu.
Bây giờ, sử dụng công cụ kafka-topics có sẵn trong container Kafka để tạo các topic vật lý.
docker compose exec kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic user-signups \
--partitions 3 \
--replication-factor 1
Kết quả mong đợi: Xuất hiện dòng Topic 'user-signups' created.
Tạo topic thứ hai cho page-views.
docker compose exec kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic page-views \
--partitions 3 \
--replication-factor 1
Kết quả mong đợi: Xuất hiện dòng Topic 'page-views' created.
Verify cuối cùng bằng cách liệt kê toàn bộ topic và kiểm tra cấu hình của chúng.
docker compose exec kafka kafka-topics --list --bootstrap-server localhost:9092
Kết quả mong đợi: Danh sách hiển thị 2 topic user-signups và page-views cùng các topic hệ thống của Kafka (như __consumer_offsets).
Kiểm tra lại Schema Registry để đảm bảo cả 2 schema đã được đăng ký.
curl http://localhost:8081/subjects
Kết quả mong đợi: Trả về mảng JSON chứa "user-signups-value" và "page-views-value".
Điều hướng series:
Mục lục: Series: Triển khai Database Stream-native với Materialize trên Ubuntu 24.04
« Phần 2: Khởi tạo cụm Materialize với Docker Compose và PostgreSQL
Phần 4: Xây dựng các Source và Sink đầu tiên với SQL »