Cài đặt Driver PostgreSQL JDBC vào thư mục lib của Flink
Bước đầu tiên là cung cấp cho Flink khả năng giao tiếp với PostgreSQL bằng cách đưa file driver JDBC vào thư mục shared của Flink.
Việc này giúp Flink ClassLoader có thể tìm thấy driver mà không cần cấu hình phức tạp trong mỗi job, đảm bảo tính nhất quán cho toàn bộ cluster.
Đầu tiên, hãy tải file driver PostgreSQL JDBC phiên bản tương thích (khuyên dùng phiên bản 42.x) từ kho lưu trữ Maven Central hoặc trang chủ của PostgreSQL.
curl -L -o /tmp/postgresql-42.7.2.jar "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.2/postgresql-42.7.2.jar"
Kết quả mong đợi: File jar được tải về thành công tại thư mục tạm, kích thước khoảng 1.5MB.
Di chuyển file driver vào thư mục lib của Flink. Giả sử đường dẫn cài đặt Flink của bạn là /opt/flink.
sudo mv /tmp/postgresql-42.7.2.jar /opt/flink/lib/
Kết quả mong đợi: File driver nằm trong thư mục /opt/flink/lib/ với quyền sở hữu root hoặc user chạy Flink.
Cần khởi động lại Flink Cluster để Flink tải driver mới vào classpath. Nếu đang chạy JobManager/TaskManager, hãy thực hiện restart.
sudo systemctl restart flink-jobmanager sudo systemctl restart flink-taskmanager
Kết quả mong đợi: Dịch vụ Flink khởi động lại, console không báo lỗi về thiếu class driver PostgreSQL.
Verify kết quả: Truy cập vào trang Dashboard của Flink (thường ở cổng 8081), vào mục "Jobs" hoặc "Config", kiểm tra xem driver có được liệt kê trong thư viện hay không, hoặc chạy thử lệnh SQL đơn giản sau.
Tạo cấu hình Connection String an toàn trong Flink SQL Client
Đừng hardcode thông tin đăng nhập trực tiếp vào lệnh SQL. Thay vào đó, sử dụng cơ chế Catalog của Flink để quản lý thông tin kết nối an toàn và tập trung.
Cách này giúp bạn dễ dàng thay đổi mật khẩu hoặc cấu hình mà không cần sửa lại toàn bộ code SQL, đồng thời hỗ trợ tích hợp với Vault hoặc Secrets Manager sau này.
Trước khi tạo Catalog, hãy đảm bảo bạn đã cài đặt connector JDBC cho Flink. Nếu chưa, hãy thêm vào thư mục lib tương tự như driver PostgreSQL.
curl -L -o /opt/flink/lib/flink-connector-jdbc-3.1.0-1.18.jar "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.18/flink-connector-jdbc-3.1.0-1.18.jar" sudo mv /opt/flink/lib/flink-connector-jdbc-3.1.0-1.18.jar /opt/flink/lib/
Kết quả mong đợi: Connector JDBC được cài đặt vào thư mục lib.
Khởi động lại Flink để nhận connector mới.
sudo systemctl restart flink-jobmanager sudo systemctl restart flink-taskmanager
Kết quả mong đợi: Flink khởi động lại thành công.
Mở Flink SQL Client và thực hiện lệnh tạo Catalog PostgreSQL. Thay thế các biến user, password, host và port bằng thông tin thực tế của bạn.
CREATE CATALOG postgres_catalog WITH ( 'type' = 'jdbc', 'jdbc.url' = 'jdbc:postgresql://127.0.0.1:5432/flink_db', 'jdbc.driver' = 'org.postgresql.Driver', 'jdbc.username' = 'flink_user', 'jdbc.password' = 'secure_password_123' );
Kết quả mong đợi: Lệnh thực thi thành công, không báo lỗi. Catalog 'postgres_catalog' đã được tạo.
Verify kết quả: Sử dụng lệnh SHOW CATALOGS để kiểm tra xem catalog đã xuất hiện trong danh sách chưa.
SHOW CATALOGS;
Kết quả mong đợi: Danh sách hiển thị 'postgres_catalog' cùng với các catalog mặc định khác (như 'default', 'env').
Tối ưu hóa tham số Batch Size và Write Mode cho Sink PostgreSQL
Để đạt hiệu suất cao trong mô hình HTAP, bạn cần giảm số lượng lần giao dịch (commit) gửi về PostgreSQL. Mặc định Flink có thể commit từng dòng, gây tải cho DB.
Cấu hình tham số 'buffer-flush-interval' và 'buffer-flush-size' cho phép Flink gom nhóm các bản ghi (batch) trước khi gửi, tối ưu hóa throughput.
Định nghĩa một Sink Table với các tham số tối ưu. Tham số 'write-mode' được đặt là 'UPSERT' để hỗ trợ cập nhật dữ liệu thay vì chỉ chèn mới, phù hợp cho HTAP.
Đoạn code sau tạo bảng sink 'user_events_sink' với cấu hình batch size 2048 bản ghi hoặc flush sau 60 giây, whichever đến trước.
CREATE TABLE user_events_sink ( id BIGINT, event_time TIMESTAMP(3), amount DECIMAL(10, 2) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://127.0.0.1:5432/flink_db', 'table-name' = 'user_events', 'username' = 'flink_user', 'password' = 'secure_password_123', 'driver' = 'org.postgresql.Driver', 'write-mode' = 'UPSERT', 'buffer-flush-interval' = '60s', 'buffer-flush-size' = '2048', 'primary-key' = 'id' );
Kết quả mong đợi: Bảng sink được tạo thành công trong Flink SQL Client. Lệnh này chuẩn bị sẵn sàng để nhận dữ liệu từ source.
Verify kết quả: Chạy lệnh INSERT INTO đơn giản để kiểm tra cơ chế batching. Nếu bạn gửi 10 dòng dữ liệu nhỏ, PostgreSQL có thể chưa có dữ liệu ngay lập tức nếu chưa đạt threshold, nhưng sau 60s hoặc đủ 2048 dòng thì dữ liệu sẽ xuất hiện.
INSERT INTO user_events_sink SELECT 1, CURRENT_TIMESTAMP(), 100.00; INSERT INTO user_events_sink SELECT 2, CURRENT_TIMESTAMP(), 200.00;
Kết quả mong đợi: Lệnh INSERT thành công. Dữ liệu sẽ được đẩy vào PostgreSQL sau khi thỏa mãn điều kiện buffer-flush.
Cấu hình Transaction Handling (Exactly-once Semantics)
Trong mô hình HTAP, tính nhất quán (consistency) là yếu tố sống còn. Bạn cần đảm bảo dữ liệu không bị mất (at-least-once) và không bị trùng lặp (at-most-once).
Flink JDBC Connector hỗ trợ cơ chế Two-Phase Commit (2PC) để đạt được Exactly-once semantics, nhưng nó yêu cầu PostgreSQL hỗ trợ và cấu hình đúng.
Để kích hoạt 2PC, bạn phải đảm bảo PostgreSQL của bạn đang chạy với phiên bản hỗ trợ (9.0+) và đã cấu hình commit_mode trong sink. Tuy nhiên, với JDBC connector hiện tại, cách phổ biến nhất để đạt Exactly-once là sử dụng Upsert với Primary Key và đảm bảo Checkpoint của Flink được kích hoạt.
Cấu hình checkpoint trong file flink-conf.yaml là bắt buộc để transaction có hiệu lực. Nếu chưa cấu hình, hãy thêm vào file /opt/flink/conf/flink-conf.yaml.
checkpointing: true checkpoint.interval: 60000 checkpoint.timeout: 600000 checkpoint.minPause: 30000 state.backend: rocksdb state.checkpoints.dir: file:///opt/flink/checkpoints
Kết quả mong đợi: File cấu hình đã được cập nhật. Flink sẽ tự động thực hiện checkpoint định kỳ để lưu trạng thái và commit transaction.
Trong SQL Client, khi tạo sink, hãy đảm bảo sử dụng 'write-mode' = 'UPSERT' và chỉ định 'primary-key'. Điều này giúp Flink sử dụng cơ chế Upsert của PostgreSQL để xử lý sự trùng lặp nếu có retry.
CREATE TABLE orders_sink ( order_id BIGINT, status STRING, created_at TIMESTAMP(3) ) PRIMARY KEY (order_id) NOT ENFORCED WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://127.0.0.1:5432/flink_db', 'table-name' = 'orders', 'username' = 'flink_user', 'password' = 'secure_password_123', 'driver' = 'org.postgresql.Driver', 'write-mode' = 'UPSERT' );
Kết quả mong đợi: Bảng sink được tạo với cơ chế Upsert. Khi Flink checkpoint thành công, transaction sẽ được commit vào PostgreSQL. Nếu Flink fail, rollback sẽ xảy ra.
Verify kết quả: Chạy một job stream đơn giản ghi vào bảng này, sau đó cố tình kill job (Ctrl+C hoặc systemctl stop). Sau đó khởi động lại job cùng dữ liệu. Kiểm tra trong PostgreSQL xem số lượng dòng có tăng gấp đôi không. Nếu Exactly-once hoạt động, số lượng dòng sẽ không đổi (hoặc chỉ tăng phần dữ liệu mới thực sự).
Kiểm tra kết nối và chạy lệnh SQL đơn giản qua Flink
Bước cuối cùng là xác minh toàn bộ luồng dữ liệu từ Flink vào PostgreSQL đã hoạt động trơn tru bằng một lệnh SQL đơn giản.
Chúng ta sẽ tạo một bảng source giả lập (hoặc dùng bảng đã có sẵn), thực hiện phép tính đơn giản và ghi kết quả vào PostgreSQL.
Đầu tiên, tạo một bảng source từ bảng 'user_events' đã có trong PostgreSQL (giả định bạn đã có bảng này trong phần 2 của series).
CREATE TABLE user_events_source ( id BIGINT, event_time TIMESTAMP(3), amount DECIMAL(10, 2) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://127.0.0.1:5432/flink_db', 'table-name' = 'user_events', 'username' = 'flink_user', 'password' = 'secure_password_123', 'driver' = 'org.postgresql.Driver', 'scan.incremental.poll.interval' = '1s', 'scan.incremental.snapshot.enabled' = 'false' );
Kết quả mong đợi: Bảng source được tạo thành công, Flink bắt đầu poll dữ liệu từ PostgreSQL.
Tạo một job SQL để chuyển dữ liệu từ source sang sink đã cấu hình ở phần trước, kèm theo một phép biến đổi đơn giản (ví dụ: làm tròn amount).
INSERT INTO user_events_sink SELECT id, event_time, ROUND(amount, 2) FROM user_events_source;
Kết quả mong đợi: Job bắt đầu chạy. Trong console Flink SQL Client, bạn sẽ thấy trạng thái "Running".
Verify kết quả: Mở terminal mới và truy cập trực tiếp vào PostgreSQL bằng psql để kiểm tra dữ liệu.
psql -U flink_user -d flink_db -c "SELECT * FROM user_events ORDER BY id DESC LIMIT 5;"
Kết quả mong đợi: Bạn thấy các dòng dữ liệu mới nhất được ghi vào bảng 'user_events' từ Flink, với giá trị amount đã được làm tròn. Nếu dữ liệu xuất hiện, nghĩa là kết nối JDBC, connector, driver và transaction đều hoạt động chính xác.
Điều hướng series:
Mục lục: Series: Triển khai Database HTAP với Apache Flink và PostgreSQL trên Ubuntu 24.04
« Phần 3: Triển khai cụm Apache Flink và cấu hình cluster
Phần 5: Xây dựng pipeline ETL thời gian thực với Flink SQL »