Thiết kế kiến trúc và phân tách luồng dữ liệu HTAP
Chúng ta sẽ thiết lập mô hình Hybrid Transactional/Analytical Processing (HTAP) nơi PostgreSQL đóng vai trò là nguồn dữ liệu duy nhất (Single Source of Truth). Luồng dữ liệu sẽ được chia thành hai đường song song: đường OLTP ghi dữ liệu gốc từ Flink, và đường OLAP xây dựng bảng tổng hợp (Snapshot) dựa trên dữ liệu lịch sử.
Việc phân tách này giúp tránh tình trạng query phân tích nặng (OLAP) làm chậm các giao dịch ghi (OLTP) hoặc làm tắc nghẽn kết nối JDBC của Flink. Flink sẽ thực hiện tính toán dòng chảy (streaming aggregation) và ghi kết quả vào một bảng riêng biệt trong PostgreSQL dành riêng cho đọc.
Cấu trúc bảng trong PostgreSQL
Trước khi triển khai, ta cần tạo hai bảng: một bảng lưu trữ dữ liệu thô (raw events) cho mục đích OLTP và lịch sử, một bảng lưu trữ dữ liệu đã tổng hợp (snapshot) cho dashboard thời gian thực.
Thực thi lệnh SQL sau trên PostgreSQL để tạo schema và bảng:
psql -U postgres -d htap_demo -c "
CREATE TABLE IF NOT EXISTS public.raw_events (
event_id BIGINT PRIMARY KEY,
user_id INT NOT NULL,
event_type VARCHAR(50) NOT NULL,
amount DECIMAL(10, 2),
event_time TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS public.hourly_snapshot (
hour_key BIGINT PRIMARY KEY,
user_id INT NOT NULL,
event_type VARCHAR(50) NOT NULL,
total_amount DECIMAL(10, 2),
event_count BIGINT,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"
Kết quả mong đợi: PostgreSQL trả về thông báo "CREATE TABLE" cho cả hai bảng. Ta đã có nơi chứa dữ liệu chi tiết và nơi chứa dữ liệu tổng hợp.
Cấu hình PostgreSQL cho tải nặng OLAP và Write Stream
PostgreSQL mặc định được tối ưu cho OLTP. Để hỗ trợ đồng thời các query OLAP nặng (quét toàn bộ bảng) và việc ghi dữ liệu liên tục từ Flink, ta cần điều chỉnh các tham số trong postgresql.conf để cân bằng giữa bộ nhớ cache và bộ đệm ghi (WAL).
Tham số shared_buffers cần được tăng để cache dữ liệu OLAP, trong khi wal_buffers và synchronous_commit cần được tinh chỉnh để đảm bảo tốc độ ghi của Flink không bị nghẽn.
Điều chỉnh tham số hiệu năng
Chỉnh sửa file cấu hình /etc/postgresql/16/main/postgresql.conf (phiên bản 16 là mặc định trên Ubuntu 24.04). Thay đổi các giá trị sau để tối ưu cho workload hỗn hợp:
Viết nội dung sau vào file cấu hình (hoặc append nếu file đã có sẵn các tham số này):
cat >> /etc/postgresql/16/main/postgresql.conf
Kết quả mong đợi: File cấu hình được cập nhật. Sau khi restart PostgreSQL, server sẽ phân bổ bộ nhớ lớn hơn cho cache query và giảm độ trễ ghi WAL.
Khởi động lại dịch vụ
Áp dụng các thay đổi cấu hình bằng cách khởi động lại PostgreSQL:
systemctl restart postgresql@16-main
Kết quả mong đợi: Dịch vụ PostgreSQL chạy lại thành công. Kiểm tra trạng thái bằng systemctl status postgresql@16-main thấy trạng thái là active (running).
Xây dựng bảng Snapshot (Materialized View) trong PostgreSQL
Để tối ưu hóa việc đọc dữ liệu lịch sử cho báo cáo, thay vì quét toàn bộ bảng raw_events mỗi lần query, chúng ta sẽ sử dụng cơ chế Materialized View hoặc một bảng tổng hợp được cập nhật định kỳ. Trong kiến trúc HTAP này, bảng hourly_snapshot đóng vai trò là Materialized View logic.
Chúng ta sẽ tạo một Materialized View thật trong PostgreSQL để phục vụ cho các query OLAP cực nhanh, sau đó sẽ dùng Flink để cập nhật nó hoặc sử dụng bảng hourly_snapshot đã tạo ở trên làm nơi chứa dữ liệu streaming.
Tạo Materialized View cho báo cáo
Tạo Materialized View để gom nhóm dữ liệu theo giờ, giúp query dashboard chỉ cần quét số dòng rất nhỏ (số giờ tồn tại) thay vì hàng triệu dòng sự kiện.
psql -U postgres -d htap_demo -c "
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_hourly_summary AS
SELECT
date_trunc('hour', event_time)::bigint AS hour_key,
user_id,
event_type,
sum(amount) AS total_amount,
count(*) AS event_count
FROM raw_events
GROUP BY date_trunc('hour', event_time)::bigint, user_id, event_type;
CREATE INDEX IF NOT EXISTS idx_mv_hourly_summary ON mv_hourly_summary (hour_key, user_id, event_type);
"
Kết quả mong đợi: PostgreSQL tạo Materialized View và Index. Khi dữ liệu mới được ghi vào raw_events, view này sẽ không tự động cập nhật (tính năng của Mat View), nhưng nó phục vụ cực tốt cho các query lịch sử. Flink sẽ cập nhật bảng hourly_snapshot theo thời gian thực để thay thế việc refresh view thủ công.
Tối ưu hóa Query Flink để giảm tải cho PostgreSQL
Vấn đề lớn nhất trong HTAP là khi Flink cần đọc dữ liệu lịch sử (batch) để tính toán snapshot ban đầu, query SELECT * từ PostgreSQL có thể làm tê liệt server. Chúng ta cần cấu hình Flink JDBC Connector để giới hạn độ trễ và sử dụng cơ chế "Incremental Snapshotting" hoặc giới hạn số lượng kết nối.
Trong phần này, chúng ta sẽ viết Flink SQL để đọc từ bảng raw_events với tham số scan.incremental.snapshot.enabled để chỉ đọc các dòng mới thay vì quét toàn bộ bảng mỗi lần job chạy lại.
Định nghĩa Table Source với chế độ Incremental
Viết câu lệnh DDL trong Flink SQL để định nghĩa bảng nguồn. Chúng ta sử dụng tham số scan.incremental.columns để chỉ định Flink chỉ đọc các dòng mới dựa trên timestamp hoặc ID tăng dần.
File script SQL cho Flink (lưu vào /opt/flink/conf/htap_snapshot_job.sql):
cat > /opt/flink/conf/htap_snapshot_job.sql
Kết quả mong đợi: File SQL được tạo. Cấu hình scan.incremental.snapshot.enabled đảm bảo khi job khởi động lại, Flink chỉ lấy dữ liệu mới từ điểm ID cao nhất, tránh quét lại toàn bộ bảng raw_events.
Viết Logic Aggregation cho Snapshot
Sử dụng Flink SQL để tính toán tổng hợp theo giờ và ghi vào bảng đích. Sử dụng hàm TUMBLE hoặc HOP để tạo window, nhưng vì chúng ta muốn snapshot theo giờ cố định, ta sẽ dùng TRUNC logic trong SQL.
cat >> /opt/flink/conf/htap_snapshot_job.sql
Kết quả mong đợi: Logic SQL hoàn chỉnh. Flink sẽ tự động group dữ liệu stream theo giờ và ghi vào bảng hourly_snapshot. Bảng này sẽ luôn chứa dữ liệu mới nhất của giờ hiện tại.
Demo trường hợp sử dụng: Dashboard thời gian thực
Bây giờ chúng ta sẽ chạy toàn bộ pipeline để chứng minh mô hình HTAP hoạt động. Dữ liệu sẽ được tạo ra, Flink sẽ ghi vào bảng OLTP, đồng thời tính toán và cập nhật bảng Snapshot cho dashboard.
Khởi động Job Flink
Chạy script SQL đã tạo trên Flink SQL Client để khởi tạo job xử lý stream và snapshot.
cd /opt/flink && ./bin/sql-client.sh -f /opt/flink/conf/htap_snapshot_job.sql
Kết quả mong đợi: Flink SQL Client hiển thị thông báo Job submitted và ID của job. Job sẽ ở trạng thái RUNNING và bắt đầu lắng nghe các thay đổi trong bảng raw_events.
Mô phỏng dữ liệu ghi vào OLTP
Chạy script Python hoặc SQL để tạo dữ liệu giả lập sự kiện mua hàng liên tục. Đây là phần OLTP: ghi dữ liệu thô.
python3
Kết quả mong đợi: Console in ra 100 dòng xác nhận ghi dữ liệu vào PostgreSQL. Dữ liệu này nằm trong bảng raw_events.
Verify kết quả trên Dashboard (OLAP)
Kiểm tra xem Flink đã xử lý và tổng hợp dữ liệu vào bảng hourly_snapshot chưa. Đây là phần OLAP: đọc dữ liệu đã tổng hợp.
psql -U postgres -d htap_demo -c "SELECT * FROM hourly_snapshot ORDER BY hour_key DESC, user_id DESC LIMIT 10;"
Kết quả mong đợi: PostgreSQL trả về các dòng trong bảng hourly_snapshot với các giá trị total_amount và event_count đã được tính toán chính xác từ 100 sự kiện vừa tạo. Thời gian thực hiện query này cực nhanh (< 10ms) so với việc quét bảng raw_events.
So sánh hiệu năng Query
Thực hiện cùng một câu hỏi trên cả hai bảng để chứng minh sự khác biệt về hiệu năng trong mô hình HTAP.
psql -U postgres -d htap_demo -c "EXPLAIN (ANALYZE, BUFFERS) SELECT sum(amount) FROM raw_events WHERE event_time > NOW() - INTERVAL '1 hour';"
Kết quả mong đợi: Query trên bảng raw_events sẽ có chỉ số Seq Scan (Quét tuần tự) và thời gian thực hiện cao hơn (ví dụ: 50ms - 200ms tùy số lượng dữ liệu).
psql -U postgres -d htap_demo -c "EXPLAIN (ANALYZE, BUFFERS) SELECT sum(total_amount) FROM hourly_snapshot WHERE hour_key = (date_trunc('hour', NOW())::bigint);"
Kết quả mong đợi: Query trên bảng hourly_snapshot sẽ sử dụng Index Scan hoặc Bitmap Index Scan, thời gian thực hiện cực thấp (< 1ms). Điều này chứng tỏ mô hình HTAP đã tách biệt thành công tải OLTP và OLAP.
Điều hướng series:
Mục lục: Series: Triển khai Database HTAP với Apache Flink và PostgreSQL trên Ubuntu 24.04
« Phần 5: Xây dựng pipeline ETL thời gian thực với Flink SQL
Phần 7: Giám sát hiệu năng, xử lý sự cố và mẹo nâng cao »