Nguyên lý High Availability (HA) của JobManager trong Apache Flink
Mục đích: Hiểu cơ chế hoạt động của HA để tránh cấu hình sai. Trong kiến trúc Flink, JobManager là điểm thất bại đơn (Single Point of Failure - SPOF) nếu không bật HA.
Cơ chế: Khi bật HA, Flink sẽ sử dụng một backend (thường là ZooKeeper hoặc HDFS/S3) để lưu trữ trạng thái metadata của JobManager. Khi JobManager chính (Active) gặp sự cố, một JobManager dự phòng (Standby) sẽ đọc metadata từ backend và tự động chuyển đổi trạng thái thành Active để tiếp tục xử lý.
Kết quả mong đợi: Cluster có thể chịu được sự cố của node chứa JobManager mà không làm gián đoạn quá trình xử lý dữ liệu (downtime chỉ xảy ra trong vài giây).
Cấu hình ZooKeeper làm Backend cho Flink HA
Chuẩn bị thư mục chia sẻ và cấu hình flink-conf.yaml
Mục đích: Tạo nơi lưu trữ trạng thái (HA backend) và chỉ định cho Flink biết địa chỉ ZooKeeper. Chúng ta sẽ sử dụng ZooKeeper đã cài đặt ở Phần 2 làm nơi lưu trữ metadata.
Giải thích: Cần tạo một thư mục trống trên hệ thống file (nơi Flink sẽ ghi log HA nếu cần) và chỉnh sửa file cấu hình chính `flink-conf.yaml` để bật HA mode.
Đường dẫn file: /opt/flink/conf/flink-conf.yaml (Giả sử Flink đã cài tại /opt/flink)
Thao tác: Thêm các dòng sau vào cuối file `flink-conf.yaml`:
ha: true
ha.storage-dir: file:///opt/flink/ha-storage
ha.zookeeper.quorum: localhost:2181
ha.zookeeper.root-path: /flink-ha
ha.jobmanager.rpc-address: localhost
ha.jobmanager.rpc-port: 6123
Kết quả mong đợi: File cấu hình được cập nhật, Flink biết phải kết nối với ZooKeeper tại localhost:2181 và lưu metadata vào path `/flink-ha`.
Thiết lập thư mục HA Storage
Mục đích: Tạo thư mục vật lý tương ứng với `ha.storage-dir` để tránh lỗi Permission Denied khi Flink khởi động.
Giải thích: Flink cần quyền ghi vào thư mục này để lưu các file checkpoint hoặc log HA. Đảm bảo user chạy Flink (ví dụ: `flink` hoặc `root`) có quyền truy cập.
sudo mkdir -p /opt/flink/ha-storage
sudo chown -R $USER:$USER /opt/flink/ha-storage
Kết quả mong đợi: Thư mục được tạo thành công, không có lỗi quyền truy cập.
Triển khai Cluster HA với 2 JobManager (Active/Standby)
Chỉnh sửa script khởi động JobManager
Mục đích: Cấu hình để cluster chạy 2 instance JobManager đồng thời: 1 Active và 1 Standby.
Giải thích: Trong file `jobmanager.yaml`, ta cần thay đổi số lượng task manager và đặc biệt là cấu hình `jobmanager.memory.process.size` nếu cần, nhưng quan trọng nhất là đảm bảo Flink hiểu rằng đây là cluster HA. Flink tự động chọn Active/Standby dựa trên ZooKeeper.
Đường dẫn file: /opt/flink/conf/jobmanager.yaml
Thao tác: Đảm bảo file này không bị ghi đè các cấu hình HA. Nếu cần tăng bộ nhớ cho HA, thêm vào file này:
jobmanager.memory.process.size: 2g
Kết quả mong đợi: File cấu hình sẵn sàng để khởi động cluster.
Khởi động Cluster HA
Mục đích: Chạy cluster với 2 JobManager để kiểm tra cơ chế (election).
Giải thích: Sử dụng lệnh `start-cluster.sh` hoặc `start-jobmanager.sh`. Khi chạy, Flink sẽ khởi động 2 tiến trình JobManager. Một tiến trình sẽ đăng ký vào ZooKeeper và trở thành Active, tiến trình kia sẽ ở trạng thái Standby (chờ đợi).
cd /opt/flink
./bin/start-cluster.sh
Kết quả mong đợi: Xuất hiện 2 tiến trình Java chạy JobManager. Kiểm tra bằng lệnh `jps` sẽ thấy 2 tiến trình `JobManager` hoặc `FlinkRunner` (tùy phiên bản) cùng tồn tại.
Verify trạng thái HA trên ZooKeeper
Mục đích: Xác nhận Flink đã kết nối thành công với ZooKeeper và đã tạo node metadata.
Giải thích: Sử dụng công cụ `zkCli.sh` (có sẵn trong package ZooKeeper) để xem node `/flink-ha` đã được tạo chưa. Node này chứa thông tin về JobManager Active hiện tại.
cd /opt/zookeeper/bin
./zkCli.sh -server localhost:2181
ls /flink-ha
get /flink-ha/jobmanager
Kết quả mong đợi: Lệnh `ls` trả về danh sách các node (thường là `jobmanager`), và lệnh `get` trả về địa chỉ IP/Port của JobManager đang là Active.
Xử lý sự cố JobManager Crash và Recovery
Simulate sự cố bằng cách kill JobManager Active
Mục đích: Kiểm tra tính năng tự động failover của Flink.
Giải thích: Chúng ta sẽ tìm tiến trình JobManager đang là Active (thường là tiến trình có PID đầu tiên hoặc kiểm tra qua Web UI/ZooKeeper) và dùng lệnh `kill` để tắt nó đột ngột.
ps aux | grep JobManager
# Giả sử PID của Active là 12345
kill -9 12345
Kết quả mong đợi: Tiến trình Active bị tắt ngay lập tức. Trong vòng 1-5 giây, JobManager Standby sẽ nhận tín hiệu từ ZooKeeper và chuyển sang trạng thái Active.
Quan sát quá trình Recovery
Mục đích: Xác minh rằng Job mới đã được tiếp tục xử lý mà không mất dữ liệu (State được giữ nguyên nhờ Checkpoint, nếu đã cấu hình).
Giải thích: Kiểm tra lại danh sách tiến trình và Web UI. Nếu Job đang chạy (ví dụ: WordCount hoặc Kafka Source), dòng log sẽ xuất hiện thông báo về việc "New JobManager elected" hoặc "Resuming job from checkpoint".
ps aux | grep JobManager
# Kiểm tra log Flink để thấy thông báo recovery
tail -f /opt/flink/log/flink-*-jobmanager-*.log
Kết quả mong đợi:
- `ps` hiện ra 2 tiến trình JobManager (Flink tự động khởi động lại instance bị kill hoặc chuyển Standby lên Active).
- Log hiển thị: "JobManager [ID] is now the new leader" hoặc "Restoring state from checkpoint".
- Web UI vẫn truy cập được và job vẫn chạy (có thể có độ trễ nhỏ).
Kiểm tra tính sẵn sàng cao bằng Web UI và CLI
Truy cập Web UI sau khi failover
Mục đích: Đảm bảo người dùng không bị ngắt kết nối hoàn toàn khi JobManager thay đổi.
Giải thích: Truy cập lại địa chỉ Web UI (thường là http://localhost:8081). Nếu cấu hình HA đúng, Web UI sẽ tự động chuyển hướng (redirect) hoặc reload để hiển thị thông tin của JobManager mới.
curl -I http://localhost:8081
Kết quả mong đợi: HTTP 200 OK. Giao diện web hiển thị danh sách job đang chạy (Running) thay vì bị lỗi hoặc trống rỗng.
Kiểm tra bằng Flink CLI
Mục đích: Dùng lệnh dòng lệnh để xác nhận trạng thái job sau sự cố.
Giải thích: Sử dụng `flink list` để xem danh sách job. Trong môi trường HA, lệnh này sẽ tự động kết nối đến JobManager Active mới nhất.
./bin/flink list
Kết quả mong đợi: Danh sách job hiển thị trạng thái `RUNNING`. Nếu có checkpoint, trạng thái có thể là `RUNNING (CHECKPOINTING)` hoặc `RUNNING (SUSPENDED)` trong quá trình recovery.
Verify lại trên ZooKeeper
Mục đích: Xác nhận node metadata trên ZooKeeper đã được cập nhật với thông tin JobManager mới.
./zkCli.sh -server localhost:2181
get /flink-ha/jobmanager
Kết quả mong đợi: Giá trị trả về của node `jobmanager` là địa chỉ IP/Port của JobManager mới (Standby cũ) đang đảm nhận vai trò Active.
Điều hướng series:
Mục lục: Series: Triển khai Database Streaming với Apache Flink và Ubuntu 24.04
« Phần 6: Xử lý dữ liệu có trạng thái (Stateful) và Windowing
Phần 8: Giám sát, Troubleshooting và Tips nâng cao »