Cấu hình ClickHouse Sink Connector cho Flink
Bước đầu tiên là chuẩn bị thư viện connector ClickHouse cho Flink. Chúng ta cần thêm dependency vào file pom.xml hoặc build.sbt của project Java/Scala để Flink có thể giao tiếp với ClickHouse.
Tại sao: Flink core không bao gồm connector cho ClickHouse. Chúng ta cần sử dụng thư viện clickhouse-jdbc hoặc connector community để thiết lập kết nối JDBC và ánh xạ schema.
Kết quả mong đợi: Build project thành công, JAR file chứa class ClickHouseSinkBuilder hoặc tương đương.
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.6.0</version>
<classifier>jdbc</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.3.1</version>
</dependency>
Thư viện flink-connector-jdbc hiện là chuẩn de facto cho việc viết dữ liệu vào các DB quan hệ bao gồm ClickHouse thông qua JDBC, giúp đơn giản hóa logic sink.
Tiếp theo, chúng ta cấu hình đường dẫn JDBC. ClickHouse trên Kubernetes thường expose service với port 8123 (HTTP) hoặc 9000 (Native). Connector JDBC ưu tiên Native protocol cho throughput cao.
String jdbcUrl = "jdbc:clickhouse://clickhouse-service:9000/default";
Properties info = new Properties();
info.setProperty("user", "default");
info.setProperty("password", "");
Kết quả mong đợi: Biến jdbcUrl và info được khởi tạo sẵn sàng để truyền vào hàm JdbcSinkBuilder.
Xây dựng Logic Sink với Upsert
Bây giờ ta sẽ viết code Java để định nghĩa Sink. Yêu cầu quan trọng ở đây là áp dụng Upsert logic để xử lý dữ liệu trùng lặp (ví dụ: cùng một ID user gửi lại log mới).
Tại sao: Trong stream processing, dữ liệu có thể bị gửi lại do retry mechanism của Kafka hoặc Flink. Nếu không xử lý Upsert (Update or Insert), bảng sẽ bị dубликат (duplicates) làm sai lệch thống kê.
Kết quả mong đợi: Flink tự động chạy lệnh INSERT ... ON DUPLICATE KEY UPDATE hoặc sử dụng MergeTree engine đặc thù của ClickHouse.
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcDialect;
import org.apache.flink.connector.jdbc.JdbcDialects;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
// Giả sử RowData hoặc POJO tên là UserEvent
JdbcDialect dialect = JdbcDialects.getClickHouse();
JdbcSink<UserEvent> sink = JdbcSink.sink(
"INSERT INTO user_events (event_id, user_id, event_type, ts) VALUES (?, ?, ?, ?)",
(stmt, userEvent) -> {
stmt.setString(1, userEvent.getEventId());
stmt.setString(2, userEvent.getUserId());
stmt.setString(3, userEvent.getEventType());
stmt.setLong(4, userEvent.getTimestamp());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(100)
.withMaxRetries(5)
.build(),
dialect,
info,
jdbcUrl
);
Tham số withBatchSize(1000) gom 1000 sự kiện thành một lệnh SQL, giảm tải cho ClickHouse. withMaxRetries(5) đảm bảo tính nhất quán khi mạng chập chờn.
Kết quả mong đợi: Sink được tạo ra, sẵn sàng nhận dòng dữ liệu từ Flink DataStream.
Xử lý Batching và Retry Mechanism
Để tối ưu hiệu năng, chúng ta cần tinh chỉnh cơ chế batching và retry. ClickHouse hoạt động tốt nhất với các batch lớn hơn là write từng dòng (row-by-row).
Tại sao: Mỗi lần giao dịch (commit) với ClickHouse đều có overhead. Batching giúp giảm số lượng request network. Retry mechanism đảm bảo nếu ClickHouse restart hoặc network timeout, Flink sẽ tự động thử lại mà không mất dữ liệu (với điều kiện có checkpoint).
Kết quả mong đợi: Throughput tăng lên, số lượng lỗi "Connection refused" giảm về 0 nhờ retry.
JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
.withBatchSize(5000) // Tăng batch size cho production
.withBatchIntervalMs(200) // Gửi batch sau 200ms nếu chưa đủ 5000 dòng
.withMaxRetries(10) // Tăng số lần retry
.build();
Giá trị batchIntervalMs rất quan trọng: nếu dữ liệu đến thưa thớt, batch sẽ không đủ kích thước, nhưng interval đảm bảo dữ liệu vẫn được đẩy đi kịp thời, không bị ứ đọng trong buffer quá lâu.
Kết quả mong đợi: Dữ liệu được đẩy vào ClickHouse với độ trễ tối ưu (trade-off giữa latency và throughput).
Áp dụng Upsert Logic với ClickHouse MergeTree
ClickHouse không hỗ trợ chuẩn SQL ON DUPLICATE KEY UPDATE như MySQL. Để thực hiện Upsert, chúng ta phải sử dụng cơ chế ReplacingMergeTree hoặc AggregatingMergeTree của ClickHouse.
Tại sao: ClickHouse là OLAP, không phải OLTP. Nó không cập nhật hàng ngay lập tức. Thay vào đó, nó ghi bản ghi mới vào disk, sau đó merge (hợp nhất) các bản ghi để loại bỏ cái cũ trong quá trình background.
Kết quả mong đợi: Khi query, chỉ thấy bản ghi mới nhất cho mỗi key, không có dữ liệu trùng lặp.
Đầu tiên, đảm bảo bảng ClickHouse đã được tạo với engine thích hợp. Dưới đây là script SQL để tạo bảng hỗ trợ Upsert.
CREATE TABLE IF NOT EXISTS user_events (
event_id String,
user_id String,
event_type String,
ts DateTime,
version UInt64
) ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, event_id, ts);
Cột version (UInt64) dùng để xác định bản ghi nào mới hơn. Trong Flink, ta phải truyền giá trị version này (thường là timestamp hoặc sequence number).
Cấu hình lại query INSERT trong Flink để bao gồm cột version.
String insertQuery = "INSERT INTO user_events (event_id, user_id, event_type, ts, version) VALUES (?, ?, ?, ?, ?)";
// Trong lambda setStatement:
stmt.setLong(5, userEvent.getTimestamp()); // Dùng timestamp làm version
Kết quả mong đợi: ClickHouse tự động giữ lại bản ghi có version cao nhất sau khi merge hoàn tất. Query trả về dữ liệu mới nhất.
Kiểm tra độ trễ (Latency) từ Source đến Storage
Để xác minh hiệu năng của pipeline, chúng ta cần đo độ trễ từ lúc dữ liệu vào Kafka đến khi có thể query được trong ClickHouse.
Tại sao: Trong hệ thống real-time, độ trễ (latency) là chỉ số quan trọng nhất. Nếu latency quá cao (ví dụ > 5 giây), dashboard sẽ không phản ánh trạng thái thực tế.
Kết quả mong đợi: Có số liệu cụ thể về độ trễ trung bình và độ trễ tail (P99).
Chúng ta sẽ thêm timestamp vào dữ liệu khi vào Flink, và so sánh với thời điểm query trong ClickHouse.
// Trong Flink Source hoặc Map function
DataStream<UserEvent> stream = env.addSource(kafkaSource)
.map(event -> {
event.setIngestTime(System.currentTimeMillis());
return event;
});
Query để kiểm tra độ trễ trong ClickHouse. Giả sử bảng đã chứa dữ liệu.
SELECT
avg(now() - toDateTime(ingest_time)) as avg_latency,
quantile(0.99)(now() - toDateTime(ingest_time)) as p99_latency
FROM user_events
WHERE ingest_time > now() - INTERVAL 1 MINUTE;
Kết quả mong đợi: Giá trị avg_latency và p99_latency trả về dưới 2 giây (tùy thuộc vào cấu hình batching và tốc độ merge của ClickHouse).
Verify bằng cách chạy query này sau khi gửi một burst dữ liệu (10,000 events) vào Kafka và quan sát kết quả trong ClickHouse.
clickhouse-client --query "SELECT max(ingest_time) FROM user_events"
So sánh giá trị max(ingest_time) này với thời gian hiện tại. Chênh lệch chính là độ trễ tổng thể của hệ thống.
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 6: Xử lý dữ liệu thời gian thực: Aggregation và Windowing với Flink
Phần 8: Triển khai giao diện Dashboard và Query API cho ClickHouse »