Triển khai Flink Operator và Cluster Manager
Để quản lý vòng đời của các job Flink trên Kubernetes, chúng ta cần cài đặt Apache Flink Kubernetes Operator. Operator này sẽ tự động tạo, scaling và giám sát các cluster Flink dựa trên các tài nguyên Kubernetes custom (CRD).
Cài đặt Flink Operator
Bước đầu tiên là thêm repository của Apache Flink vào cluster và cài đặt operator bằng Helm. Điều này sẽ triển khai các CRD cần thiết và deployment của operator.
helm repo add flink-operator https://apache.github.io/flink-kubernetes-operator/
helm repo update
helm install flink-operator flink-operator/flink-operator --namespace default --create-namespace
Kết quả mong đợi: Bạn thấy thông báo "Release 'flink-operator' has been installed" và các pod trong namespace default được tạo ra (flink-operator-controller-manager).
Verify bằng cách kiểm tra trạng thái pod của operator:
kubectl get pods -l app=flink-operator-controller-manager -n default
Kết quả mong đợi: Pod ở trạng thái "Running" với số lần restart là 0.
Tạo Cluster Flink
Sau khi operator đã chạy, chúng ta định nghĩa một cluster Flink thông qua một Custom Resource (FlinkCluster). Cluster này bao gồm một JobManager và các TaskManager.
Tạo file cấu hình flink-cluster.yaml với nội dung hoàn chỉnh dưới đây:
cat > flink-cluster.yaml
Kết quả mong đợi: File flink-cluster.yaml được tạo thành công trên local machine.
Áp dụng cấu hình vào Kubernetes để tạo cluster:
kubectl apply -f flink-cluster.yaml
Kết quả mong đợi: Thông báo "flinkcluster.flink.apache.org/flink-cluster created".
Verify trạng thái cluster:
kubectl get flinkcluster flink-cluster -o yaml
Kết quả mong đợi: Trong phần status, bạn thấy state là "Running" và các pod JobManager/TaskManager đã được tạo.
Cấu hình Flink JobManager và TaskManager trong K8s
Để đảm bảo JobManager và TaskManager hoạt động hiệu quả, chúng ta cần cấu hình networking và discovery mechanism phù hợp với môi trường Kubernetes.
Cấu hình Discovery và Networking
Flink trên K8s cần biết cách tìm địa chỉ của JobManager. Mặc định, operator sử dụng Kubernetes Service để làm điều này. Chúng ta sẽ cấu hình thêm để đảm bảo tính sẵn sàng cao.
Chỉnh sửa file flink-cluster.yaml (hoặc tạo file mới flink-cluster-config.yaml) để thêm cấu hình service type và discovery:
cat > flink-cluster-config.yaml
Kết quả mong đợi: File cấu hình mới được tạo với các tham số networking rõ ràng.
Áp dụng cấu hình mới:
kubectl apply -f flink-cluster-config.yaml
Kết quả mong đợi: Cluster mới được tạo với service type LoadBalancer (nếu cấu hình đúng) hoặc ClusterIP.
Verify service được tạo:
kubectl get svc -l app=flink-cluster-config-jm-service
Kết quả mong đợi: Thấy service flink-cluster-config-jm-service với trạng thái "LoadBalancer" hoặc "ClusterIP" và có địa chỉ IP.
Cài đặt dependency JAR và cấu hình Checkpointing
Để chạy các job xử lý dữ liệu phức tạp, chúng ta cần nạp các thư viện JAR bên ngoài vào container Flink và cấu hình cơ chế checkpoint để đảm bảo tính nhất quán dữ liệu (exactly-once semantics).
Cấu hình Dependency JAR
Thay vì build lại image Flink, chúng ta có thể sử dụng tính năng "dependency" của operator để mount các JAR vào container. Trong ví dụ này, giả sử chúng ta cần thư viện `flink-connector-kafka`.
Cập nhật file flink-cluster-dependency.yaml:
cat > flink-cluster-dependency.yaml
Kết quả mong đợi: File cấu hình được tạo với phần dependencies chứa artifact Kafka connector.
Triển khai cluster có dependency:
kubectl apply -f flink-cluster-dependency.yaml
Kết quả mong đợi: Cluster được tạo và operator tự động tải file JAR từ Maven Central vào container.
Cấu hình Checkpointing
Checkpointing là cơ chế lưu trạng thái (state) của job tại các thời điểm nhất định để phục hồi khi lỗi. Chúng ta sẽ cấu hình lưu checkpoint vào S3 (hoặc local volume nếu test).
Tạo file cấu hình flink-cluster-checkpoint.yaml với cấu hình state backend và checkpoint:
cat > flink-cluster-checkpoint.yaml
Kết quả mong đợi: File cấu hình checkpointing được tạo.
Áp dụng cấu hình:
kubectl apply -f flink-cluster-checkpoint.yaml
Kết quả mong đợi: Cluster được tạo với các thông số checkpointing đã được set trong flinkConfiguration.
Verify cấu hình trong JobManager:
kubectl exec -it $(kubectl get pods -l app=flink-cluster-checkpoint-jm -o jsonpath='{.items[0].metadata.name}') -n default -- cat /opt/flink/conf/flink-conf.yaml
Kết quả mong đợi: Nội dung file flink-conf.yaml hiển thị các dòng cấu hình checkpoint (checkpointing.mode, checkpoint.interval, etc.) đã được áp dụng.
Chạy job mẫu đơn giản để kiểm tra tính sẵn sàng
Bước cuối cùng là xác nhận cluster đã sẵn sàng bằng cách submit một job mẫu. Chúng ta sẽ sử dụng tính năng "default job" của operator để chạy một job ngay khi cluster khởi động.
Tạo Job mẫu với Operator
Thay vì dùng command line `flink run`, chúng ta khai báo job trực tiếp trong spec của FlinkCluster. Điều này giúp operator tự động quản lý vòng đời của job.
Tạo file flink-cluster-job.yaml:
cat > flink-cluster-job.yaml
Kết quả mong đợi: File cấu hình job được tạo. Lưu ý: Nếu không có S3, bạn có thể đổi `state.checkpoints.dir` thành một local volume hoặc để trống nếu chỉ test tính sẵn sàng.
Triển khai cluster với job:
kubectl apply -f flink-cluster-job.yaml
Kết quả mong đợi: Cluster được tạo và job WordCount tự động bắt đầu chạy.
Verify trạng thái job:
kubectl get flinkcluster flink-cluster-job -o jsonpath='{.status.jobManager.jobStatus}'
Kết quả mong đợi: Trạng thái là "RUNNING".
Để xem log của job, truy cập vào pod TaskManager:
kubectl logs -f $(kubectl get pods -l app=flink-cluster-job-tm -o jsonpath='{.items[0].metadata.name}') -n default | grep "org.apache.flink.examples.java.streaming.WordCount"
Kết quả mong đợi: Log hiển thị các dòng output của job WordCount (ví dụ: "hello: 1", "world: 2") hoặc thông báo job đang chạy.
Để truy cập vào giao diện Dashboard Flink (nếu service type là LoadBalancer hoặc dùng port-forwarding):
kubectl port-forward svc/flink-cluster-job-jm-service 8081:8081
Kết quả mong đợi: Mở trình duyệt truy cập http://localhost:8081, bạn thấy giao diện Dashboard Flink với job WordCount đang ở trạng thái "Running" và các thông số về checkpoint.
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 3: Đặt máy chủ ClickHouse vào Kubernetes cho lưu trữ dữ liệu lớn
Phần 5: Xây dựng pipeline ingest: Kết nối Kafka Source với Flink »