Cấu hình Source Table từ Kafka hoặc File Log giả lập
Để mô phỏng dữ liệu thời gian thực, chúng ta sẽ tạo một bảng nguồn (Source Table) trong Flink SQL. Bạn có thể chọn Kafka nếu đã triển khai ở phần trước, hoặc dùng File Source để test nhanh trên môi trường đơn node mà không cần Kafka.
Ở đây, tôi hướng dẫn phương pháp dùng Text File Source để giả lập dòng log đến liên tục. Cách này giúp bạn tập trung vào logic SQL mà không bị vướng vào việc cấu hình Kafka Cluster phức tạp.
Trước tiên, hãy tạo thư mục dữ liệu và file log mẫu trên server Ubuntu.
mkdir -p /var/data/flink-etl
cat > /var/data/flink-etl/raw_logs.txt
Kết quả mong đợi: File raw_logs.txt được tạo thành công với 6 dòng dữ liệu mẫu, định dạng phân cách bằng dấu |.
Tạo Source Table trong Flink SQL Client
Bây giờ, mở Flink SQL Client (hoặc chạy script SQL) để định nghĩa bảng nguồn. Chúng ta sẽ dùng filesystem connector để đọc file này như một stream dữ liệu.
Cấu hình này yêu cầu Flink đọc file theo chế độ append, tức là chỉ đọc các dòng mới được thêm vào, giúp mô phỏng hành vi stream thời gian thực.
CREATE TABLE raw_logs (
event_time TIMESTAMP(3) AS TO_TIMESTAMP(event_time_str, 'yyyy-MM-dd HH:mm:ss'),
event_time_str STRING,
user_id STRING,
action_type STRING,
ip_address STRING,
details STRING,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = '/var/data/flink-etl/raw_logs.txt',
'format' = 'csv',
'csv.delimiter' = '|',
'scan.incremental.files.poll-interval' = '1000',
'scan.incremental.files.partitioned-by-time' = 'false',
'read.start.mode' = 'earliest-offset'
);
Kết quả mong đợi: Flink SQL Client báo CREATE TABLE thành công. Bảng raw_logs đã sẵn sàng để nhận dữ liệu từ file trên disk.
Viết câu lệnh Flink SQL để làm sạch và chuyển đổi dữ liệu
Trong môi trường HTAP, dữ liệu nguồn thường "bẩn" (thiếu trường, định dạng sai, hoặc chứa ký tự đặc biệt). Bước này chúng ta sẽ viết một truy vấn SQL để lọc và chuẩn hóa dữ liệu trước khi đưa vào kho dữ liệu.
Chúng ta sẽ tạo một view ảo hoặc bảng trung gian để thực hiện các phép biến đổi: lọc bỏ dòng lỗi, chuẩn hóa chuỗi, và trích xuất giá trị số từ trường details.
CREATE VIEW cleaned_events AS
SELECT
event_time,
user_id,
UPPER(action_type) AS action_type,
ip_address,
CASE
WHEN details = 'success' THEN 0
WHEN details = 'failed' THEN 0
ELSE CAST(details AS BIGINT)
END AS amount
FROM raw_logs
WHERE action_type IN ('LOGIN', 'PURCHASE', 'BROWSE')
AND user_id IS NOT NULL
AND ip_address IS NOT NULL;
Kết quả mong đợi: View cleaned_events được tạo. Khi bạn chạy SELECT * FROM cleaned_events, bạn sẽ thấy dữ liệu đã được chuyển đổi: action_type viết hoa, trường amount đã chuyển từ string sang số (BIGINT), và các dòng không hợp lệ bị loại bỏ.
Verify kết quả làm sạch
Chạy lệnh kiểm tra để đảm bảo logic CASE WHEN và WHERE hoạt động chính xác.
SELECT * FROM cleaned_events;
Kết quả mong đợi: Trả về 5 dòng dữ liệu (loại bỏ dòng login failed nếu logic yêu cầu, hoặc giữ lại tùy thuộc vào cách bạn định nghĩa amount. Ở đây dòng 'failed' sẽ có amount = 0). Các giá trị string đã được chuẩn hóa.
Sử dụng hàm UDF để xử lý logic phức tạp
Flink SQL mặc định hỗ trợ các hàm cơ bản, nhưng để xử lý logic nghiệp vụ phức tạp (ví dụ: mã hóa IP, tính điểm tín dụng, hoặc phân tích regex nâng cao), chúng ta cần viết User Defined Function (UDF) bằng Java hoặc Scala.
Chúng ta sẽ viết một UDF Java đơn giản để mã hóa IP address (masking) nhằm tuân thủ quy định bảo mật trước khi lưu vào PostgreSQL.
Bước 1: Viết và biên dịch UDF Java
Tạo file Java IpMasker.java trên server. Logic: Lấy phần cuối của IP và thay thế các phần trước bằng dấu *.
cat > /home/flink/udf/IpMasker.java
Kết quả mong đợi: File IpMasker.java được tạo.
Bước 2: Biên dịch và tạo JAR
Sử dụng javac (cần JDK đã cài ở phần 1) để biên dịch file này thành .class và đóng gói thành .jar để Flink load.
cd /home/flink/udf
javac IpMasker.java
jar cvf ip-masker-udf.jar IpMasker.class
Kết quả mong đợi: File ip-masker-udf.jar được tạo thành công trong thư mục /home/flink/udf.
Bước 3: Load UDF và sử dụng trong Flink SQL
Vào Flink SQL Client, load file JAR vừa tạo và đăng ký hàm vào namespace của Flink.
LOAD 'ip-masker-udf.jar' AS 'udf-jar';
CREATE FUNCTION mask_ip AS 'IpMasker';
Kết quả mong đợi: Flink báo LOAD và CREATE FUNCTION thành công. Hàm mask_ip đã sẵn sàng để dùng trong query.
Bước 4: Áp dụng UDF vào pipeline
Viết lại truy vấn để sử dụng hàm mask_ip trên trường ip_address trước khi lưu vào đích.
CREATE VIEW secured_events AS
SELECT
event_time,
user_id,
action_type,
mask_ip(ip_address) AS masked_ip,
amount
FROM cleaned_events;
Kết quả mong đợi: View secured_events được tạo. Khi query, trường IP sẽ hiển thị dạng ***.***.***.10.
Cấu hình Windowing để tính toán tổng hợp theo thời gian
Trong mô hình HTAP, chúng ta thường cần tính toán chỉ số thời gian thực (Real-time Aggregation), ví dụ: tổng doanh thu trong 1 phút qua. Flink SQL hỗ trợ điều này thông qua OVER và GROUP BY kết hợp với Window.
Chúng ta sẽ sử dụng Tumbling Window (cửa sổ trượt không chồng lấn) với kích thước 1 phút để tổng hợp doanh thu theo từng khoảng thời gian.
Viết câu lệnh Window Aggregation
Tạo một bảng ảo (hoặc View) để tính tổng amount và đếm số lượng giao dịch (COUNT) cho mỗi user_id trong từng cửa sổ 1 phút.
CREATE VIEW user_stats_window AS
SELECT
TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
user_id,
SUM(amount) AS total_amount,
COUNT(*) AS transaction_count
FROM cleaned_events
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
user_id;
Kết quả mong đợi: View user_stats_window được tạo. Khi dữ liệu mới vào, Flink sẽ tự động nhóm chúng theo phút và tính toán tổng hợp.
Verify kết quả Window
Chạy query để xem dữ liệu đã được nhóm như thế nào.
SELECT * FROM user_stats_window;
Kết quả mong đợi: Trả về các dòng với trường window_end (thời điểm kết thúc cửa sổ), user_id, và các giá trị tổng hợp. Ví dụ: 2024-05-20 10:01:00 | USER_02 | 1700000 | 2.
Đẩy dữ liệu đã xử lý vào bảng đích trên PostgreSQL
Bước cuối cùng là kết nối Flink với PostgreSQL để lưu trữ dữ liệu đã được làm sạch, mã hóa và tổng hợp. Đây là bước chuyển đổi từ OLTP (stream) sang OLAP (table) trong mô hình HTAP.
Tạo bảng đích trên PostgreSQL
Vào PostgreSQL Client (psql) để tạo bảng đích. Cấu trúc bảng phải khớp với kết quả của view user_stats_window hoặc secured_events tùy vào mục đích lưu trữ.
Ở đây, chúng ta tạo bảng để lưu kết quả tổng hợp (OLAP style).
psql -U flink_user -d etl_db -c "
CREATE TABLE IF NOT EXISTS user_stats_realtime (
window_end TIMESTAMP,
user_id VARCHAR(50),
total_amount BIGINT,
transaction_count BIGINT,
PRIMARY KEY (window_end, user_id)
);
"
Kết quả mong đợi: PostgreSQL báo CREATE TABLE thành công.
Tạo Sink Table trong Flink SQL
Quay lại Flink SQL Client, định nghĩa bảng đích (Sink Table) sử dụng jdbc connector. Cấu hình này sẽ tự động map các trường từ Flink sang PostgreSQL.
Chú ý: Cần chỉ định đúng url, user, password và table-name đã tạo ở bước trên.
CREATE TABLE pg_user_stats (
window_end TIMESTAMP(3),
user_id STRING,
total_amount BIGINT,
transaction_count BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/etl_db',
'table-name' = 'user_stats_realtime',
'username' = 'flink_user',
'password' = 'flink_password',
'driver' = 'org.postgresql.Driver',
'execution.parallelism' = '1',
'sink.buffer-flush.interval' = '5000',
'sink.buffer-flush.max-batch-size' = '256'
);
Kết quả mong đợi: Flink SQL Client báo CREATE TABLE thành công. Bảng pg_user_stats đã sẵn sàng để nhận dữ liệu.
Chạy Pipeline ETL (INSERT INTO)
Khởi chạy query INSERT để đẩy dữ liệu từ View tổng hợp vào bảng PostgreSQL. Đây là bước kích hoạt pipeline chạy liên tục (streaming).
INSERT INTO pg_user_stats
SELECT
window_end,
user_id,
total_amount,
transaction_count
FROM user_stats_window;
Kết quả mong đợi: Flink bắt đầu chạy job. Bạn sẽ thấy log Job has been started. Dữ liệu sẽ được đẩy vào PostgreSQL theo từng batch (5 giây hoặc 256 dòng tùy config).
Verify dữ liệu trong PostgreSQL
Để xác nhận pipeline hoạt động, hãy truy vấn trực tiếp vào PostgreSQL để xem dữ liệu đã được ghi vào chưa.
psql -U flink_user -d etl_db -c "SELECT * FROM user_stats_realtime ORDER BY window_end DESC LIMIT 5;"
Kết quả mong đợi: Trả về các dòng dữ liệu tương ứng với kết quả tính toán từ Flink. Các giá trị total_amount và transaction_count phải khớp với dữ liệu đầu vào đã được làm sạch và tổng hợp.
Điều hướng series:
Mục lục: Series: Triển khai Database HTAP với Apache Flink và PostgreSQL trên Ubuntu 24.04
« Phần 4: Tích hợp Flink với PostgreSQL thông qua JDBC Connector
Phần 6: Triển khai mô hình HTAP: Kết hợp truy vấn OLAP và ghi OLTP »