Cấu hình Checkpointing và Savepoint cho Exactly-Once Semantics
Trong môi trường sản xuất, việc đảm bảo dữ liệu không bị mất (at-least-once) và không bị xử lý trùng (exactly-once) là yếu tố sống còn. Flink sử dụng cơ chế Checkpoint dựa trên Chandy-Lamport algorithm để tạo snapshot trạng thái của job tại một thời điểm nhất định.
Bạn cần cấu hình thời gian và cơ chế lưu trữ checkpoint trong file flink-conf.yaml để kích hoạt tính năng này. Nếu không cấu hình, job sẽ chạy ở chế độ "none" và mất dữ liệu khi gặp lỗi.
Truy cập vào file cấu hình chính của Flink (thường nằm ở /opt/flink/conf/flink-conf.yaml trên Ubuntu 24.04).
Thêm hoặc sửa các dòng sau vào cuối file để kích hoạt checkpoint với khoảng thời gian 60 giây và lưu trữ vào HDFS hoặc LocalFileSystem tùy theo hạ tầng:
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
state.checkpoints.num-retained: 3
state.checkpoints.min-pause: 60s
state.checkpoints.timeout: 10m
state.checkpoints.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 60s
Kết quả mong đợi: Flink sẽ tự động tạo snapshot trạng thái mỗi 60 giây. Nếu job crash, Flink sẽ tự động recover từ checkpoint gần nhất mà không mất dữ liệu giữa các lần checkpoint.
Để verify, hãy chạy job và truy cập vào Flink Dashboard (thường ở port 8081). Vào tab "Checkpoints" của job đang chạy. Bạn sẽ thấy trạng thái "SUCCESS" với timestamp cập nhật đều đặn.
Tạo Savepoint thủ công trước khi upgrade
Khi cần restart cluster, nâng cấp phiên bản Flink hoặc thay đổi logic job, bạn phải tạo Savepoint thủ công. Khác với checkpoint, savepoint là snapshot được yêu cầu bởi người dùng và giữ lại vĩnh viễn.
Sử dụng command line để tạo savepoint cho job đang chạy. Thay thế JOB_ID bằng ID thực tế của job (lấy từ Dashboard hoặc command flink list).
/opt/flink/bin/flink savepoint -t 5m file:///opt/flink/savepoints/my-savepoint-2024
Kết quả mong đợi: Flink sẽ dừng job ở điểm đồng bộ hóa, lưu snapshot vào đường dẫn chỉ định và trả về một đường dẫn URL (URI) của savepoint. Job sẽ ở trạng thái "FINISHED" hoặc "SUSPENDED".
Để verify, truy cập vào thư mục /opt/flink/savepoints/my-savepoint-2024. Bạn sẽ thấy các file checkpoint-* và file metadata chứa trạng thái của job.
Tối ưu hóa Slot và Parallelism trong flink-conf.yaml
Sự không khớp giữa số lượng CPU vật lý, số TaskManager slot và mức độ parallelism của job sẽ gây ra hiện tượng resource waste hoặc bottleneck. Một Task Slot là đơn vị tài nguyên nhỏ nhất để chạy một operator.
Mục tiêu là cân bằng số slot với số nhân CPU để tránh contention (tranh chấp tài nguyên) và đảm bảo mỗi operator có đủ tài nguyên xử lý.
Chỉnh sửa file flink-conf.yaml để định nghĩa số lượng slot trên mỗi TaskManager và mức parallelism mặc định.
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
taskmanager.memory.process.size: 4096m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 32m
Giải thích: taskmanager.numberOfTaskSlots: 4 nghĩa là mỗi TaskManager sẽ chia tài nguyên thành 4 phần độc lập. parallelism.default: 4 áp dụng mức song song 4 cho toàn bộ job nếu không chỉ định cụ thể trong code SQL.
Kết quả mong đợi: Khi khởi động cluster, mỗi TaskManager sẽ chiếm 4 slot. Nếu job có parallelism = 4, nó sẽ chạy trên 1 TaskManager. Nếu parallelism = 8, nó sẽ cần 2 TaskManager. Điều này tối ưu hóa việc sử dụng CPU.
Để verify, vào Flink Dashboard, chọn job đang chạy, vào tab "Parallelism". Bạn sẽ thấy biểu đồ hiển thị số lượng subtask đang chạy trên từng TaskManager. Nếu mọi slot đều được sử dụng và không có slot trống (idle) trong khi CPU thấp, bạn cần tăng parallelism. Ngược lại, nếu CPU cao (90%+) và có nhiều slot trống, hãy giảm parallelism.
Xử lý lỗi Backpressure và điều chỉnh Buffer Size
Backpressure xảy ra khi tốc độ xử lý (sink) chậm hơn tốc độ dữ liệu đầu vào (source). Dữ liệu sẽ bị ứ đọng trong các buffer giữa các operator, làm tăng latency và có thể gây tràn RAM (OOM).
Trong Flink, buffer size mặc định có thể không phù hợp với workload nặng. Bạn cần điều chỉnh kích thước buffer và số lượng IO threads để cân bằng throughput và latency.
Cấu hình các tham số buffer trong flink-conf.yaml để tăng dung lượng buffer tạm thời và tối ưu hóa network IO.
taskmanager.network.memory.fraction: 0.3
taskmanager.network.memory.min: 64m
taskmanager.network.buffer-size: 32k
taskmanager.network.num-spill-records: 10000
io.buffer.size: 32k
Giải thích: Tăng network.memory.fraction lên 0.3 để dành 30% RAM cho mạng. buffer-size: 32k tăng kích thước mỗi gói tin truyền đi, giảm overhead giao tiếp. num-spill-records quyết định khi nào Flink sẽ dump buffer xuống disk để tránh OOM.
Kết quả mong đợi: Khi chạy job với lượng dữ liệu lớn, Flink sẽ giữ được throughput cao hơn mà không bị tràn RAM. Tuy nhiên, latency có thể tăng nhẹ do dữ liệu nằm trong buffer lâu hơn.
Để verify backpressure, vào Flink Dashboard, chọn job, vào tab "Backpressure". Nếu bạn thấy màu đỏ (High) hoặc màu cam (Medium) trên các cạnh (edges) giữa các operator, điều đó có nghĩa là có tắc nghẽn. Sau khi áp dụng cấu hình trên, màu sắc này nên chuyển sang xanh lá (Low) hoặc xám (No backpressure).
Triển khai Job dưới dạng Container Docker và Kubernetes
Để chuẩn hóa môi trường và dễ dàng scale trên cloud, việc đóng gói Flink vào Docker container là bước bắt buộc. Kubernetes (K8s) sẽ đóng vai trò orchestrator để quản lý số lượng TaskManager động.
Bạn sẽ sử dụng image chính thức của Apache Flink để tạo cluster, thay vì cài đặt trực tiếp trên OS Ubuntu. Điều này giúp tách biệt dependency của Java và OS host.
Tạo file Dockerfile trong thư mục chứa job JAR của bạn để đóng gói môi trường Flink với code của bạn.
FROM apache/flink:1.18-scala_2.12-java17
# Copy file JAR của job vào container
COPY target/your-flink-job.jar /opt/flink/lib/
# Cấu hình startup script để tự động chạy job (tùy chọn)
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
Trong đó entrypoint.sh là script shell để khởi động Flink cluster với cấu hình tùy chỉnh:
#!/bin/bash
# Chạy Flink trong chế độ Standalone hoặc YARN/K8s tùy biến
# Ở đây giả sử chạy standalone để test
/opt/flink/bin/start-cluster.sh
sleep 10
/opt/flink/bin/flink run -c com.example.YourJobClassName /opt/flink/lib/your-flink-job.jar
Kết quả mong đợi: Khi build image bằng docker build -t my-flink-job:1.0 ., bạn sẽ có một container chứa đầy đủ Java, Flink và code của bạn. Chạy container này sẽ tự động khởi động cluster và chạy job.
Để verify trên Kubernetes, bạn sẽ sử dụng Helm Chart của Flink. Cài đặt Flink Operator trên K8s cluster của bạn:
helm repo add flink https://apache.github.io/flink-kubernetes-operator/
helm repo update
helm install flink-operator flink/flink-kubernetes-operator --namespace flink-system
Khởi tạo một FlinkCluster resource (YAML) để K8s tự động tạo JobManager và TaskManager dựa trên file JAR trong Docker image đã build.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: my-flink-app
namespace: flink-system
spec:
image: my-flink-job:1.0
version: v1_18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
parallelism.default: "4"
jarUri: file:///opt/flink/lib/your-flink-job.jar
entryClass: com.example.YourJobClassName
runtimeMode: streaming
Kết quả mong đợi: Khi apply file YAML này bằng kubectl apply -f deployment.yaml, Kubernetes sẽ tạo ra các Pod cho JobManager và TaskManager. Bạn có thể scale TaskManager bằng cách chỉnh số replicas trong resource, K8s sẽ tự động thêm hoặc bớt container.
Để verify, chạy lệnh kubectl get pods -n flink-system. Bạn sẽ thấy các pod có prefix flinkjobmanager và flinktaskmanager ở trạng thái Running. Truy cập Dashboard JobManager qua kubectl port-forward svc/flinkjobmanager-service 8081:8081 để xem job đang chạy.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream Processing với Apache Flink SQL và Ubuntu 24.04
« Phần 5: Xử lý dữ liệu phức tạp: Join Stream và CEP Pattern
Phần 7: Troubleshooting nâng cao và các mẹo vận hành Flink trên Ubuntu »