Hiểu cơ chế Event Time và Processing Time trong Flink
Flink xử lý thời gian dựa trên hai khái niệm cốt lõi: Event Time (thời điểm sự kiện thực tế xảy ra, thường nằm trong payload) và Processing Time (thời điểm Flink nhận và xử lý sự kiện đó).
Khi làm việc với dữ liệu stream từ Kafka, chúng ta ưu tiên Event Time để đảm bảo tính chính xác của kết quả tính toán, đặc biệt khi dữ liệu bị trễ hoặc đến không theo thứ tự.
Verification: Nếu bạn chọn Processing Time, kết quả sẽ phụ thuộc vào tốc độ xử lý của server. Nếu chọn Event Time, kết quả phản ánh đúng thực tế kinh doanh dù dữ liệu có đến trễ.
Cấu hình Watermark Strategy cho dữ liệu đến trễ
Watermark là một con dấu thời gian đặc biệt của Flink dùng để đánh dấu "thời gian hiện tại" của stream, giúp hệ thống biết khi nào dữ liệu được coi là "đã đến hết" trong một khoảng thời gian.
Để Flink tự động sinh ra Watermark từ timestamp trong dữ liệu Kafka, chúng ta cần khai báo WATERMARKS trong định nghĩa bảng (Table DDL).
Cấu hình phổ biến nhất là bounded out-of-orderness (hạn chế độ trễ), cho phép dữ liệu đến trễ tối đa X giây so với thời gian hiện tại.
Giả sử dữ liệu doanh thu có timestamp ở cột event_time, chúng ta sẽ cấu hình watermark trễ 5 giây để xử lý dữ liệu bị delay nhẹ.
Đoạn DDL sau định nghĩa bảng nguồn Kafka với cơ chế Watermark:
CREATE TABLE kafka_sales (
transaction_id VARCHAR PRIMARY KEY NOT ENFORCED,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3),
WATERMARKS FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sales-stream',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-sql-consumer',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);
Kết quả mong đợi: Flink sẽ đọc timestamp từ JSON, chuyển thành TIMESTAMP và tự động tính toán watermark dựa trên công thức event_time - 5s. Dữ liệu đến muộn hơn 5 giây so với watermark hiện tại sẽ bị coi là quá hạn (late data) và bị bỏ qua hoặc xử lý riêng tùy chiến lược.
Triển khai Tumbling Window (Cửa sổ trượt không giao nhau)
Tumbling Window chia dòng thời gian thành các khoảng cố định không giao nhau (ví dụ: 0-1 phút, 1-2 phút). Mỗi sự kiện chỉ thuộc về đúng một window.
Đây là dạng window phổ biến nhất để tính tổng doanh thu theo phút, theo giờ.
Để sử dụng Tumbling Window trong Flink SQL, ta dùng hàm TUMBLE trong mệnh đề GROUP BY.
Câu lệnh SQL sau sẽ tính tổng doanh thu (amount) cho mỗi cửa sổ 1 phút dựa trên Event Time:
CREATE VIEW sales_per_minute AS
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
SUM(amount) AS total_revenue
FROM kafka_sales
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
Kết quả mong đợi: Flink sẽ chờ watermark vượt qua thời điểm kết thúc của window (ví dụ: nếu window là 10:00-10:01, Flink sẽ chờ watermark > 10:01:05 do cấu hình 5s trễ) rồi mới xuất kết quả tổng doanh thu của phút đó.
Triển khai Sliding Window (Cửa sổ trượt có giao nhau)
Sliding Window khác Tumbling Window ở chỗ các window có thể giao nhau. Nó được định nghĩa bởi Window Size (kích thước cửa sổ) và Slide Interval (tần suất trượt).
Ví dụ: Window Size 5 phút, Slide Interval 1 phút. Mỗi phút, Flink sẽ tính toán lại tổng doanh thu của 5 phút gần nhất.
Điều này hữu ích khi bạn cần theo dõi xu hướng (trend) liên tục thay vì chỉ các khoảng rời rạc.
Câu lệnh SQL sử dụng hàm HOP để tạo Sliding Window:
CREATE VIEW sales_sliding_5min AS
SELECT
HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
SUM(amount) AS total_revenue
FROM kafka_sales
GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);
Kết quả mong đợi: Kết quả sẽ xuất hiện mỗi phút (Slide 1 phút), mỗi dòng chứa tổng doanh thu của khoảng thời gian 5 phút trước đó. Ví dụ: lúc 10:05, kết quả là tổng doanh thu từ 10:00 đến 10:05.
Triển khai Query tính tổng doanh thu theo phút từ stream Kafka
Bây giờ chúng ta sẽ viết câu lệnh SQL hoàn chỉnh để tính tổng doanh thu theo phút (Tumbling Window) và xuất kết quả ra bảng đích (ví dụ: Console Sink hoặc Kafka Sink).
Bước này kết hợp: Định nghĩa nguồn Kafka, cấu hình Watermark, áp dụng Tumbling Window và xuất kết quả.
Trước tiên, tạo bảng đích để lưu kết quả (ví dụ lưu vào console để debug hoặc vào Kafka topic kết quả):
CREATE TABLE console_sink (
window_start TIMESTAMP(3),
total_revenue DECIMAL(10, 2)
) WITH (
'connector' = 'print',
'format' = 'json'
);
Kết quả mong đợi: Bảng console_sink được tạo, sẵn sàng nhận dữ liệu từ query.
Thực thi câu lệnh INSERT để chạy job stream processing:
INSERT INTO console_sink
SELECT
window_start,
total_revenue
FROM (
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
SUM(amount) AS total_revenue
FROM kafka_sales
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
);
Kết quả mong đợi: Job sẽ chạy ở trạng thái RUNNING. Khi bạn gửi dữ liệu test vào topic sales-stream (với timestamp trong quá khứ hoặc hiện tại), sau khi window đóng (Event Time + 5s delay), Flink sẽ in ra console kết quả dạng JSON: {"window_start":"2024-05-21T10:00:00.000","total_revenue":1500.50}.
Verify kết quả và kiểm tra xử lý dữ liệu trễ
Để xác nhận cơ chế Watermark và Window hoạt động đúng, thực hiện các bước kiểm tra sau:
- Gửi 3 sự kiện vào Kafka topic
sales-stream với timestamp cách nhau 30 giây (ví dụ: 10:00:00, 10:00:30, 10:01:00).
- Quan sát console output của Flink. Kết quả sẽ KHÔNG xuất hiện ngay lập tức.
- Chờ thêm 5 giây (theo cấu hình watermark). Khi watermark vượt qua 10:01:05, Flink sẽ in ra kết quả của window 10:00-10:01.
- Gửi thêm 1 sự kiện có timestamp là 10:00:45 (đến trễ hơn 5 giây so với watermark hiện tại đang chạy ở 10:01:05).
Kết quả kiểm tra: Sự kiện 10:00:45 sẽ không được cộng vào tổng của window 10:00-10:01 vì nó đã quá hạn (late data) do cơ chế Watermark event_time - INTERVAL '5' SECOND. Điều này chứng tỏ Flink đang xử lý đúng theo Event Time và Watermark strategy.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream Processing với Apache Flink SQL và Ubuntu 24.04
« Phần 3: Thiết lập kết nối với PostgreSQL và Kafka trong Flink SQL
Phần 5: Xử lý dữ liệu phức tạp: Join Stream và CEP Pattern »