1. Cấu hình Snapshot cho State của Flink và ClickHouse
Flink sử dụng cơ chế checkpoint để lưu trạng thái tính toán (state) vào hệ thống lưu trữ phân tán (như S3 hoặc MinIO) nhằm phục hồi khi job bị lỗi. ClickHouse cần cấu hình backup thủ công hoặc tự động để đảm bảo dữ liệu lịch sử không bị mất.
Cấu hình Checkpoint cho Flink
Để Flink lưu state vào object storage, bạn cần sửa file `flink-conf.yaml` trong container JobManager. Đảm bảo bạn đã mount volume chứa S3/MinIO credentials vào container.
Đường dẫn file: /opt/flink/conf/flink-conf.yaml
Nội dung cấu hình:
state.backend: rocksdb
state.checkpoints-dir: s3://flink-checkpoints/cluster-01
state.savepoints-dir: s3://flink-savepoints/cluster-01
state.checkpointing.interval: 60000
state.checkpointing.mode: EXACTLY_ONCE
state.checkpointing.tolerable-failed-checkpoints: 3
state.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.infinite-state-retention: false
Kết quả mong đợi: Khi job chạy, bạn sẽ thấy các checkpoint được tạo tự động mỗi 60 giây trên bucket S3 đã chỉ định. Mode EXACTLY_ONCE đảm bảo dữ liệu không bị xử lý trùng lặp khi recovery.
Cấu hình Backup cho ClickHouse
ClickHouse sử dụng công cụ `clickhouse-backup` để tạo snapshot toàn bộ dữ liệu vào S3. Bạn cần cấu hình file `config.d/backup.xml` hoặc file riêng biệt.
Đường dẫn file: /etc/clickhouse-server/config.d/backup.xml
Nội dung cấu hình:
s3
https://s3.amazonaws.com
clickhouse-backups
cluster-01/
ap-southeast-1
YOUR_AWS_ACCESS_KEY
YOUR_AWS_SECRET_KEY
3600
7
zstd
Kết quả mong đợi: Khi chạy lệnh backup, dữ liệu sẽ được nén và đẩy lên S3. ClickHouse sẽ tự động xóa các backup cũ hơn 7 ngày để tiết kiệm chi phí.
Thực hiện Backup thủ công để kiểm tra
Chạy lệnh backup ngay lập tức để tạo snapshot đầu tiên trước khi triển khai DR.
clickhouse-backup create --name daily-backup-$(date +%F) --compress=zstd
Kết quả mong đợi: Lệnh trả về thông báo "Backup created successfully" và bạn thấy file mới trên S3 bucket.
2. Triển khai Kafka MirrorMaker 2 để Replication giữa các Region
Khi xảy ra sự cố mất toàn bộ một vùng (region), bạn cần một cơ chế sao chép dữ liệu Kafka từ Region A sang Region B. MirrorMaker 2 (MM2) là công cụ tiêu chuẩn của Apache Kafka để làm việc này.
Deploy MirrorMaker 2 trên Kubernetes
Sử dụng Strimzi Operator để deploy MM2. File YAML dưới đây định nghĩa 2 connector: một để replicate từ Source (Region A) sang Target (Region B) và ngược lại (optional, để DR hai chiều).
Đường dẫn file: mm2-config.yaml
Nội dung file:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: cross-region-mm2
namespace: production
spec:
replicas: 2
version: 3.6.0
image: strimzi/kafka-mirror-maker:3.6.0-kafka-3.6.0
bootstrapServers: source-cluster-kafka-bootstrap:9092
consumer:
replicas: 2
resources:
requests:
cpu: 500m
memory: 512Mi
producer:
replicas: 2
resources:
requests:
cpu: 500m
memory: 512Mi
config:
group.id: cross-region-group
replication.factor: 3
consume.topic.blacklist: "__.*"
replication.policy: org.apache.kafka.connect.mirror.DefaultTopicReplicationPolicy
replication.max.message.latency.ms: 300000
connectors:
- name: topic-sync
config:
class: org.apache.kafka.connect.mirror.MirrorSourceConnector
topics: "sensor-data,clickhouse-events"
target.cluster: target-cluster-kafka-bootstrap:9092
source.cluster.alias: source-cluster
target.cluster.alias: target-cluster
sync.group.id: sync-group-01
mirror.checkpoint.interval.ms: 60000
sync.topic.replication.factor: 3
Kết quả mong đợi: Strimzi sẽ tạo 2 pod cho MM2. Connector sẽ bắt đầu sao chép topic "sensor-data" và "clickhouse-events" sang cluster đích với độ trễ dưới 300ms.
Verify Replication Lag
Kiểm tra độ trễ giữa 2 cluster để đảm bảo dữ liệu được sync kịp thời.
kafka-consumer-groups.sh --bootstrap-server target-cluster-kafka-bootstrap:9092 --describe --group sync-group-01 --topic sensor-data
Kết quả mong đợi: Trường "Lag" phải ở mức thấp (gần 0) nếu load bình thường. Nếu lag tăng cao, cần tăng số lượng consumer/producer trong spec.
3. Kịch bản tự động phục hồi (Recovery) sau sự cố mất Node
Kịch bản này giả định bạn mất toàn bộ node chứa JobManager của Flink hoặc Pod ClickHouse. Hệ thống cần tự động khởi động lại và restore từ checkpoint/backup gần nhất.
Script Recovery cho Flink
Tạo script shell để tự động tìm savepoint mới nhất và submit job mới với state đó. Script này có thể chạy trong CronJob hoặc Kubernetes Job khi phát hiện sự cố.
Đường dẫn file: /opt/scripts/recover-flink-job.sh
Nội dung script:
#!/bin/bash
set -e
BUCKET="flink-savepoints"
CLUSTER_ID="cluster-01"
JOB_NAME="realtime-analytics-job"
JAR_PATH="/opt/flink/lib/analytics-job.jar"
# Tìm savepoint mới nhất
LATEST_SAVEPOINT=$(aws s3 ls s3://${BUCKET}/${CLUSTER_ID}/ --recursive | grep -E '\.savepoint$' | sort -r | head -1 | awk '{print $4}')
if [ -z "$LATEST_SAVEPOINT" ]; then
echo "ERROR: No savepoint found in s3://${BUCKET}/${CLUSTER_ID}/"
exit 1
fi
echo "Found latest savepoint: $LATEST_SAVEPOINT"
# Submit job với savepoint
flink run \
--fromSavepoint $LATEST_SAVEPOINT \
--detached \
--jobname "$JOB_NAME" \
$JAR_PATH
echo "Job $JOB_NAME recovered from $LATEST_SAVEPOINT"
Kết quả mong đợi: JobManager mới sẽ khởi động, tải state từ S3 và tiếp tục xử lý dữ liệu từ thời điểm checkpoint cuối cùng mà không bị mất dữ liệu (exactly-once).
Script Recovery cho ClickHouse
Script này dùng khi bạn mất cả Pod ClickHouse hoặc cần restore toàn bộ DB từ backup.
Đường dẫn file: /opt/scripts/recover-clickhouse.sh
Nội dung script:
#!/bin/bash
set -e
BACKUP_NAME="daily-backup-2023-10-27" # Hoặc biến động lấy backup mới nhất
CLICKHOUSE_HOST="clickhouse-server"
CLICKHOUSE_USER="admin"
CLICKHOUSE_PASSWORD="secure_password"
echo "Starting ClickHouse recovery from backup: $BACKUP_NAME"
# Restore toàn bộ backup
clickhouse-client --host=$CLICKHOUSE_HOST --user=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD \
--query "SYSTEM RESTORE FROM BACKUP '$BACKUP_NAME'"
# Wait for completion
sleep 10
echo "ClickHouse restored successfully"
Kết quả mong đợi: ClickHouse sẽ khôi phục toàn bộ database, bảng và dữ liệu từ snapshot S3. Các query trên bảng sẽ hoạt động lại bình thường.
4. Kiểm tra tính toàn vẹn dữ liệu sau khi Restore
Sau khi recovery, việc quan trọng nhất là xác nhận dữ liệu không bị hỏng, không bị mất record và tính nhất quán giữa các hệ thống.
Verify Flink State Consistency
So sánh số lượng record đã xử lý (processed count) trong state của Flink với số lượng record trong Kafka sau khi restore.
curl -s http://jobmanager:8081/jobs//metrics | grep -i "numRecordsIn" | head -5
Kết quả mong đợi: Giá trị metrics phải khớp hoặc lớn hơn so với offset cuối cùng của Kafka partition trước khi sự cố xảy ra.
Verify ClickHouse Data Integrity
Thực hiện kiểm tra checksum hoặc đếm số dòng dữ liệu trong bảng quan trọng để đảm bảo không có record bị mất trong quá trình restore.
clickhouse-client --query "SELECT count() FROM analytics_events WHERE event_time >= '2023-10-27 00:00:00'"
Kết quả mong đợi: Kết quả trả về phải bằng với số lượng record dự kiến (có thể so sánh với log audit hoặc số lượng message trong Kafka). Nếu số lượng thấp hơn, có thể có lỗi trong quá trình restore.
Chạy Test Query để kiểm tra tính khả dụng
Thực hiện một truy vấn phức tạp (aggregation, windowing) để đảm bảo index và partition của ClickHouse hoạt động đúng sau restore.
clickhouse-client --query "SELECT toStartOfHour(event_time) as hour, count() as cnt FROM analytics_events GROUP BY hour ORDER BY hour DESC LIMIT 10"
Kết quả mong đợi: Query chạy nhanh, trả về kết quả chính xác và không bị lỗi "Table doesn't exist" hay "Data part error".
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 10: Tối ưu hóa hiệu năng và chi phí cho hệ thống production
Phần 12: Troubleshooting nâng cao: Xử lý sự cố thường gặp và Best Practices »