1. Cấu hình State Backend: Chuyển đổi từ Memory sang RocksDB
1.1. Khái niệm về State Backend
State Backend là cơ chế lưu trữ trạng thái (state) của ứng dụng Flink. Trong môi trường streaming có trạng thái, Flink cần lưu lại dữ liệu lịch sử để tính toán.
loại chính: MemoryStateBackend (lưu trong RAM, nhanh nhưng mất dữ liệu khi restart và giới hạn dung lượng), FileSystemStateBackend (lưu trong HDFS/S3, chậm hơn nhưng an toàn hơn), và RocksDBStateBackend (lưu trên đĩa cứng của TaskManager, hỗ trợ state lớn hơn bộ nhớ RAM, hiệu suất cao cho state khổng lồ).
Đối với bài này, chúng ta sẽ cấu hình RocksDBStateBackend vì nó là tiêu chuẩn cho các ứng dụng production có state lớn.
1.2. Cấu hình RocksDB trong flink-conf.yaml
Chỉnh sửa file cấu hình chính của Flink để chuyển đổi backend sang RocksDB. File này nằm ở thư mục installation của bạn.
Đường dẫn file: /opt/flink/conf/flink-conf.yaml
Ta sẽ thêm các dòng cấu hình sau vào cuối file. Nội dung dưới đây bao gồm cả phần cơ bản và một số tham số tối ưu hóa RocksDB.
state.backend: rocksdb
state.backend.rocksdb.db-root-dir: /opt/flink/data/rocksdb
state.backend.incremental-checkpointing: true
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
Kết quả mong đợi: File cấu hình đã được cập nhật. Khi restart JobManager hoặc TaskManager, Flink sẽ khởi tạo RocksDB instance thay vì dùng bộ nhớ RAM cho state.
1.3. Tạo thư mục lưu trữ và cấp quyền
RocksDB cần một thư mục vật lý trên disk để lưu trữ database. Nếu không tạo trước, Flink có thể gặp lỗi permission hoặc OOM nếu disk đầy.
Thực hiện tạo thư mục và cấp quyền cho user flink (hoặc user hiện tại đang chạy).
sudo mkdir -p /opt/flink/data/rocksdb
sudo mkdir -p /opt/flink/checkpoints
sudo mkdir -p /opt/flink/savepoints
sudo chown -R flink:flink /opt/flink/data
sudo chmod -R 755 /opt/flink/data
Kết quả mong đợi: Các thư mục được tạo thành công, user flink có quyền đọc/ghi. Không có lỗi permission khi chạy job.
1.4. Verify cấu hình State Backend
Để đảm bảo cấu hình đã được tải, hãy kiểm tra thông tin của TaskManager sau khi khởi động lại cluster.
curl http://localhost:8081/jobs?limit=0
# Hoặc xem chi tiết trong Web UI tại tab "Configuration" của TaskManager
Kết quả mong đợi: Trong Web UI Flink, mục State Backend hiển thị rõ ràng là org.apache.flink.runtime.state.RocksDBStateBackend.
2. Triển khai Windowing: Tumbling và Sliding
2.1. Khái niệm về Windowing
Windowing chia luồng dữ liệu vô tận thành các tập hữu hạn để xử lý. Flink hỗ trợ nhiều loại window.
Tumbling Window: Cửa sổ không trùng lặp, kích thước cố định (ví dụ: 5 giây). Khi hết 5 giây, tính toán và đóng cửa sổ, bắt đầu cửa sổ mới.
Sliding Window: Cửa sổ trượt, có kích thước cố định nhưng trượt theo một khoảng thời gian nhỏ hơn (ví dụ: 10 giây trượt 5 giây). Dữ liệu có thể thuộc về nhiều cửa sổ.
2.2. Viết ứng dụng xử lý Window
Ta sẽ viết một job Java đơn giản để demo cả hai loại window. Job này đọc từ Kafka (hoặc source mock), nhóm theo key, và áp dụng window.
Đường dẫn file source: /home/flink/workspaces/WindowingExample/src/main/java/com/example/WindowingJob.java
Dưới đây là nội dung hoàn chỉnh của file Java.
package com.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public class WindowingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Mock data stream (thay thế Kafka source trong demo này)
DataStream source = env.fromElements(
"user1,100", "user1,200", "user2,50",
"user1,150", "user2,100", "user1,300"
).map(line -> line.split(","))
.map(parts -> new Event(parts[0], Integer.parseInt(parts[1])));
// 1. Tumbling Window (5 giây)
DataStream tumblingResult = source
.keyBy(Event::userId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new SumAggregator(), new PrintTumblingFunction());
// 2. Sliding Window (10 giây trượt 5 giây)
DataStream slidingResult = source
.keyBy(Event::userId)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new SumAggregator(), new PrintSlidingFunction());
tumblingResult.print("Tumbling:");
slidingResult.print("Sliding:");
env.execute("Windowing Demo");
}
static class Event {
String userId;
int amount;
public Event(String userId, int amount) { this.userId = userId; this.amount = amount; }
public String userId() { return userId; }
public int amount() { return amount; }
}
static class WindowResult {
String userId;
int total;
long start;
long end;
public WindowResult(String userId, int total, long start, long end) {
this.userId = userId; this.total = total; this.start = start; this.end = end;
}
@Override public String toString() {
return "User=" + userId + " Sum=" + total + " Window=[" + start + "," + end + "]";
}
}
static class SumAggregator implements AggregateFunction {
@Override public Integer createAccumulator() { return 0; }
@Override public Integer add(Event value, Integer acc) { return acc + value.amount; }
@Override public Integer getResult(Integer acc) { return acc; }
@Override public Integer merge(Integer acc1, Integer acc2) { return acc1 + acc2; }
}
static class PrintTumblingFunction extends ProcessWindowFunction {
@Override public void process(String key, Context context, Iterable values, Collector out) {
int sum = 0;
for (int v : values) sum += v;
out.collect(new WindowResult(key, sum, context.window().getStart(), context.window().getEnd()));
}
}
static class PrintSlidingFunction extends ProcessWindowFunction {
@Override public void process(String key, Context context, Iterable values, Collector out) {
int sum = 0;
for (int v : values) sum += v;
out.collect(new WindowResult(key, sum, context.window().getStart(), context.window().getEnd()));
}
}
}
Kết quả mong đợi: Khi build và chạy job, console sẽ hiển thị dòng output cho Tumbling (mỗi 5s một lần) và Sliding (mỗi 5s một lần nhưng với khoảng thời gian bao phủ khác nhau). Dữ liệu được nhóm theo userId.
2.3. Build và Submit Job
Sử dụng Maven để build jar file và submit vào cluster đang chạy.
cd /home/flink/workspaces/WindowingExample
mvn clean package
flink run -c com.example.WindowingJob target/WindowingExample-1.0-SNAPSHOT.jar
Kết quả mong đợi: Job xuất hiện trong Web UI Flink với trạng thái RUNNING. Logs trong console bắt đầu in các dòng kết quả window.
3. Xử lý Late Data và Watermark
3.1. Khái niệm Watermark và Late Data
Watermark: Là dấu mốc thời gian logic trong luồng sự kiện, báo cho Flink biết rằng "không còn sự kiện nào đến trước thời gian này nữa". Nó kích hoạt việc đóng window.
Late Data: Là dữ liệu đến muộn hơn thời gian đóng window của nó. Nếu không xử lý, dữ liệu này sẽ bị mất.
Flink cung cấp cơ chế Late Data Handling để giữ lại dữ liệu này trong một khoảng thời gian chờ (allowed lateness) hoặc gửi đến side output.
3.2. Cấu hình Watermark Strategy
Chúng ta sẽ cập nhật code để xử lý late data. Thêm WatermarkStrategy để định nghĩa cách tính watermark (dựa trên timestamp trong data) và cấu hình allowedLateness.
File: /home/flink/workspaces/LateDataJob/src/main/java/com/example/LateDataJob.java
package com.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class LateDataJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Giả lập data có timestamp (eventTime)
DataStream source = env.fromElements(
new Event("user1", 100, 1000L),
new Event("user1", 200, 2000L),
new Event("user1", 150, 1500L), // Data đến muộn (out of order)
new Event("user1", 300, 3000L),
new Event("user1", 250, 2500L)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, timestamp) -> event.timestamp)
);
// Tumbling Window 2 giây (2000ms)
// Allowed lateness 1 giây: giữ window thêm 1s để chờ late data
DataStream result = source
.keyBy(Event::userId)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.allowedLateness(Time.seconds(1))
.process(new LateDataProcessFunction())
.name("LateDataProcess");
result.print();
env.execute("Late Data Handling Demo");
}
static class Event {
String userId;
int value;
long timestamp;
public Event(String userId, int value, long timestamp) {
this.userId = userId; this.value = value; this.timestamp = timestamp;
}
public String userId() { return userId; }
public int value() { return value; }
}
static class WindowResult {
String userId; int sum; long start; long end; boolean isLate;
public WindowResult(String u, int s, long st, long en, boolean late) {
this.userId=u; this.sum=s; this.start=st; this.end=en; this.isLate=late;
}
@Override public String toString() {
return "User=" + userId + " Sum=" + sum + " [" + start + "," + end + "] Late=" + isLate;
}
}
static class LateDataProcessFunction extends ProcessWindowFunction {
@Override
public void process(String key, Context ctx, Iterable elements, Collector out) {
int sum = 0;
for (Event e : elements) sum += e.value();
// Kiểm tra nếu đang ở trong phase xử lý late data
boolean isLate = ctx.isEventTime();
// Logic đơn giản: nếu watermark > end window thì coi là late data trong allowed lateness
long watermark = ctx.currentWatermark();
boolean isLateData = watermark > ctx.window().getEnd();
out.collect(new WindowResult(key, sum, ctx.window().getStart(), ctx.window().getEnd(), isLateData));
}
}
}
Kết quả mong đợi: Window sẽ không đóng ngay khi hết 2 giây. Nó sẽ chờ thêm 1 giây (allowed lateness). Dữ liệu đến trong khoảng chờ đó sẽ được cộng vào kết quả. Output sẽ có cờ Late=true nếu dữ liệu đó đến sau khi window đã đóng ban đầu.
3.3. Build và Chạy Job xử lý Late Data
cd /home/flink/workspaces/LateDataJob
mvn clean package
flink run -c com.example.LateDataJob target/LateDataJob-1.0-SNAPSHOT.jar
Kết quả mong đợi: Trong logs, bạn thấy kết quả window được in ra muộn hơn so với thời gian thực tế do cơ chế allowedLateness và Watermark.
4. Quản lý và Reset State
4.1. Kiểm tra State sau khi lỗi
Khi job bị lỗi (failure), Flink sẽ cố gắng recover từ checkpoint gần nhất nếu cấu hình tự động recovery. Nếu state backend là RocksDB, state được lưu trên disk.
Để kiểm tra xem state có tồn tại không, ta xem thư mục RocksDB hoặc dùng Web UI.
ls -lh /opt/flink/data/rocksdb
# Kiểm tra checkpoint hiện tại
curl http://localhost:8081/jobs/{jobId}/checkpoints
Kết quả mong đợi: Thư mục /opt/flink/data/rocksdb chứa các file dữ liệu của RocksDB. API trả về danh sách các checkpoint đã lưu.
4.2. Reset State bằng Savepoint
Để reset state (xóa sạch dữ liệu cũ) mà không cần xóa file trên disk thủ công, cách chuẩn là tạo một Savepoint rồi submit job với option --fromSavepoint nhưng với cấu hình state mới, hoặc đơn giản hơn là dùng lệnh cancel rồi chạy lại job với flag xóa state.
Tuy nhiên, cách phổ biến nhất để reset state trong production là tạo savepoint mới, sau đó khi submit lại job, chỉ định savepoint đó nhưng đảm bảo schema state mới tương thích, hoặc dùng lệnh CLI để xóa state.
Để xóa state hoàn toàn và bắt đầu từ đầu (clean state), ta dùng lệnh submit với tùy chọn không load state cũ.
# 1. Dừng job hiện tại
flink cancel {jobId}
# 2. Xóa dữ liệu RocksDB cũ (Cẩn thận: Xóa toàn bộ state vật lý)
sudo rm -rf /opt/flink/data/rocksdb/*
# 3. Submit lại job (State sẽ trống)
flink run -c com.example.WindowingJob target/WindowingExample-1.0-SNAPSHOT.jar
Kết quả mong đợi: Job chạy lại từ đầu, kết quả tính toán sẽ bắt đầu từ 0 hoặc giá trị mặc định, không còn dữ liệu tích lũy từ lần chạy trước.
4.3. Tạo Savepoint để Backup State
Trước khi reset hoặc upgrade, nên tạo savepoint để backup state hiện tại.
flink savepoint {jobId} /opt/flink/savepoints/my-backup-$(date +%s)
Kết quả mong đợi: Một thư mục mới được tạo trong /opt/flink/savepoints/ chứa snapshot của state RocksDB tại thời điểm đó. Bạn có thể dùng đường dẫn này để restore job sau này.
Điều hướng series:
Mục lục: Series: Triển khai Database Streaming với Apache Flink và Ubuntu 24.04
« Phần 5: Phát triển và submit ứng dụng Streaming đơn giản
Phần 7: Cấu hình High Availability (HA) cho Flink Cluster »