Cấu hình State Backend RocksDB cho phép Join và CEP
Để xử lý các phép Join giữa hai stream và các pattern phức tạp của CEP, Flink cần lưu trữ state khổng lồ. Mặc định, Flink dùng MemoryStateBackend, không đủ cho sản xuất.
Chúng ta cần chuyển sang RocksDB State Backend. RocksDB lưu state trên ổ cứng (local disk) và chỉ cache phần quan trọng vào RAM, giúp xử lý state terabytes mà không bị OOM (Out of Memory).
Bước 1: Cập nhật flink-conf.yaml
Chỉnh sửa file cấu hình chính của Flink để bật RocksDB và tối ưu hóa tham số cho việc lưu state lớn.
Đường dẫn file: /opt/flink/conf/flink-conf.yaml
Thay thế hoặc thêm các dòng sau vào cuối file:
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
state.checkpoints.num-retained: 3
state.checkpoints.min-pause: 60s
rocksdb.memory.managed: true
rocksdb.memory.max-per-operator: 1024m
rocksdb.increase-memory-on-checkpoint: true
rocksdb.incremental-checkpoint-enabled: true
Kết quả mong đợi: Khi khởi động lại Flink, log sẽ ghi nhận việc sử dụng RocksDB State Backend. Job chạy sẽ không bị sập khi state lớn.
Bước 2: Tạo thư mục lưu state và checkpoint
Tạo các thư mục vật lý tương ứng với đường dẫn đã khai báo trong file config để Flink có nơi ghi dữ liệu.
sudo mkdir -p /opt/flink/checkpoints /opt/flink/savepoints
sudo chown -R flink:flink /opt/flink/checkpoints /opt/flink/savepoints
chmod -R 755 /opt/flink/checkpoints /opt/flink/savepoints
Kết quả mong đợi: Các thư mục được tạo thành công, user 'flink' có quyền ghi dữ liệu vào.
Thực hiện Stream-to-Stream Join (Interval Join)
Trong môi trường stream, việc Join hai bảng dựa trên timestamp là phức tạp vì sự kiện đến không đồng bộ. Interval Join (Interval Join) cho phép chúng ta chỉ Join các sự kiện nằm trong một khoảng thời gian nhất định (ví dụ: 5 phút) kể từ khi sự kiện đầu tiên đến.
Kỹ thuật này giúp giảm thiểu lượng state lưu trữ so với Tumbling Join và xử lý hiệu quả dữ liệu bị trễ (late data).
Bước 1: Chuẩn bị Dữ liệu Giả lập (Mock Data)
Chúng ta cần tạo 2 topic Kafka: một cho "Giao dịch" (Transactions) và một cho "Thông tin khách hàng" (Customers). Sử dụng script Python đơn giản để đẩy dữ liệu vào Kafka.
Tạo file /home/flink/scripts/generate_join_data.py với nội dung:
#!/usr/bin/env python3
import json, time, random, os
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topics = ['transactions', 'customers']
for _ in range(100):
# Tạo sự kiện khách hàng
cust_id = random.randint(1000, 9999)
cust_data = {
"cust_id": cust_id,
"name": f"Customer_{cust_id}",
"tier": random.choice(["Gold", "Silver", "Bronze"]),
"event_time": int(time.time() * 1000) - random.randint(0, 300000) # Trễ tối đa 5 phút
}
producer.send('customers', cust_data)
# Tạo sự kiện giao dịch liên quan đến khách hàng
txn_data = {
"txn_id": f"TXN_{random.randint(10000, 99999)}",
"cust_id": cust_id,
"amount": round(random.uniform(100, 50000), 2),
"currency": "VND",
"event_time": int(time.time() * 1000) - random.randint(0, 300000) # Trễ tối đa 5 phút
}
producer.send('transactions', txn_data)
time.sleep(0.5)
print("Data generation finished.")
Kết quả mong đợi: Script chạy và đẩy 100 cặp sự kiện vào Kafka, các sự kiện có timestamp trong quá khứ (để test Interval Join).
Bước 2: Viết Query SQL cho Interval Join
Chúng ta sẽ sử dụng Flink SQL để định nghĩa 2 bảng nguồn, tạo watermark, và thực hiện Interval Join.
Mở SQL Client: flink run -m local -c com.alibaba.flink.sqldriver.SqlClient -Dflink.query.default.parallelism=2
Chạy các lệnh SQL sau trong SQL Client:
-- Định nghĩa bảng Transactions
CREATE TABLE transactions (
txn_id STRING,
cust_id INT,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-join-group',
'format' = 'json'
);
-- Định nghĩa bảng Customers
CREATE TABLE customers (
cust_id INT,
name STRING,
tier STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'customers',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-join-group',
'format' = 'json'
);
-- Thực hiện Interval Join
-- Join khi timestamp của transaction nằm trong khoảng [customer.event_time - 5s, customer.event_time + 5s]
SELECT
t.txn_id,
c.name,
t.amount,
t.event_time
FROM transactions AS t
INNER JOIN customers AS c
ON t.cust_id = c.cust_id
AND t.event_time BETWEEN c.event_time - INTERVAL '5' SECOND AND c.event_time + INTERVAL '5' SECOND;
Kết quả mong đợi: Flink sẽ in ra các dòng kết quả chứa txn_id, name, amount. Chỉ những giao dịch có khách hàng khớp trong khoảng thời gian 5 giây mới được in. Các sự kiện không khớp sẽ bị lọc bỏ hoặc giữ trong state cho đến khi hết khoảng thời gian.
Bước 3: Verify kết quả Join
Chạy script generate_data.py ở trên để đẩy dữ liệu mới. Quan sát output trong SQL Client.
Để kiểm tra xem state có hoạt động không, hãy gửi một sự kiện "Giao dịch" trước, sau đó gửi sự kiện "Khách hàng" tương ứng sau 3 giây. Nếu Interval Join hoạt động đúng, kết quả vẫn sẽ xuất hiện.
Kết quả mong đợi: Thấy dòng log khớp dữ liệu ngay cả khi thứ tự đến của sự kiện bị đảo lộn (trong khoảng 5s).
Xử lý Pattern phức tạp với CEP (Complex Event Processing)
CEP cho phép phát hiện các chuỗi sự kiện có ý nghĩa theo thời gian. Ví dụ: Phát hiện gian lận khi một khách hàng thực hiện 3 giao dịch lớn trong vòng 1 phút.
Flink SQL hỗ trợ CEP thông qua toán tử `MATCH_RECOGNIZE` (theo chuẩn SQL:2016).
Bước 1: Định nghĩa Pattern trong SQL
Chúng ta sẽ tạo một query để phát hiện pattern: "Giao dịch A (lớn) -> Giao dịch B (lớn) -> Giao dịch C (lớn)" của cùng một khách hàng trong vòng 60 giây.
Trong SQL Client, chạy lệnh sau:
SELECT
cust_id,
SUM(amount) as total_amount,
MIN(event_time) as start_time,
MAX(event_time) as end_time,
COUNT(*) as event_count
FROM (
SELECT *
FROM transactions
WHERE amount > 10000 -- Lọc chỉ các giao dịch lớn
) AS filtered_txns
MATCH_RECOGNIZE (
PARTITION BY cust_id
ORDER BY event_time
MEASURES
A.event_time AS start_time,
C.event_time AS end_time,
A.amount + B.amount + C.amount AS total_amount
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATERN (A B C)
DEFINE
A AS A.amount > 10000,
B AS B.amount > 10000 AND B.event_time > A.event_time + INTERVAL '10' SECOND,
C AS C.amount > 10000 AND C.event_time > B.event_time + INTERVAL '10' SECOND
);
Lưu ý: Đoạn code trên dùng cú pháp chuẩn. Tuy nhiên, để đơn giản hơn cho ví dụ thực tế, chúng ta sẽ dùng một pattern cụ thể hơn: 3 giao dịch liên tiếp.
Chạy lại với cú pháp tối ưu cho Flink SQL:
SELECT
cust_id,
SUM(amount) as total_suspicious_amount,
MIN(event_time) as start_ts,
MAX(event_time) as end_ts
FROM (
SELECT *
FROM transactions
WHERE amount > 5000
) AS high_value_txns
MATCH_RECOGNIZE (
PARTITION BY cust_id
ORDER BY event_time
MEASURES
A.amount + B.amount + C.amount AS total_suspicious_amount,
A.event_time AS start_ts,
C.event_time AS end_ts
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A B C)
DEFINE
A AS TRUE,
B AS TRUE,
C AS TRUE
)
WHERE (end_ts - start_ts) < INTERVAL '60' SECOND;
Kết quả mong đợi: Query chạy, nhưng chưa có kết quả vì chưa có dữ liệu đủ điều kiện. Flink sẽ lưu state để chờ đủ 3 sự kiện cho mỗi khách hàng.
Bước 2: Tạo Script đẩy dữ liệu Pattern
Tạo script /home/flink/scripts/generate_cep_data.py để đẩy 3 giao dịch liên tiếp cho một khách hàng giả định (cust_id = 8888) trong vòng 30 giây.
#!/usr/bin/env python3
import json, time
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
cust_id = 8888
base_time = int(time.time() * 1000)
# Pattern: 3 giao dịch lớn cách nhau 10s
for i in range(3):
txn = {
"txn_id": f"CEP_{cust_id}_{i}",
"cust_id": cust_id,
"amount": 15000.00,
"event_time": base_time + (i * 10000) # Cách nhau 10s
}
producer.send('transactions', txn)
time.sleep(1)
print(f"Sent CEP pattern for cust_id {cust_id}")
Kết quả mong đợi: Script chạy và đẩy 3 sự kiện vào topic 'transactions' với timestamp tăng dần.
Bước 3: Verify kết quả CEP
Chạy script generate_cep_data.py.
Quan sát SQL Client. Bạn sẽ thấy một dòng kết quả xuất hiện sau khi sự kiện thứ 3 được xử lý.
Dòng kết quả sẽ chứa: cust_id = 8888, total_suspicious_amount = 45000.00, start_ts và end_ts cách nhau khoảng 20s.
Kết quả mong đợi: Flink phát hiện pattern "A-B-C" và in ra tổng số tiền, xác nhận logic CEP hoạt động chính xác.
Quản lý và Giám sát State trong CEP
Khi chạy CEP và Join, state (trạng thái) của Flink sẽ tăng lên rất nhanh. Nếu không giám sát, server có thể hết bộ nhớ hoặc ổ cứng.
Bước 1: Kiểm tra dung lượng State qua Web UI
Mở trình duyệt, truy cập: http://localhost:8081
Đi vào tab Jobs -> Chọn job SQL đang chạy -> Tab State.
Kiểm tra các chỉ số:
- State Size (Bytes): Tổng dung lượng state hiện tại.
- State Growth Rate: Tốc độ tăng state.
- Number of Checkpoints: Số lượng checkpoint đã thực hiện thành công.
Kết quả mong đợi: State Size tăng lên sau khi bạn chạy script CEP. Nếu state tăng quá nhanh (>100MB/s) với dữ liệu nhỏ, cần xem lại logic Watermark hoặc Pattern.
Bước 2: Trigger Checkpoint thủ công
Để đảm bảo dữ liệu state được lưu an toàn vào RocksDB và Kafka, hãy kích hoạt checkpoint thủ công.
Vào Web UI -> Tab Checkpoints -> Click nút Trigger Checkpoint.
Hoặc dùng command line:
flink checkpoint -t 10000
Kết quả mong đợi: Trạng thái checkpoint chuyển từ "Running" sang "Completed". Dung lượng file checkpoint trên thư mục /opt/flink/checkpoints tăng lên.
Bước 3: Dọn dẹp State khi không cần thiết
Nếu muốn xóa state của một job cụ thể (ví dụ: test xong), dùng lệnh cancel và xóa savepoint.
flink cancel
rm -rf /opt/flink/checkpoints/*
rm -rf /opt/flink/savepoints/*
Kết quả mong đợi: Job dừng, thư mục checkpoints và savepoints trống, giải phóng dung lượng ổ đĩa.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream Processing với Apache Flink SQL và Ubuntu 24.04
« Phần 4: Xử lý dữ liệu stream thời gian thực với Windowing và Watermark
Phần 6: Tối ưu hóa hiệu năng và xử lý lỗi cho sản xuất »