Cấu hình file flink-conf.yaml cho Standalone Cluster
Chỉnh sửa tham số JobManager và TaskManager
File cấu hình chính flink-conf.yaml nằm trong thư mục conf của package Apache Flink. Chúng ta cần khai báo rõ ràng các tham số memory, parallelism và địa chỉ liên kết để đảm bảo JobManager và TaskManager hoạt động trên cùng một host.
Tại sao: Việc định nghĩa rõ ràng tài nguyên giúp tránh xung đột cổng (port conflict) và đảm bảo TaskManager có đủ bộ nhớ để xử lý các toán tử (operator) trong pipeline HTAP.
Kết quả mong đợi: File cấu hình được ghi đè, sẵn sàng để khởi động cụm mà không gặp lỗi về tài nguyên hoặc địa chỉ mạng.
Sử dụng trình soạn thảo nano để chỉnh sửa file cấu hình:
sudo nano /opt/flink/conf/flink-conf.yaml
Thay thế toàn bộ nội dung file bằng cấu hình tối ưu cho môi trường Standalone như sau (giả định RAM server 8GB):
jobmanager.rpc.address: 0.0.0.0
jobmanager.rpc.port: 6123
jobmanager.web.address: 0.0.0.0
jobmanager.web.port: 8081
jobmanager.memory.process.size: 2g
jobmanager.memory.taskmanager.rpc: 256m
jobmanager.memory.framework: 256m
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 6g
taskmanager.memory.managed.fraction: 0.6
taskmanager.memory.network.fraction: 0.1
parallelism.default: 2
taskmanager.heap.size: 4g
taskmanager.offheap.size: 1g
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
high-availability: true
high-availability.storageDir: file:///opt/flink/ha
Verify: Kiểm tra lại file cấu hình đã được lưu và các giá trị memory không vượt quá tổng RAM vật lý của máy.
Cài đặt và khởi động JobManager và TaskManager
Chạy script khởi động cụm Standalone
Chúng ta sẽ sử dụng script start-cluster.sh có sẵn trong package. Script này tự động khởi động cả tiến trình JobManager và TaskManager trên cùng một node.
Tại sao: Trong môi trường phát triển hoặc testing trên một máy, việc chạy cả hai thành phần trên cùng một host giúp giảm độ trễ mạng nội bộ và đơn giản hóa quy trình vận hành ban đầu.
Kết quả mong đợi: Hai tiến trình Java chạy nền, log khởi động hiển thị thông báo "JobManager started" và "TaskManager registered".
Trước khi chạy, hãy tạo thư mục để chứa các file checkpoint và savepoint (được khai báo trong config):
sudo mkdir -p /opt/flink/checkpoints /opt/flink/savepoints /opt/flink/ha
sudo chown -R $USER:$USER /opt/flink
Di chuyển vào thư mục bin và thực thi lệnh khởi động:
cd /opt/flink/bin
./start-cluster.sh
Verify: Kiểm tra các tiến trình Java đang chạy bằng lệnh:
ps aux | grep flink
Bạn sẽ thấy ít nhất 2 dòng chứa org.apache.flink.runtime.jobmanager và org.apache.flink.runtime.taskmanager.
Cấu hình cổng Web UI và truy cập Flink Dashboard
Truy cập giao diện quản trị qua trình duyệt
Flink cung cấp một Web UI mạnh mẽ để giám sát trạng thái của job, xem log thời gian thực và điều chỉnh tham số. Mặc định, cổng này là 8081.
Tại sao: Dashboard là công cụ chính để Sysadmin quan sát độ trễ (latency), tốc độ xử lý (throughput) và trạng thái checkpoint của các job HTAP.
Kết quả mong đợi: Trình duyệt mở trang Flink Dashboard hiển thị trạng thái "Healthy" và danh sách các TaskManager đã join vào cụm.
Trên server, đảm bảo firewall cho phép truy cập cổng 8081 (nếu đang bật ufw):
sudo ufw allow 8081/tcp
Trên máy trạm của bạn (hoặc trên chính server nếu có trình duyệt), mở trình duyệt và truy cập địa chỉ IP của server:
http://YOUR_SERVER_IP:8081
Verify: Trong tab "Jobs", bạn sẽ thấy danh sách trống (vì chưa submit job nào). Trong tab "Cluster", bạn sẽ thấy 1 JobManager và 1 TaskManager với trạng thái "Running".
Tối ưu hóa tham số memory và parallelism
Điều chỉnh phân bổ bộ nhớ cho mô hình HTAP
Đối với mô hình HTAP (Hybrid Transactional/Analytical Processing), bộ nhớ đóng vai trò then chốt. Chúng ta cần tối ưu managed.memory để lưu trữ state (state backend RocksDB) và network.memory để xử lý dữ liệu dòng (data streaming).
Tại sao: Nếu managed.memory quá nhỏ, RocksDB sẽ thường xuyên flush dữ liệu ra disk gây chậm. Nếu network.memory quá nhỏ, các job ETL sẽ bị lỗi backpressure khi tốc độ nhập cao.
Kết quả mong đợi: TaskManager sử dụng bộ nhớ hiệu quả hơn, giảm thiểu hiện tượng OOM (Out Of Memory) trong các job phức tạp.
Chỉnh sửa lại file cấu hình để tinh chỉnh tỷ lệ phân bổ bộ nhớ (giả sử TaskManager có 6GB total, ta muốn 4GB cho managed memory):
sudo nano /opt/flink/conf/flink-conf.yaml
Cập nhật các dòng sau trong file (nếu chưa có hoặc muốn thay đổi):
taskmanager.memory.managed.fraction: 0.66
taskmanager.memory.network.fraction: 0.15
taskmanager.heap.size: 4g
taskmanager.offheap.size: 1.5g
Để thay đổi có hiệu lực, bạn cần dừng và khởi động lại cụm:
cd /opt/flink/bin
./stop-cluster.sh
./start-cluster.sh
Verify: Vào lại Web UI (8081), chọn TaskManager, vào tab "Memory". Kiểm tra xem giá trị "Managed Memory" có khớp với tính toán (khoảng 4GB) không.
Cấu hình Parallelism mặc định
Tham số parallelism.default quyết định số lượng slot mà mỗi job sử dụng nếu không khai báo cụ thể.
Tại sao: Đối với PostgreSQL HTAP, việc tăng parallelism giúp tăng throughput ghi dữ liệu (write throughput) nhưng có thể gây tải nặng cho DB nếu không kiểm soát. Giá trị 2-4 là an toàn cho 1 node standalone.
Kết quả mong đợi: Các job được submit sẽ tự động chạy song song với số luồng đã định nghĩa, tận dụng tối đa CPU của TaskManager.
Đảm bảo dòng sau đã tồn tại trong flink-conf.yaml:
parallelism.default: 2
Verify: Khi submit một job SQL đơn giản, kiểm tra trong Dashboard xem số lượng "Parallelism" của các bước (step) trong DAG có bằng 2 không.
Cấu hình logging và giám sát cơ bản
Chuyển hướng log file ra vị trí dễ truy cập
Apache Flink ghi log theo mặc định vào thư mục logs bên cạnh thư mục bin. Để dễ dàng giám sát và rotate log, chúng ta cần cấu hình file log4j2.properties.
Tại sao: Trong môi trường sản xuất, log là nguồn dữ liệu đầu tiên để debug sự cố. Việc cấu hình log riêng biệt cho JobManager và TaskManager giúp phân tách vấn đề.
Kết quả mong đợi: Các file log được tạo mới trong thư mục logs với định dạng rõ ràng, bao gồm timestamp và level (INFO, WARN, ERROR).
Sao chép file cấu hình log mặc định ra để chỉnh sửa:
cp /opt/flink/conf/log4j2.properties /opt/flink/conf/log4j2.properties.bak
sudo nano /opt/flink/conf/log4j2.properties
Cập nhật phần Root Logger để đảm bảo log được ghi ra file và console, đồng thời tăng mức độ chi tiết lên INFO cho giai đoạn debug:
# Root logger configuration
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = Console
rootLogger.appenderRef.file.ref = File
# Console Appender
appender.console.name = Console
appender.console.type = Console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# File Appender
appender.File.name = File
appender.File.type = RollingFile
appender.File.fileName = ${sys:log.dir}/flink.log
appender.File.filePattern = ${sys:log.dir}/flink-%d{yyyy-MM-dd}.log
appender.File.layout.type = PatternLayout
appender.File.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.File.policies.type = Policies
appender.File.policies.time.type = TimeBasedTriggeringPolicy
appender.File.policies.time.interval = 1
appender.File.policies.time.modulate = true
appender.File.strategy.type = DefaultRolloverStrategy
appender.File.strategy.max = 10
Khởi động lại cụm để áp dụng cấu hình log mới:
cd /opt/flink/bin
./stop-cluster.sh
./start-cluster.sh
Verify: Theo dõi log thời gian thực bằng lệnh tail:
tail -f /opt/flink/logs/flink.log
Bạn sẽ thấy dòng log mới nhất xuất hiện ngay lập tức khi cụm hoạt động hoặc khi có job chạy.
Kiểm tra trạng thái Health của cụm
Sử dụng REST API của Flink để kiểm tra nhanh trạng thái của cụm mà không cần vào Web UI.
Tại sao: Đây là cách để tích hợp vào các script monitoring (như Prometheus Node Exporter hoặc script shell cron) để tự động cảnh báo.
Kết quả mong đợi: JSON trả về trạng thái "OK" và thông tin chi tiết về các TaskManager đang hoạt động.
Thực hiện lệnh curl để gọi API health check:
curl http://localhost:8081/jobs
Và để kiểm tra danh sách TaskManager:
curl http://localhost:8081/taskmanagers
Verify: Kết quả trả về là một chuỗi JSON. Nếu thấy "jobManagerHost" và danh sách "taskmanagers" không rỗng, cụm đã hoạt động ổn định.
Điều hướng series:
Mục lục: Series: Triển khai Database HTAP với Apache Flink và PostgreSQL trên Ubuntu 24.04
« Phần 2: Cấu hình PostgreSQL để tối ưu hóa cho mô hình HTAP
Phần 4: Tích hợp Flink với PostgreSQL thông qua JDBC Connector »