Cài đặt PostgreSQL và tạo bảng mẫu
Chúng ta cần một kho chứa dữ liệu đích (sink) để lưu trữ kết quả xử lý từ Flink. PostgreSQL sẽ đóng vai trò này.
Cài đặt PostgreSQL phiên bản mới nhất từ kho mặc định của Ubuntu 24.04.
sudo apt update && sudo apt install -y postgresql postgresql-contrib
Hệ thống sẽ cài đặt gói và khởi động dịch vụ PostgreSQL tự động. Kiểm tra trạng thái dịch vụ bằng lệnh systemctl status postgresql, kết quả mong đợi là active (running).
Chuyển sang user postgres để thực hiện các cấu hình cơ sở dữ liệu.
sudo -u postgres psql
Bạn sẽ vào được shell của PostgreSQL với prompt postgres=#. Đây là nơi thực thi các lệnh SQL.
Tạo cơ sở dữ liệu mới tên flink_demo và user flink_user với mật khẩu flink_pass.
CREATE DATABASE flink_demo;
CREATE USER flink_user WITH PASSWORD 'flink_pass';
GRANT ALL PRIVILEGES ON DATABASE flink_demo TO flink_user;
Không có lỗi báo cáo, lệnh thực thi thành công và trở về prompt postgres=#.
Chuyển sang database vừa tạo để định nghĩa bảng đích.
\c flink_demo
Chuyển đổi ngữ cảnh thành công, prompt thay đổi thành flink_demo=#.
Tạo bảng user_events để lưu dữ liệu sự kiện người dùng.
CREATE TABLE user_events (
event_id BIGINT PRIMARY KEY,
user_id VARCHAR(50) NOT NULL,
event_type VARCHAR(50) NOT NULL,
event_data TEXT,
event_time TIMESTAMP NOT NULL
);
Bảng được tạo thành công. Kiểm tra bằng lệnh \d user_events để xem cấu trúc cột.
Thoát khỏi shell PostgreSQL.
\q
Trao quyền điều khiển trở lại terminal của bạn. Bây giờ PostgreSQL đã sẵn sàng nhận dữ liệu từ Flink.
Verify kết quả PostgreSQL
Chạy lệnh sau để đảm bảo user mới có thể kết nối và truy vấn bảng.
PGPASSWORD=flink_pass psql -h localhost -U flink_user -d flink_demo -c "SELECT * FROM user_events;"
Kết quả mong đợi là một bảng rỗng ((0 rows)) nhưng không có lỗi kết nối.
Cấu hình connector PostgreSQL-JDBC trong Flink SQL
Thêm JAR connector vào thư mục classpath
Flink SQL cần thư viện JDBC để giao tiếp với PostgreSQL. Chúng ta sẽ tải connector chính thức từ Maven Central.
Xác định phiên bản Flink đang chạy (giả sử là 1.18.1 theo series) và tải file JAR tương ứng.
FLINK_VERSION=1.18.1
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_VERSION}/flink-connector-jdbc-${FLINK_VERSION}.jar -P /opt/flink/lib/
File JAR được tải về thư mục /opt/flink/lib/. Đây là nơi Flink tự động scan và load các connector.
Khởi động lại Flink Cluster để nạp connector mới.
systemctl restart flink-jobmanager
systemctl restart flink-taskmanager
Cả JobManager và TaskManager sẽ khởi động lại. Kiểm tra lại Web UI (http://localhost:8081) để đảm bảo cluster hoạt động bình thường.
Verify kết quả Connector
Mở Flink SQL Client để kiểm tra khả năng kết nối.
/opt/flink/bin/sql-client.sh local
Chạy lệnh SQL sau để thử kết nối (sẽ thành công nếu connector đã load đúng).
SELECT 1 AS test;
Kết quả trả về 1 mà không báo lỗi ClassNotFoundException liên quan đến PostgreSQL driver.
Cài đặt Apache Kafka và Zookeeper trên cùng host
Chuẩn bị thư mục và cài đặt Zookeeper
Kafka cần Zookeeper để quản lý metadata. Chúng ta sẽ cài đặt cả hai trên cùng một máy chủ Ubuntu.
Tạo thư mục dữ liệu và tải Kafka (phiên bản 3.6.0 tương thích tốt với Flink 1.18).
mkdir -p /opt/kafka
cd /opt
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
mv kafka_2.13-3.6.0 /opt/kafka
File archive được giải nén và di chuyển vào /opt/kafka.
Cấu hình file zookeeper.properties cho Zookeeper.
cat > /opt/kafka/config/zookeeper.properties
File cấu hình được ghi vào đúng đường dẫn. Tạo thư mục data nếu chưa tồn tại bằng lệnh sudo mkdir -p /var/lib/zookeeper.
Khởi động Zookeeper bằng script shell (chạy trong background).
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
Zookeeper bắt đầu chạy. Chờ khoảng 5-10 giây để nó khởi động hoàn tất.
Cấu hình và khởi động Kafka Broker
Cấu hình file server.properties để Kafka lắng nghe kết nối từ Flink.
cat > /opt/kafka/config/server.properties
File cấu hình được ghi đè. Lưu ý advertised.listeners chỉ định localhost vì Flink và Kafka chạy cùng máy.
Tạo thư mục log và khởi động Kafka Broker.
sudo mkdir -p /var/lib/kafka-logs
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Kafka Broker bắt đầu chạy. Kiểm tra log bằng lệnh tail -f /opt/kafka/logs/kafka.log để đảm bảo không có lỗi OutOfMemoryError hoặc BindException.
Tạo Topic mẫu cho Flink
Tạo topic user_events_source để chứa dữ liệu đầu vào.
/opt/kafka/bin/kafka-topics.sh --create --topic user_events_source --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Topic được tạo thành công với 3 partitions. Xác nhận bằng lệnh /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092.
Verify kết quả Kafka
Gửi một tin nhắn mẫu vào topic để kiểm tra tính sẵn sàng.
echo '{"event_id": 1, "user_id": "user_001", "event_type": "click", "event_data": "home_page", "event_time": "2023-10-27T10:00:00Z"}' | /opt/kafka/bin/kafka-console-producer.sh --topic user_events_source --bootstrap-server localhost:9092
Không có lỗi xuất hiện. Dữ liệu đã được đẩy vào Kafka.
Tạo DDL để import data từ Kafka vào Flink và xuất ra PostgreSQL
Thêm connector Kafka vào Flink
Flink SQL cũng cần connector Kafka để đọc dữ liệu. Tải file JAR tương ứng với phiên bản Kafka.
Tải connector Kafka cho Flink 1.18.1.
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.18.1/flink-connector-kafka-1.18.1.jar -P /opt/flink/lib/
File JAR được tải về /opt/flink/lib/.
Khởi động lại Flink JobManager và TaskManager để nạp connector Kafka.
systemctl restart flink-jobmanager
systemctl restart flink-taskmanager
Cluster Flink sẵn sàng xử lý cả Kafka và PostgreSQL.
Viết kịch bản SQL trong Flink SQL Client
Mở lại SQL Client của Flink.
/opt/flink/bin/sql-client.sh local
Chuyển vào chế độ SQL Client.
Tạo Table Source từ Kafka. Dữ liệu được định dạng JSON.
CREATE TABLE kafka_source (
event_id BIGINT,
user_id VARCHAR(50),
event_type VARCHAR(50),
event_data TEXT,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_source',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink_sql_group',
'format' = 'json'
);
Bảng nguồn được tạo. Flink sẽ tự động ánh xạ schema JSON sang các cột SQL.
Tạo Table Sink vào PostgreSQL. Sử dụng connector JDBC.
CREATE TABLE postgres_sink (
event_id BIGINT,
user_id VARCHAR(50),
event_type VARCHAR(50),
event_data TEXT,
event_time TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/flink_demo',
'table-name' = 'user_events',
'username' = 'flink_user',
'password' = 'flink_pass',
'driver' = 'org.postgresql.Driver'
);
Bảng đích được định nghĩa. Flink sẽ tự động chuyển đổi dữ liệu từ Kafka sang PostgreSQL khi chạy lệnh INSERT.
Chạy lệnh INSERT để bắt đầu luồng xử lý (Stream Processing).
INSERT INTO postgres_sink
SELECT event_id, user_id, event_type, event_data, event_time
FROM kafka_source;
Lệnh này sẽ chạy liên tục (long-running job) và chờ dữ liệu từ Kafka. Bạn sẽ thấy thông báo Job started successfully trong console.
Để kiểm tra, mở một terminal mới và gửi thêm dữ liệu vào Kafka.
echo '{"event_id": 2, "user_id": "user_002", "event_type": "purchase", "event_data": "item_123", "event_time": "2023-10-27T10:05:00Z"}' | /opt/kafka/bin/kafka-console-producer.sh --topic user_events_source --bootstrap-server localhost:9092
Dữ liệu mới được đẩy vào Kafka.
Verify kết quả ETL
Quay lại terminal kiểm tra PostgreSQL để xem dữ liệu đã được lưu chưa.
PGPASSWORD=flink_pass psql -h localhost -U flink_user -d flink_demo -c "SELECT * FROM user_events;"
Kết quả mong đợi là thấy 2 dòng dữ liệu (một từ bước verify trước, một từ dòng lệnh INSERT vừa rồi) trong bảng user_events.
Trong terminal SQL Client, bạn có thể dừng job bằng cách nhấn Ctrl+C hoặc gõ lệnh CANCEL QUERY; nếu muốn dừng xử lý.
Điều hướng series:
Mục lục: Series: Triển khai Database Stream Processing với Apache Flink SQL và Ubuntu 24.04
« Phần 2: Cấu hình kiến trúc Standalone và chạy job đơn giản đầu tiên
Phần 4: Xử lý dữ liệu stream thời gian thực với Windowing và Watermark »