Cấu hình Watermark và Xử lý Dữ liệu Trễ
Thiết lập Watermark cho dòng sự kiện
Để Flink hiểu được thời gian trong dữ liệu (Event Time) thay vì thời gian hệ thống, ta cần định nghĩa cách Flink sinh ra watermark.
Watermark là cơ chế báo hiệu rằng tất cả sự kiện có timestamp nhỏ hơn giá trị watermark đã được nhận hết. Điều này kích hoạt các window đóng lại để tính toán.
Định nghĩa hàm WatermarkStrategy với độ trễ tối đa (allowed lateness) là 1000ms (1 giây).
WatermarkStrategy<SensorReading> strategy =
WatermarkStrategy
.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
Kết quả mong đợi: Flink sẽ bắt đầu phát watermark sau khi nhận được sự kiện đầu tiên, giá trị watermark sẽ là timestamp của sự kiện đó cộng thêm 1 giây.
Xử lý dữ liệu đến trễ (Late Data)
Dữ liệu trễ là những sự kiện đến sau khi watermark đã vượt qua timestamp của chúng, khiến window đã đóng.
Chúng ta sử dụng cơ chế Side Output để chuyển dữ liệu này sang một luồng riêng biệt thay vì bị bỏ qua.
Định nghĩa một SideOutputTag để đánh dấu dữ liệu trễ, sử dụng cho các phép tính Aggregation sau này.
public static class LateDataTag extends SideOutputTag<SensorReading> {
public LateDataTag() {}
}
Kết quả mong đợi: Khi cấu hình vào window, Flink sẽ tự động đẩy các sự kiện trễ vào Side Output này để ta xử lý riêng.
Cấu hình Allowed Lateness
Cho phép window mở lại một khoảng thời gian ngắn để chờ dữ liệu trễ đến và cập nhật kết quả ngay lập tức trước khi phát ra final result.
Thêm tham số allowedLateness vào pipeline xử lý window.
.dataStream(strategy)
.keyBy(SensorReading::getSensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(5))
.sideOutputs(new LateDataTag())
.aggregate(new SensorAggregator())
Kết quả mong đợi: Window sẽ đóng sau 10s + 5s (5s cho allowed lateness), dữ liệu đến trong 5s cuối vẫn được cập nhật vào kết quả, dữ liệu đến sau đó sẽ bị đẩy vào Side Output.
Triển khai các loại Window
Tumbling Window (Cửa sổ trượt không chồng lấn)
Tumbling Window chia dòng dữ liệu thành các khoảng thời gian cố định, rời rạc. Mỗi sự kiện chỉ thuộc về đúng một window.
Phù hợp cho các báo cáo theo phút/giờ chuẩn (ví dụ: tổng doanh thu mỗi 5 phút).
Cấu hình Tumbling Event Time Window với độ dài 10 giây.
window(TumblingEventTimeWindows.of(Time.seconds(10)))
Kết quả mong đợi: Window 1 chạy từ 0s-10s, Window 2 chạy từ 10s-20s. Không có sự giao thoa giữa các window.
Sliding Window (Cửa sổ trượt chồng lấn)
Sliding Window có độ dài (size) và bước trượt (slide) cố định. Các window sẽ chồng lên nhau.
Phù hợp khi cần tính toán xu hướng mượt mà hơn (ví dụ: trung bình 10 phút, cập nhật mỗi 5 phút).
Cấu hình Sliding Window với size 10s và slide 5s.
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
Kết quả mong đợi: Tại thời điểm 15s, Flink sẽ kích hoạt tính toán cho 2 window: window bắt đầu từ 5s (kết thúc 15s) và window bắt đầu từ 10s (kết thúc 20s).
Session Window (Cửa phiên)
Session Window nhóm các sự kiện dựa trên khoảng thời gian không hoạt động (gap).
Phù hợp cho phân tích hành vi người dùng (ví dụ: session hoạt động cho đến khi user không click gì trong 5 phút).
Cấu hình Session Window với gap là 10 giây.
window(SessionWindows.withGap(Time.seconds(10)))
Kết quả mong đợi: Window sẽ mở khi có sự kiện đầu tiên và chỉ đóng nếu không có sự kiện nào đến trong 10 giây tiếp theo.
Thực hiện Aggregation theo Key
Định nghĩa hàm Aggregation
Thay vì dùng WindowFunction phức tạp, ta sử dụng AggregateFunction để tính toán hiệu năng cao hơn.
Hàm này sẽ tích lũy state (Accumulator) và tạo ra kết quả cuối cùng (Result) khi window đóng.
Tạo class SensorAggregator kế thừa AggregateFunction.
public class SensorAggregator implements AggregateFunction<SensorReading, SensorAccumulator, SensorResult> {
@Override
public SensorAccumulator createAccumulator() {
return new SensorAccumulator();
}
@Override
public SensorAccumulator add(SensorReading value, SensorAccumulator acc) {
acc.sum += value.getTemperature();
acc.count++;
return acc;
}
@Override
public SensorResult getResult(SensorAccumulator acc) {
double avg = acc.sum / acc.count;
return new SensorResult(acc.count, acc.sum, avg);
}
@Override
public SensorAccumulator merge(SensorAccumulator a1, SensorAccumulator a2) {
a1.sum += a2.sum;
a1.count += a2.count;
return a1;
}
}
Kết quả mong đợi: Flink sẽ tự động gọi các hàm add, merge, getResult để tính sum, count, avg cho từng key.
Triển khai KeyBy và Aggregate
Áp dụng hàm Aggregation đã tạo vào dòng dữ liệu đã được chia window.
Sử dụng keyBy để nhóm dữ liệu theo ID cảm biến trước khi áp dụng window.
dataStream
.keyBy(SensorReading::getSensorId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new SensorAggregator())
.print();
Kết quả mong đợi: Console sẽ in ra kết quả tổng hợp cho từng SensorId mỗi 10 giây.
Xử lý Side Output của Late Data
Để xem dữ liệu bị trễ đã được xử lý ra sao, ta cần lấy Side Output ra và in nó.
Sử dụng getSideOutput để tách luồng dữ liệu trễ.
DataStream<SensorResult> results = ...aggregate(...);
DataStream<SensorReading> lateStream = results.getSideOutput(new LateDataTag());
lateStream.print("LATE DATA:");
Kết quả mong đợi: Trong log Flink, bạn sẽ thấy dòng "LATE DATA:" xuất hiện khi có sự kiện đến trễ hơn allowed lateness.
Tối ưu hóa Memory và State Backend
Cấu hình State Backend cho Job nặng
Đối với stream processing có state (windowing, aggregation), ta cần lưu trữ trạng thái tạm thời.
Sử dụng RocksDBStateBackend thay vì HashMapStateBackend để xử lý lượng state lớn, giúp giảm tiêu thụ RAM.
Chỉnh sửa file configuration của Flink Job (thường là trong code hoặc flink-conf.yaml).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
Kết quả mong đợi: Flink sẽ sử dụng RocksDB để lưu state trên disk (hoặc local fs) thay vì giữ toàn bộ trong RAM, tránh lỗi OOM.
Cấu hình Checkpoint để đảm bảo Exactly-Once
Checkpoint giúp lưu snapshot state định kỳ để phục hồi khi job crash.
Thiết lập thời gian checkpoint là 10 giây và đảm bảo enable checkpointing.
CheckpointsConfig checkpointsConfig = new CheckpointsConfig();
checkpointsConfig.enableCheckpoints();
checkpointsConfig.setCheckpointTimeout(Time.minutes(10));
checkpointsConfig.setCheckpointInterval(Time.seconds(10));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Kết quả mong đợi: Job sẽ chạy và tạo checkpoint mỗi 10 giây. Nếu job crash, khi restart nó sẽ load state từ checkpoint gần nhất.
Quản lý Memory cho TaskManager
Đảm bảo TaskManager có đủ bộ nhớ cho Managed Memory (State) và Task Heap (Code/Objects).
Chỉnh sửa file deployment (k8s-deployment.yaml hoặc flink-conf.yaml) để phân bổ memory hợp lý.
# flink-conf.yaml
taskmanager.memory.process.size: 4096m
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
Kết quả mong đợi: Flink sẽ tự động tính toán Managed Memory khoảng 1638MB (40% của 4GB) để chứa state, tránh tràn bộ nhớ.
Verify kết quả và Kiểm thử
Chạy Job và quan sát Log
Triển khai job đã cấu hình lên Kubernetes hoặc chạy local.
Sử dụng lệnh kubectl để xem log của pod Flink JobManager.
kubectl logs -f | grep "LATE DATA"
Kết quả mong đợi: Bạn thấy các dòng log về dữ liệu trễ và kết quả aggregation in ra theo chu kỳ.
Kiểm tra Web UI
Mở Flink Web UI (thường là port 8081) để kiểm tra trạng thái Window và Checkpoint.
Vào tab "Job", chọn job đang chạy, sau đó vào "State" và "Checkpoints".
# Forward port để truy cập từ local
kubectl port-forward svc/flink-jobmanager 8081:8081
Kết quả mong đợi: Web UI hiển thị trạng thái "Running", các checkpoint đang được tạo định kỳ và dung lượng state tăng dần.
Inject dữ liệu trễ thủ công
Để test tính năng xử lý late data, gửi sự kiện có timestamp cũ hơn hiện tại qua Kafka.
Sử dụng kafka-console-producer để gửi record có timestamp cố định trong quá khứ.
echo "{\"sensorId\":\"S1\",\"temperature\":25.5,\"timestamp\":1609459200000}" | \
kafka-console-producer --broker-list kafka:9092 --topic sensor-readings
Kết quả mong đợi: Nếu timestamp cũ hơn watermark hiện tại, sự kiện này sẽ xuất hiện trong log "LATE DATA:" chứ không nằm trong kết quả aggregation chính.
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 5: Xây dựng pipeline ingest: Kết nối Kafka Source với Flink
Phần 7: Tích hợp Flink Sink để đẩy dữ liệu vào ClickHouse »