Phân tích Log File để xác định nguyên nhân OOM và TaskManager Crash
Xác định vị trí và cấu trúc log của Flink
Flink trên Ubuntu 24.04 thường lưu log vào thư mục flink/log tương ứng với phiên bản cài đặt. TaskManager và JobManager có file log riêng biệt.
Trước khi debug, bạn cần xác định file log nào chứa sự cố. TaskManager crash thường do bộ nhớ (Heap) đầy hoặc lỗi trong code UDF, trong khi JobManager crash có thể do lỗi cấu hình cluster.
ls -l /opt/flink/log/
Kết quả mong đợi: Danh sách các file log với đuôi .log, ví dụ: jobmanager.log, taskmanager.log (hoặc có timestamp).
Chiến thuật tìm lỗi Out Of Memory (OOM)
Khi TaskManager bị OOM, JVM sẽ giết tiến trình (kill -9). Dấu hiệu đặc trưng là dòng log java.lang.OutOfMemoryError: Java heap space hoặc java.lang.OutOfMemoryError: GC overhead limit exceeded xuất hiện ngay trước khi file log bị cắt đứt đột ngột.
Sử dụng lệnh grep để lọc nhanh các lỗi này trong file log của TaskManager.
grep -A 5 -B 5 "OutOfMemoryError" /opt/flink/log/taskmanager.log | tail -n 50
Kết quả mong đợi: Đoạn log hiển thị stack trace của lỗi OOM, chỉ rõ task nào (ví dụ: TaskManager-1) và operator nào (ví dụ: Aggregate) gây tràn bộ nhớ.
Phân tích nguyên nhân TaskManager Crash do Lỗi Code
Nếu không phải OOM, TaskManager crash thường do exception trong code (NullPointerException, ArithmeticException). Flink sẽ ghi lại full stack trace vào log trước khi đóng connection.
Tìm kiếm các từ khóa Exception hoặc Error kèm theo Task failed để xác định điểm gãy.
grep -C 10 "Task failed" /opt/flink/log/taskmanager.log
Kết quả mong đợi: Một khối log dài mô tả lỗi, bao gồm tên exception, dòng code bị lỗi (nếu có line number) và nguyên nhân (cause).
Verify kết quả phân tích log
Để xác nhận nguyên nhân, bạn cần đối chiếu thời gian crash trong log với thời gian trên hệ thống. Sử dụng journalctl hoặc file log của systemd nếu bạn chạy Flink như một service.
systemctl status flink@jobmanager
Kết quả mong đợi: Trạng thái service là failed hoặc exited với exit code tương ứng (ví dụ: exit code 137 thường là OOM do kill -9).
Sử dụng Flink Dashboard để giám sát Metrics thời gian thực
Truy cập và cấu hình Dashboard cơ bản
Flink Dashboard (Web UI) là công cụ trực quan nhất để xem trạng thái job. Mặc định nó chạy trên cổng 8081 của JobManager. Nếu bạn đang chạy remote, cần mở cổng này trên firewall.
Kiểm tra xem JobManager có đang lắng nghe trên cổng 8081 không.
curl -s http://localhost:8081 | grep -o "Flink Version.*"
Kết quả mong đợi: Trả về dòng hiển thị phiên bản Flink (ví dụ: Flink Version 1.17.0), xác nhận Dashboard đang hoạt động.
Giám sát Throughput và Backpressure
Vào tab Jobs -> Chọn Job đang chạy -> Tab Metrics. Quan trọng nhất là chỉ số Throughput (băng thông) và Backpressure (áp lực).
Backpressure True có nghĩa là TaskManager đang xử lý chậm hơn tốc độ dữ liệu đầu vào, gây ra queue đầy. Đây là dấu hiệu cảnh báo hiệu năng kém hoặc lỗi logic.
curl -s "http://localhost:8081/jobs.json" | jq '.[] | select(.jobState == "RUNNING") | .jobID'
Kết quả mong đợi: Lấy được JobID đang chạy để tra cứu chi tiết metrics trên Dashboard hoặc qua API.
Giám sát Slot Usage và Resource Allocation
Tab Cluster trong Dashboard cho phép bạn xem phân bổ TaskManager và Slot. Nếu thấy Slot Usage luôn ở mức 100% mà Throughput thấp, đây là dấu hiệu của bottleneck (nút cổ chai).
Sử dụng API REST của Flink để lấy thông tin về slot usage dưới dạng JSON để tích hợp vào hệ thống monitoring khác (như Prometheus/Grafana).
curl -s "http://localhost:8081/clusters.json" | jq '.[] | .taskManagers'
Kết quả mong đợi: Danh sách TaskManager với thông tin số slot đang dùng và số slot trống.
Verify kết quả giám sát
Tạo một job test đơn giản và quan sát sự thay đổi metrics khi tăng lượng dữ liệu đầu vào đột ngột.
curl -s http://localhost:8081/jobs.json | jq '.[] | .jobState'
Kết quả mong đợi: Trạng thái job luôn là RUNNING và không bị FAILED hoặc CANCELED bất thường.
Xử lý các trường hợp Data bị Duplicate hoặc Mất mát
Nguyên nhân và cơ chế của Duplicate Data
Duplicate data trong Flink thường xảy ra khi có lỗi trong Sink (ví dụ: Kafka) hoặc khi Job bị restart mà không có checkpoint (exactly-once). Nếu bạn sử dụng AT_LEAST_ONCE, Flink sẽ đảm bảo không mất dữ liệu nhưng có thể gửi lại bản copy.
Kiểm tra cấu hình checkpoint trong file flink-conf.yaml để đảm bảo chế độ consistency đúng.
grep -E "state\.backend|state\.checkpoint" /opt/flink/conf/flink-conf.yaml
Kết quả mong đợi: Thấy các dòng cấu hình như state.backend: rocksdb và state.checkpoints.dir.
Xử lý Duplicate với Idempotent Sink hoặc Deduplication
Để loại bỏ duplicate, bạn cần áp dụng một trong hai cách: (1) Sử dụng Sink có tính idempotent (ghi đè nếu key trùng), hoặc (2) Thêm bước Deduplicate trong SQL bằng cách dùng ROW_NUMBER() hoặc ROW_NUMBER() OVER (PARTITION BY key).
Ví dụ SQL để loại bỏ duplicate dựa trên timestamp mới nhất cho mỗi key:
CREATE VIEW DeduplicatedStream AS
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
FROM RawStream
)
WHERE rn = 1;
Kết quả mong đợi: View DeduplicatedStream chỉ chứa duy nhất một bản ghi cho mỗi user_id (bản ghi mới nhất).
Xử lý Data Mất mát (Data Loss)
Data mất thường do TaskManager crash trước khi hoàn thành checkpoint. Nếu bạn không bật checkpoint hoặc checkpoint thất bại, dữ liệu giữa 2 checkpoint sẽ mất.
Để khắc phục, bật Checkpointing với mode EXACTLY_ONCE và đảm bảo Source (như Kafka) có offset được lưu lại trong state.
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.min-paused = 300000;
Kết quả mong đợi: Job sẽ tự động pause trong 5 phút nếu checkpoint không hoàn thành để tránh mất dữ liệu, sau đó resume hoặc fail.
Verify kết quả xử lý dữ liệu
Tạo một test case đơn giản: gửi 1000 dòng dữ liệu, gây crash TaskManager, rồi restart Job. Đếm số dòng trong Sink sau khi Job chạy lại.
SELECT COUNT(*) FROM DeduplicatedStream;
Kết quả mong đợi: Số dòng trong Sink phải bằng hoặc lớn hơn 1000 (nếu không mất dữ liệu) và không có duplicate (nếu đã deduplicate).
Mẹo debug SQL Flink và các lỗi Syntax phổ biến
Sử dụng EXPLAIN để phân tích Execution Plan
Trước khi chạy query, luôn dùng EXPLAIN để xem Flink dịch SQL thành DAG (Directed Acyclic Graph) như thế nào. Điều này giúp phát hiện lỗi logic hoặc các operation không được hỗ trợ.
EXPLAIN SELECT COUNT(*) FROM RawStream GROUP BY user_id;
Kết quả mong đợi: Hiển thị cây plan chi tiết với các operator như Source, Aggregate, Sink và các tham số cấu hình của chúng.
Debug lỗi Schema Mismatch
Lỗi phổ biến nhất là Schema mismatch khi Join hoặc Union các stream có trường không giống nhau. Flink SQL rất khắt khe về kiểu dữ liệu (Data Type).
Luôn kiểm tra schema của các table source trước khi viết query bằng lệnh DESCRIBE.
DESCRIBE RawStream;
Kết quả mong đợi: Danh sách các cột với tên, kiểu dữ liệu (VARCHAR, BIGINT, TIMESTAMP(3)) và watermark (nếu có).
Xử lý lỗi Time Characteristic và Watermark
Khi dùng Windowing, lỗi Event time without watermark thường xảy ra nếu bạn khai báo GROUP BY TUMBLE(event_time) nhưng không có watermark. Flink không thể đóng cửa sổ nếu không biết thời gian sự kiện.
Khắc phục bằng cách khai báo watermark rõ ràng trong CREATE TABLE.
CREATE TABLE StreamWithWatermark (
event_time TIMESTAMP(3),
data VARCHAR,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
Kết quả mong đợi: Query sử dụng TUMBLE(event_time) sẽ chạy thành công mà không báo lỗi watermark.
Debug lỗi Syntax với Common Table Expressions (CTE)
Nếu query quá phức tạp, hãy chia nhỏ bằng CTE (WITH). Điều này giúp cô lập lỗi và debug từng bước.
WITH CleanedData AS (
SELECT user_id, event_time, data FROM RawStream WHERE data IS NOT NULL
),
AggregatedData AS (
SELECT user_id, COUNT(*) AS cnt FROM CleanedData GROUP BY user_id
)
SELECT * FROM AggregatedData;
Kết quả mong đợi: Nếu có lỗi, Flink sẽ báo lỗi cụ thể trong CTE nào (ví dụ: lỗi trong CleanedData), giúp bạn sửa nhanh hơn.
Verify kết quả debug
Chạy lại query sau khi đã áp dụng các fix. Kiểm tra log của Flink để đảm bảo không còn cảnh báo Warning hoặc lỗi Error.
SELECT * FROM AggregatedData LIMIT 10;
Kết quả mong đợi: Trả về 10 dòng dữ liệu hợp lệ, Job giữ nguyên trạng thái RUNNING.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream Processing với Apache Flink SQL và Ubuntu 24.04
« Phần 6: Tối ưu hóa hiệu năng và xử lý lỗi cho sản xuất