Cấu hình Prometheus và Grafana để giám sát Flink và PostgreSQL
Bước 1: Cài đặt Prometheus và cấu hình scrape Flink
Chúng ta cần cài đặt Prometheus để thu thập metric từ Flink JobManager và TaskManager. Flink đã tích hợp sẵn exporter metrics, việc cần làm là chỉ định Prometheus target đúng địa chỉ và cổng.
Cài đặt Prometheus trên Ubuntu 24.04 và tạo file cấu hình để scrape endpoint metrics của Flink (thường là cổng 8081 cho JobManager).
sudo apt update && sudo apt install -y prometheus
sudo useradd --no-create-home --shell /bin/false prometheus
mkdir -p /etc/prometheus /var/lib/prometheus
chown prometheus:prometheus /etc/prometheus /var/lib/prometheus
wget https://github.com/prometheus/prometheus/releases/download/v2.48.0/prometheus-2.48.0.linux-amd64.tar.gz
tar -xvf prometheus-2.48.0.linux-amd64.tar.gz
cd prometheus-2.48.0.linux-amd64
mv prometheus promtool /usr/local/bin/
chown prometheus:prometheus /usr/local/bin/prometheus /usr/local/bin/promtool
mv consoles/ console_libraries/ /etc/prometheus/
chown -R prometheus:prometheus /etc/prometheus
Kết quả mong đợi: Prometheus đã được cài đặt vào thư mục hệ thống và user 'prometheus' đã được tạo.
Để Prometheus có thể scrape Flink, ta cần chỉnh sửa file cấu hình `/etc/prometheus/prometheus.yml`. Cấu hình này chỉ định job name và target là địa chỉ IP của JobManager.
cat > /etc/prometheus/prometheus.yml
Kết quả mong đợi: File cấu hình đã được ghi đè, Prometheus sẽ bắt đầu thu thập metric từ Flink JobManager ở địa chỉ 192.168.1.10.
Bước 2: Cấu hình scrape PostgreSQL
Flink sử dụng JDBC để kết nối PostgreSQL, nhưng để giám sát sâu (CPU, RAM, WAL, Connections), chúng ta cần PostgreSQL Exporter. Đây là một sidecar hoặc service riêng biệt để chuyển đổi metric của Postgres sang format Prometheus.
Cài đặt PostgreSQL Exporter và cấu hình kết nối an toàn thông qua biến môi trường.
sudo apt install -y postgresql-client
wget https://github.com/prometheus-community/postgres_exporter/releases/download/v0.14.0/postgres_exporter-v0.14.0.linux-amd64.tar.gz
tar -xvf postgres_exporter-v0.14.0.linux-amd64.tar.gz
mv postgres_exporter-v0.14.0.linux-amd64/postgres_exporter /usr/local/bin/
sudo useradd --no-create-home --shell /bin/false postgres_exporter
chown postgres_exporter:postgres_exporter /usr/local/bin/postgres_exporter
Kết quả mong đợi: Binary postgres_exporter đã sẵn sàng trong `/usr/local/bin/`.
Tạo file service systemd để chạy exporter và cấu hình biến môi trường chứa thông tin kết nối PostgreSQL (user, password, host, port).
cat > /etc/systemd/system/postgres_exporter.service
Kết quả mong đợi: Service postgres_exporter đang chạy và lắng nghe cổng 9187 (mặc định).
Cập nhật lại file `/etc/prometheus/prometheus.yml` để thêm job scrape cho PostgreSQL.
cat > /etc/prometheus/prometheus.yml
Kết quả mong đợi: Prometheus đã restart và hiện thêm job 'postgresql' trong danh sách targets.
Bước 3: Cài đặt Grafana và Dashboard
Để trực quan hóa dữ liệu, chúng ta cài đặt Grafana và kết nối nguồn dữ liệu (Data Source) là Prometheus.
sudo apt install -y grafana
sudo systemctl enable grafana-server
sudo systemctl start grafana-server
Kết quả mong đợi: Grafana đang chạy và có thể truy cập qua trình duyệt tại địa chỉ IP server trên cổng 3000.
Truy cập Grafana (http://IP_SERVER:3000), đăng nhập (admin/admin), sau đó vào Configuration > Data Sources > Add source. Chọn Prometheus, nhập URL là `http://localhost:9090`, sau đó click "Save & Test".
Sau đó, vào Dashboards > Import, nhập ID Dashboard cho Flink là `9404` (Flink Metrics) và PostgreSQL là `9404` (PostgreSQL Exporter) hoặc tìm kiếm "PostgreSQL Exporter" trong cộng đồng để lấy ID chính xác (thường là 11063). Nhấn Load, chọn Prometheus data source vừa tạo, sau đó Import.
Kết quả mong đợi: Dashboard hiển thị các biểu đồ về Throughput, Backpressure, Memory Usage của Flink và Connections, Transactions của PostgreSQL.
Xử lý tình trạng Backpressure và Memory Overflow trong Flink
Bước 1: Nhận diện Backpressure
Backpressure xảy ra khi tốc độ ghi (sink) vào PostgreSQL chậm hơn tốc độ đọc (source) từ Kafka hoặc file, khiến buffer của TaskManager bị đầy. Dấu hiệu nhận biết là throughput giảm đột ngột và latency tăng vọt.
Để kiểm tra trạng thái backpressure, mở Flink Web UI (http://IP_JOBMANAGER:8081), vào tab "Jobs", chọn job đang chạy, sau đó xem biểu đồ "Backpressure". Nếu cột màu đỏ xuất hiện liên tục, job đang bị nghẽn.
Hoặc dùng command line để xem metric chi tiết:
curl -s http://192.168.1.10:8081/metrics | grep flink_taskmanager_job_task_backpressure
Kết quả mong đợi: Nếu giá trị `high` hoặc `medium` xuất hiện thường xuyên, hệ thống đang chịu áp lực.
Bước 2: Xử lý Backpressure bằng cách điều chỉnh Parallelism và Buffer
Nguyên nhân phổ biến nhất là Parallelism của TaskManager quá thấp so với lượng dữ liệu, hoặc buffer quá nhỏ. Chúng ta sẽ tăng kích thước buffer và số lượng instance.
Cập nhật file `flink-conf.yaml` trong thư mục `conf` của Flink để tăng `taskmanager.network.memory.fraction` và `taskmanager.network.min-input-buffer-size`.
cat >> /opt/flink/conf/flink-conf.yaml
Kết quả mong đợi: Flink sẽ sử dụng nhiều bộ nhớ hơn cho mạng, giúp giảm tắc nghẽn khi dữ liệu dồn dập.
Khởi động lại cluster hoặc submit job mới với parallelism cao hơn. Trong Flink SQL, ta có thể điều chỉnh parallelism cho toàn bộ job hoặc từng operator.
sql-client> SET execution.parallelism = 4;
sql-client> SET execution.taskmanager.memory.process.size = 2048m;
sql-client> CREATE TABLE kafka_source (...) WITH ( 'parallelism' = '4' );
sql-client> INSERT INTO postgres_sink SELECT * FROM kafka_source;
Kết quả mong đợi: Job chạy với 4 instance song song, phân tải dữ liệu, giúp giảm backpressure tại sink.
Bước 3: Xử lý Memory Overflow (OOM)
Memory Overflow xảy ra khi Flink cần nhiều bộ nhớ hơn mức cấu hình, dẫn đến OOM Killer của Linux giết tiến trình. Điều này thường do kích thước batch quá lớn hoặc state quá lớn.
Để phòng ngừa, cấu hình giới hạn bộ nhớ rõ ràng trong `flink-conf.yaml` và đảm bảo JVM heap không vượt quá 80% RAM vật lý của node.
cat >> /opt/flink/conf/flink-conf.yaml
Kết quả mong đợi: Flink được giới hạn sử dụng bộ nhớ, tránh làm sập hệ điều hành, nhưng cần đảm bảo RAM server đủ lớn.
Kiểm tra log của Flink để xem lỗi OOM:
grep -i "java.lang.OutOfMemoryError" /opt/flink/log/flink-taskmanager-*.log
Kết quả mong đợi: Nếu thấy lỗi, cần tăng `taskmanager.memory.process.size` hoặc giảm độ phức tạp của query SQL (ví dụ: giảm số lượng key trong aggregation).
Phân tích log lỗi khi PostgreSQL từ chối kết nối hoặc write timeout
Bước 1: Xử lý lỗi "too many connections"
Trong mô hình HTAP, Flink mở nhiều connection song song để ghi dữ liệu. Nếu PostgreSQL từ chối kết nối, log sẽ hiện lỗi "FATAL: sorry, too many clients already".
Kiểm tra log của PostgreSQL tại `/var/log/postgresql/postgresql-16-main.log` (tùy version) để xác nhận.
grep -i "too many clients" /var/log/postgresql/postgresql-16-main.log
Kết quả mong đợi: Thấy dòng log xác nhận số lượng kết nối đã đạt giới hạn.
Giải pháp là tăng tham số `max_connections` trong file `postgresql.conf`. Lưu ý: mỗi connection tốn RAM, cân nhắc kỹ trước khi tăng.
sed -i 's/#max_connections = 100/max_connections = 500/' /etc/postgresql/16/main/postgresql.conf
sudo systemctl restart postgresql
Kết quả mong đợi: PostgreSQL cho phép tối đa 500 kết nối đồng thời.
Tuy nhiên, cách tốt hơn cho HTAP là sử dụng Connection Pooling. Cấu hình Flink JDBC Connector để sử dụng `max-connections` và `connection-check-interval`.
sql-client> CREATE TABLE postgres_sink (
id INT,
amount DECIMAL(10,2),
ts TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://127.0.0.1:5432/htap_db',
'table-name' = 'transactions',
'username' = 'flink_user',
'password' = 'secure_password',
'connection-check-interval' = '5000',
'max-connections' = '50'
);
Kết quả mong đợi: Flink tự động quản lý pool kết nối, không tạo quá 50 connection mới mỗi lần, tránh làm đầy limit của PostgreSQL.
Bước 2: Xử lý lỗi "Write Timeout"
Lỗi này xảy ra khi Flink gửi dữ liệu nhưng PostgreSQL không phản hồi trong thời gian quy định (do quá tải CPU, lock, hoặc mạng chậm). Log Flink sẽ hiện "java.util.concurrent.TimeoutException" hoặc "SocketTimeoutException".
Để tăng thời gian chờ, ta điều chỉnh tham số `socketTimeout` trong cấu hình JDBC của Flink.
sql-client> CREATE TABLE postgres_sink (
...
) WITH (
...
'socketTimeout' = '10000',
'execution.checkpointing.mode' = 'EXACTLY_ONCE'
);
Kết quả mong đợi: Flink sẽ chờ 10 giây trước khi báo lỗi timeout, giúp hệ thống chịu được các spike ngắn hạn.
Phân tích nguyên nhân sâu hơn bằng cách xem log PostgreSQL khi lỗi xảy ra:
grep -A 5 "timeout" /var/log/postgresql/postgresql-16-main.log
Kết quả mong đợi: Nếu thấy "deadlock detected" hoặc "lock timeout", cần xem lại schema (thiếu index) hoặc logic transaction (giao dịch quá dài).
Mẹo tối ưu hóa Checkpointing để đảm bảo tính nhất quán dữ liệu
Bước 1: Cấu hình Checkpointing phù hợp cho HTAP
Trong HTAP, tính nhất quán (Consistency) quan trọng hơn tốc độ. Chúng ta cần đảm bảo Flink thực hiện Checkpoint thành công trước khi cam kết (commit) dữ liệu vào PostgreSQL. Sử dụng chế độ `EXACTLY_ONCE`.
Cấu hình file `flink-conf.yaml` để kích hoạt checkpointing với khoảng thời gian hợp lý.
cat >> /opt/flink/conf/flink-conf.yaml
Kết quả mong đợi: Flink sẽ tạo checkpoint mỗi 60 giây, đảm bảo dữ liệu được lưu vào state backend (RocksDB) và commit vào DB nếu thành công.
Bước 2: Tối ưu hóa Write-through với Two-Phase Commit (2PC)
Để đảm bảo PostgreSQL ghi dữ liệu đúng thời điểm checkpoint, Flink JDBC Connector hỗ trợ 2PC. Tuy nhiên, nó yêu cầu PostgreSQL hỗ trợ transaction manager. Cấu hình `sink.buffer-flush.max-checkpoint-wait` để tránh làm chậm checkpoint quá lâu.
sql-client> CREATE TABLE postgres_sink (
...
) WITH (
...
'sink.buffer-flush.interval' = '5s',
'sink.buffer-flush.max-checkpoint-wait' = '60s'
);
Kết quả mong đợi: Dữ liệu được cache và flush định kỳ, nhưng khi checkpoint đến, Flink sẽ chờ tối đa 60s để đảm bảo tất cả data đã vào DB trước khi hoàn thành checkpoint.
Bước 3: Xử lý lỗi Checkpoint Failures
Nếu checkpoint liên tục fail, Flink sẽ không thể đảm bảo tính nhất quán. Kiểm tra log để xem nguyên nhân (thường là PostgreSQL quá tải hoặc network partition).
grep -i "checkpoint failed" /opt/flink/log/flink-jobmanager-*.log
Kết quả mong đợi: Xác định được nguyên nhân fail (ví dụ: "Timeout during checkpoint").
Để khắc phục, tăng thời gian timeout hoặc giảm tần suất checkpoint nếu hệ thống quá tải.
sql-client> SET execution.checkpointing.interval = 120000;
sql-client> SET execution.checkpointing.timeout = 20min;
Kết quả mong đợi: Checkpoint chạy thành công với tần suất thấp hơn, giảm áp lực lên PostgreSQL.
Hướng dẫn Backup và Restore dữ liệu PostgreSQL trong môi trường HTAP
Bước 1: Backup dữ liệu PostgreSQL
Trong môi trường HTAP, dữ liệu luôn thay đổi. Backup cần được thực hiện mà không làm gián đoạn service (Online Backup). Sử dụng `pg_dump` cho backup logic hoặc `pg_basebackup` cho backup vật lý.
Tạo script backup tự động vào thư mục `/var/backups/postgres/`.
mkdir -p /var/backups/postgres
chown postgres:postgres /var/backups/postgres
cat > /usr/local/bin/backup_htap.sh
Kết quả mong đợi: Script backup đã sẵn sàng, lưu file dump định dạng custom (.dump).
Thêm vào crontab để chạy hàng ngày vào 2:00 AM.
sudo crontab -e
# Thêm dòng sau:
0 2 * * * /usr/local/bin/backup_htap.sh >> /var/log/postgres_backup.log 2>&1
Kết quả mong đợi: Backup tự động chạy hàng ngày và log được ghi vào file.
Bước 2: Restore dữ liệu PostgreSQL
Khi cần khôi phục dữ liệu (ví dụ: sau lỗi logic hoặc xóa nhầm), sử dụng `pg_restore` từ file backup.
Để restore, đầu tiên cần tạo lại database mới (hoặc xóa dữ liệu cũ nếu muốn restore toàn bộ).
sudo -u postgres createdb htap_db_restore
export PGPASSWORD="secure_password"
pg_restore -h 127.0.0.1 -p 5432 -U flink_user -d htap_db_restore /var/backups/postgres/htap_db_20231027_020000.dump
Kết quả mong đợi: Dữ liệu đã được khôi phục vào database `htap_db_restore`.
Bước 3: Khôi phục trạng thái Flink (Savepoint)
Trong HTAP, chỉ restore PostgreSQL là chưa đủ, vì Flink có state (checkpoint). Cần restore Flink từ Savepoint để đảm bảo trạng thái xử lý khớp với dữ liệu trong DB.
Đảm bảo Flink cluster đã chạy và có savepoint được lưu trong `/opt/flink/savepoints/`.
ls -l /opt/flink/savepoints/
Kết quả mong đợi: Thấy thư mục savepoint gần nhất (ví dụ: `2023-10-27T02-00-00`).
Khởi động lại job Flink từ savepoint đó để đồng bộ lại state.
flink run \
--fromSavepoint /opt/flink/savepoints/2023-10-27T02-00-00 \
--allowNonRestoredState \
/opt/flink/job.jar
Kết quả mong đợi: Job Flink chạy lại từ trạng thái đã lưu, tiếp tục xử lý dữ liệu mới từ điểm đó, đảm bảo tính nhất quán với dữ liệu đã restore trong PostgreSQL.
Điều hướng series:
Mục lục: Series: Triển khai Database HTAP với Apache Flink và PostgreSQL trên Ubuntu 24.04
« Phần 6: Triển khai mô hình HTAP: Kết hợp truy vấn OLAP và ghi OLTP