Xử lý các lỗi thường gặp khi kết nối Kafka và Schema Registry
Khi triển khai Materialize trong môi trường production, lỗi kết nối đến Kafka Broker hoặc Schema Registry là nguyên nhân phổ biến nhất gây gián đoạn luồng dữ liệu.
Vấn đề thường gặp nhất là lỗi Authentication Failed hoặc SSL Handshake Failed do cấu hình SASL/SSL chưa đồng bộ giữa Source và Broker.
Bước 1: Kiểm tra cấu hình kết nối và chứng chỉ SSL
Kiểm tra lại file cấu hình môi trường của Materialize để đảm bảo các biến môi trường kết nối Kafka được khai báo chính xác.
Đường dẫn file: /opt/materialize/docker-compose.env (hoặc file .env tương ứng trong project của bạn).
Để xem lại cấu hình hiện tại, sử dụng lệnh:
cat /opt/materialize/docker-compose.env | grep -E "KAFKA|SCHEMA_REGISTRY"
Kết quả mong đợi: Hiển thị các biến như KAFKA_BROKERS, SCHEMA_REGISTRY_URL, KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD.
Bước 2: Sửa lỗi Authentication Failed (SASL PLAIN)
Lỗi này xảy ra khi Materialize gửi username/password sai hoặc Broker không cấu hình cơ chế xác thực tương ứng.
Chỉnh sửa file cấu hình Docker Compose tại /opt/materialize/docker-compose.yml để thêm cấu hình SASL vào service materialized.
Nội dung hoàn chỉnh của phần service materialized trong file /opt/materialize/docker-compose.yml:
materialized:
image: materialize/materialized:latest
command:
- "--listen-addr=0.0.0.0:6875"
- "--storage-addrs=materialized:6873"
- "--storage-addrs=materialized:6874"
- "--storage-addrs=materialized:6875"
- "--internal-addr=0.0.0.0:6876"
- "--dataflow-addrs=materialized:6873"
- "--dataflow-addrs=materialized:6874"
- "--dataflow-addrs=materialized:6875"
- "--http-addr=0.0.0.0:6877"
- "--http-addr=0.0.0.0:6878"
- "--http-addr=0.0.0.0:6879"
environment:
- MZ_KAFKA_BROKER=brokers:9092
- MZ_KAFKA_SASL_MECHANISM=PLAIN
- MZ_KAFKA_SASL_USERNAME=${KAFKA_SASL_USERNAME}
- MZ_KAFKA_SASL_PASSWORD=${KAFKA_SASL_PASSWORD}
- MZ_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- MZ_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
- MZ_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=${SCHEMA_REGISTRY_USER}:${SCHEMA_REGISTRY_PASSWORD}
ports:
- "6875:6875"
- "6877:6877"
depends_on:
- brokers
- schema-registry
networks:
- materialize-net
Kết quả mong đợi: Dịch vụ Materialize khởi động lại và không còn báo lỗi authentication trong log.
Bước 3: Xác minh lỗi kết nối bằng CLI
Sử dụng lệnh DESCRIBE SOURCE trong SQL để kiểm tra trạng thái của Source đang gặp lỗi.
Truy cập vào CLI của Materialize:
docker exec -it materialized sql -c "DESCRIBE SOURCE kafka_source;"
Kết quả mong đợi: Nếu có lỗi, phần ERROR sẽ hiển thị chi tiết nguyên nhân (ví dụ: AuthenticationFailed, BrokerConnectionError). Nếu thành công, sẽ hiển thị schema và các thuộc tính nguồn.
Bước 4: Xử lý lỗi Schema Registry (Compatibility)
Lỗi thường gặp khi Schema mới không tương thích với Schema cũ (ví dụ: xóa trường hoặc thay đổi kiểu dữ liệu không cho phép).
Thực hiện kiểm tra tương thích schema trước khi đăng ký mới.
curl -X POST http://schema-registry:8081/compatibility \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-H "Authorization: Basic $(echo -n 'user:pass' | base64)" \
-d '{
"topic": "users",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"
}'
Kết quả mong đợi: Trả về {"version": 2} nếu tương thích, hoặc HTTP 409 Conflict nếu vi phạm quy tắc BACKWARD hoặc FORWARD.
Tối ưu hóa cấu hình phân vùng (Partitioning) cho Source và Sink
Trong môi trường production, việc phân bổ dữ liệu không đều giữa các phân vùng (partitions) của Kafka có thể gây ra hiện tượng Hot Partition, làm giảm hiệu suất xử lý của Materialize.
Materialize tự động map các partition của Kafka vào các worker nodes. Tối ưu hóa phân vùng giúp cân bằng tải (load balancing) và tăng thông lượng (throughput).
Bước 1: Phân tích độ lệch dữ liệu (Data Skew)
Trước khi tối ưu, cần kiểm tra kích thước của từng partition trong Kafka Topic đang được sử dụng.
Sử dụng công cụ kafka-topics.sh trên container Kafka Broker để liệt kê thông tin partition.
docker exec brokers kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic users --partitions
Kết quả mong đợi: Danh sách các partition kèm theo kích thước (Size) và offset. Nếu một partition có kích thước lớn gấp 10 lần các partition khác, đó là dấu hiệu của Data Skew.
Bước 2: Điều chỉnh hàm Hashing cho Source
Để giảm thiểu skew, hãy đảm bảo khi tạo Source, bạn sử dụng KEY để Materialize có thể phân phối dữ liệu đều hơn dựa trên hash của key.
Tạo lại Source với tùy chọn KEY rõ ràng thay vì dùng toàn bộ message.
CREATE SOURCE users_optimized
FROM KAFKA BROKER 'brokers:9092' TOPIC 'users'
KEY FORMAT JSON
VALUE FORMAT JSON
ENVELOPE DEBEZIUM;
Kết quả mong đợi: Materialize sẽ sử dụng key trong message để hash và phân bổ workload cho các worker, tránh tình trạng một worker bị quá tải.
Bước 3: Cấu hình Sink với số lượng phân vùng động
Khi tạo Sink, Materialize mặc định sẽ tạo số lượng partition tương ứng với số lượng worker. Tuy nhiên, trong môi trường lớn, bạn có thể muốn cố định số lượng partition để kiểm soát.
Sử dụng tùy chọn KEY và FORMAT trong lệnh CREATE SINK.
CREATE SINK users_sink
FROM users_optimized
INTO KAFKA BROKER 'brokers:9092' TOPIC 'users_processed'
KEY (id)
FORMAT JSON
WITH (
PARTITIONS = 12,
REPLICAS = 3
);
Kết quả mong đợi: Topic users_processed được tạo với đúng 12 partitions, đảm bảo dữ liệu được phân tán đều và khả năng mở rộng cao.
Bước 4: Verify hiệu năng sau tối ưu
Giám sát lại trạng thái của Source và Sink để đảm bảo không còn cảnh báo về Backpressure hoặc High Latency.
docker exec -it materialized sql -c "SELECT * FROM mz_sources WHERE name = 'users_optimized';"
Kết quả mong đợi: Cột state hiển thị active và các chỉ số ingest_rate ổn định, không có cảnh báo lỗi.
Chiến lược Backup, Restore và Disaster Recovery
Materialize lưu trữ trạng thái (state) và dữ liệu trong hệ thống file hệ thống (persistent storage). Việc backup cần bao gồm cả dữ liệu metadata (PostgreSQL) và dữ liệu state (Persistent Volume).
Chiến lược recovery được chia thành hai cấp độ: Recovery metadata (cấu hình) và Recovery state (dữ liệu thời gian thực).
Bước 1: Backup Metadata (Cấu hình và Schema)
Sử dụng PostgreSQL để dump toàn bộ schema và các đối tượng SQL (Source, Sink, View, Materialized View).
Thực hiện lệnh dump trực tiếp từ container PostgreSQL.
docker exec -t postgres pg_dump -U materialize -d materialize > /opt/materialize/backup/metadata_backup_$(date +%F).sql
Kết quả mong đợi: File SQL được tạo tại thư mục /opt/materialize/backup/ chứa toàn bộ định nghĩa của các object.
Bước 2: Backup Persistent State (Data)
Dữ liệu trạng thái của Materialize (Computation state) được lưu trong thư mục /var/lib/materialize/storage trên các node storage.
Sử dụng lệnh tar để nén toàn bộ thư mục dữ liệu.
docker exec materialized tar -czf /opt/materialize/backup/state_backup_$(date +%F).tar.gz /var/lib/materialize/storage
Kết quả mong đợi: File tar.gz chứa toàn bộ dữ liệu state của cụm.
Bước 3: Tự động hóa Backup với Cron Job
Để đảm bảo backup diễn ra định kỳ, hãy cấu hình cron job trên host Ubuntu.
Chỉnh sửa crontab:
crontab -e
Thêm dòng sau vào cuối file để backup hàng ngày lúc 2 AM:
0 2 * * * /opt/materialize/scripts/backup_script.sh >> /var/log/materialize_backup.log 2>&1
Kết quả mong đợi: Hệ thống tự động tạo file backup mới mỗi ngày và ghi log vào file nhật ký.
Bước 4: Khôi phục (Restore) từ Backup
Khi xảy ra sự cố, quy trình khôi phục bao gồm 3 bước: Khởi tạo môi trường, Restore State, Restore Metadata.
Bước 1: Restore dữ liệu state vào container mới.
docker exec materialized tar -xzf /opt/materialize/backup/state_backup_2023-10-27.tar.gz -C /var/lib/materialize/
Kết quả mong đợi: Thư mục /var/lib/materialize/storage được khôi phục đầy đủ.
Bước 2: Restore metadata (SQL) vào PostgreSQL.
docker exec -i postgres psql -U materialize -d materialize -f /opt/materialize/backup/metadata_backup_2023-10-27.sql
Kết quả mong đợi: Các Source, Sink, và View được tạo lại trong hệ thống. Materialize sẽ bắt đầu re-process dữ liệu từ state đã khôi phục.
Bước 5: Verify sau khi Restore
Đảm bảo dữ liệu đã được phục hồi đầy đủ và không bị mất mát.
docker exec -it materialized sql -c "SELECT COUNT(*) FROM users_optimized;"
Kết quả mong đợi: Số lượng dòng dữ liệu trả về khớp với số lượng dòng trước khi sự cố xảy ra.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream-native với Materialize trên Ubuntu 24.04
« Phần 7: Giám sát hiệu năng và phân tích lỗi với CLI và Dashboard