Cấu hình Prometheus và Grafana để giám sát Flink Metrics
Flink cung cấp các metrics chi tiết về throughput, latency, và trạng thái của job. Để trực quan hóa, ta cần thu thập metrics qua JMX Exporter và đẩy về Prometheus.
1. Kích hoạt JMX Exporter trong Flink
Sửa file cấu hình flink-conf.yaml trong thư mục $FLINK_HOME/conf để bật JMX metrics và định nghĩa port.
Mục đích: Cho phép Prometheus scrape dữ liệu từ Flink JobManager và TaskManager thông qua HTTP endpoint.
Kết quả mong đợi: Flink sẽ expose metrics tại port 8081 (JobManager) và 8082 (TaskManager).
cat >> /opt/flink/conf/flink-conf.yaml
Khởi động lại Flink cluster để áp dụng cấu hình.
systemctl restart flink-jobmanager
systemctl restart flink-taskmanager
2. Cài đặt và cấu hình Prometheus
Tạo file cấu hình prometheus.yml tại /etc/prometheus/prometheus.yml để scrape metrics từ Flink.
Mục đích: Prometheus sẽ định kỳ gọi endpoint metrics của Flink và lưu trữ dữ liệu time-series.
Kết quả mong đợi: Tab "Targets" trong Prometheus UI (port 9090) hiển thị trạng thái "UP" cho JobManager và TaskManager.
cat > /etc/prometheus/prometheus.yml
Khởi động Prometheus và kiểm tra kết quả.
systemctl start prometheus
curl http://localhost:9090/api/v1/targets | jq '.data.activeTargets[].health'
3. Cấu hình Grafana để hiển thị Dashboard
Cài đặt Grafana và import dashboard mẫu của Flink (ID: 12345 - Flink Metrics Dashboard).
Mục đích: Trực quan hóa các chỉ số như Records/Sec, Checkpoint Duration, Backpressure.
Kết quả mong đợi: Dashboard hiển thị các biểu đồ real-time về hiệu năng của job đang chạy.
apt update && apt install -y grafana
systemctl enable grafana-server && systemctl start grafana-server
# Lưu ý: Truy cập http://localhost:3000 (admin/admin) -> Data Sources -> Add Prometheus -> URL: http://localhost:9090
# Sau đó vào Dashboards -> Import -> nhập ID: 12345
Phân tích Log Flink (JobManager và TaskManager)
Log là nguồn thông tin quan trọng nhất khi sự cố xảy ra. Flink viết log vào thư mục $FLINK_HOME/log.
1. Cấu hình Log4j để phân biệt mức độ nghiêm trọng
Sửa file log4j.properties tại $FLINK_HOME/conf để điều chỉnh mức log từ INFO xuống WARN/ERROR khi chạy production.
Mục đích: Giảm nhiễu log, tập trung vào các lỗi thực sự và giảm tải I/O cho disk.
Kết quả mong đợi: File log chỉ chứa các dòng WARN, ERROR, FATAL.
cat > /opt/flink/conf/log4j.properties
2. Phân tích flink-jobmanager.log
JobManager log chứa thông tin về lifecycle của job, phân phối task, và lỗi hệ thống cấp cao.
Mục đích: Xác định nguyên nhân job bị kill, lỗi HA, hoặc vấn đề về scheduling.
Kết quả mong đợi: Tìm được các dòng "Job terminated", "JobManager failed", hoặc "Checkpoint failed".
# Tìm các lỗi nghiêm trọng trong 1 giờ gần nhất
grep -i "error\|fatal\|exception" /opt/flink/log/flink-jobmanager.log | tail -n 50
# Tìm nguyên nhân job bị fail
grep -B 5 -A 5 "Job terminated" /opt/flink/log/flink-jobmanager.log
3. Phân tích flink-taskmanager.log
TaskManager log chứa chi tiết về xử lý từng subtask, lỗi OOM (Out Of Memory), và lỗi serialization.
Mục đích: Debug các lỗi xảy ra trong quá trình xử lý dữ liệu (transformation, sink).
Kết quả mong đợi: Phát hiện các lỗi "Java heap space", "SerializationException", hoặc "NetworkTimeout".
# Tìm lỗi OOM (Out of Memory)
grep -i "java.lang.OutOfMemoryError" /opt/flink/log/flink-taskmanager.log
# Tìm lỗi Serialization
grep -i "SerializationException" /opt/flink/log/flink-taskmanager.log
Xử lý lỗi Common: Memory Leak, Serialization Error, Network Timeout
1. Xử lý lỗi Out Of Memory (OOM)
Lỗi "Java heap space" thường xảy ra khi buffer hoặc state quá lớn so với memory được cấp.
Mục đích: Điều chỉnh memory profile hoặc tối ưu code để tránh OOM.
Kết quả mong đợi: Job chạy ổn định, không bị TaskManager bị kill do OOM.
# Tăng TaskManager memory (cấu hình trong flink-conf.yaml)
cat >> /opt/flink/conf/flink-conf.yaml
Mẹo: Nếu dùng State Backend RocksDB, tăng rocksdb.memory.managed thay vì tăng heap.
2. Xử lý lỗi Serialization Error
Lỗi này xảy ra khi object không thể được serialize để gửi qua mạng hoặc lưu vào state. Thường do object chứa Reference không serializable (ví dụ: Thread, Socket).
Mục đích: Đảm bảo tất cả object trong pipeline đều implement Serializable hoặc dùng Instantiator.
Kết quả mong đợi: Không còn lỗi "Cannot serialize object".
# Ví dụ code Java cần sửa:
# class MyData implements Serializable { ... }
# class MyRichFunction implements RichFunction {
# public MyData data; // Phải đảm bảo MyData serializable
# public void open() { data = new MyData(); } // Khởi tạo ở open() để tránh serialize instance
# }
# Sau khi fix code, rebuild jar và submit lại
flink run -c com.example.MyJob target/my-job.jar
3. Xử lý lỗi Network Timeout
Lỗi này thường do mạng chậm hoặc task bị "stuck" quá lâu chờ dữ liệu.
Mục đích: Tăng thời gian timeout hoặc giảm parallelism nếu network bottleneck.
Kết quả mong đợi: Task không bị kill do timeout khi mạng yếu.
# Tăng timeout cho task execution (đơn vị ms)
cat >> /opt/flink/conf/flink-conf.yaml
Tối ưu hóa hiệu năng: Parallelism, Batch Size, Checkpointing
1. Điều chỉnh Parallelism
Parallelism xác định số lượng subtask chạy song song cho mỗi operator.
Mục đích: Tận dụng tối đa CPU cores của cluster.
Kết quả mong đợi: Throughput tăng, CPU usage cân bằng giữa các TaskManager.
# Đặt parallelism toàn cục
flink run -c com.example.MyJob -p 4 target/my-job.jar
# Hoặc đặt riêng cho operator trong code Java:
# .setParallelism(4)
Lưu ý: Parallelism không được lớn hơn tổng số slot của TaskManager.
2. Tối ưu Batch Size cho Source/Sink
Khi đọc từ Kafka hoặc viết ra S3, việc điều chỉnh batch size giúp giảm overhead I/O.
Mục đích: Giảm số lần I/O call, tăng throughput.
Kết quả mong đợi: Giảm CPU usage cho I/O, tăng throughput records/s.
# Ví dụ cấu hình Kafka Source trong code Java:
# KafkaSource.builder()
# .setBatchSize(1024) // Tăng batch size
# .setBatchSizeMax(2048)
# .build();
# Ví dụ Sink (HDFS/S3):
# .setBatchSize(10000)
3. Tối ưu Checkpointing
Checkpoint quá thường xuyên gây overhead, quá thưa thớt gây mất dữ liệu lớn khi fail.
Mục đích: Cân bằng giữa Consistency và Performance.
Kết quả mong đợi: Checkpoint hoàn thành nhanh hơn thời gian cho phép, không gây backpressure.
# Cấu hình trong flink-conf.yaml
cat >> /opt/flink/conf/flink-conf.yaml
Chiến lược Scaling Cluster và Backup Checkpoint
1. Scaling Cluster (Thêm TaskManager)
Khi tải tăng, cần thêm TaskManager vào cluster.
Mục đích: Tăng tổng số slot để mở rộng parallelism hoặc xử lý thêm data.
Kết quả mong đợi: Job tự động phân phối lại task sang TaskManager mới.
# Sao chép cấu hình TaskManager sang node mới (ví dụ node2)
cp -r /opt/flink /opt/flink-node2
# Chỉnh sửa host trong flink-conf.yaml của node2
sed -i 's/taskmanager.rpc.address: 127.0.0.1/taskmanager.rpc.address: /' /opt/flink-node2/conf/flink-conf.yaml
# Khởi động service
systemctl start flink-taskmanager
# Kiểm tra số slot trong JobManager UI hoặc command:
flink list -d
Lưu ý: Parallelism của job đang chạy sẽ tự động scale nếu cấu hình dynamic-scaling được bật (Flink 1.15+).
2. Backup Checkpoint (S3/HDFS)
Luôn lưu checkpoint vào storage bền vững (S3, HDFS) thay vì LocalFS.
Mục đích: Đảm bảo có thể khôi phục (restore) job từ checkpoint cũ khi cluster bị sự cố hoàn toàn.
Kết quả mong đợi: Checkpoint được lưu an toàn, có thể dùng để rollback hoặc migrate job.
# Cấu hình checkpoint storage trong flink-conf.yaml
cat >> /opt/flink/conf/flink-conf.yaml
Khôi phục job từ savepoint:
flink run -s s3://my-bucket/flink-savepoints/my-backup -c com.example.MyJob target/my-job.jar
Điều hướng series:
Mục lục: Series: Triển khai Database Streaming với Apache Flink và Ubuntu 24.04
« Phần 7: Cấu hình High Availability (HA) cho Flink Cluster