Tinh chỉnh cấu hình Kafka để tối đa hóa Throughput
Để đạt được throughput cao, chúng ta cần giảm overhead của việc phân mảnh dữ liệu và tăng hiệu quả nén. Điều này thực hiện bằng cách điều chỉnh các tham số `batch.size` và `compression.type` trên Producer (Vector) và Broker (Kafka).
Trước tiên, hãy cập nhật cấu hình của Vector để gom dữ liệu thành các batch lớn hơn trước khi gửi lên Kafka. Điều này giảm số lượng request gửi đến Kafka, giảm latency và tăng throughput.
Chỉnh sửa file cấu hình Vector, thường nằm tại /etc/vector/vector.toml hoặc /etc/vector/agents.toml tùy theo cách triển khai. Thêm hoặc sửa các dòng sau trong phần sinks.kafka:
[sinks.kafka]
type = "kafka"
inputs = ["my_metrics"]
batch = { max_bytes = 1048576, max_events = 65536, timeout = 100 }
compression = "lz4"
Tham số max_bytes được đặt thành 1MB để gom dữ liệu lớn hơn, max_events giới hạn số lượng sự kiện trong mỗi batch, và compression sử dụng thuật toán LZ4 cho tốc độ nén/giải nén nhanh nhất phù hợp với dữ liệu time-series.
Kết quả mong đợi: Bạn sẽ thấy số lượng request gửi đến Kafka giảm đi đáng kể trong khi lượng dữ liệu (bytes) truyền tải vẫn giữ nguyên hoặc tăng lên.
Tiếp theo, tinh chỉnh các tham số trên Kafka Broker để hỗ trợ các batch lớn này. Chỉnh sửa file /etc/kafka/server.properties trên tất cả các node Kafka.
log.segment.bytes=1073741824
num.io.threads=8
num.network.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
num.replica.fetchers=4
Tham số log.segment.bytes tăng kích thước mỗi file log lên 1GB để giảm số lượng file trên disk. Tăng num.io.threads và num.network.threads giúp xử lý nhiều I/O và mạng song song hơn. Tăng num.replica.fetchers tăng tốc độ replication giữa các broker.
Kết quả mong đợi: Kafka có khả năng xử lý các batch lớn từ Vector mà không bị queue đầy hoặc latency tăng cao, đồng thời giảm tải cho disk I/O.
Để verify kết quả, sử dụng công cụ kafka-producer-perf-test.sh có sẵn trong thư mục bin của Kafka để chạy benchmark ngay trên node Broker.
bin/kafka-producer-perf-test.sh --topic test-metrics --num-threads 4 --message-size 1000 --request-timeout 30000 --throughput 10000 --producer-config /tmp/producer-perf.properties
Tạo file /tmp/producer-perf.properties với nội dung:
batch.size=1048576
linger.ms=10
compression.type=lz4
Kết quả mong đợi: Bạn sẽ thấy output hiển thị "Messages/sec" và "MB/sec" cao hơn đáng kể so với cấu hình mặc định.
Tối ưu hóa ClickHouse cho Write Performance
ClickHouse được thiết kế để write cực nhanh, nhưng cần cấu hình đúng để tận dụng tối đa tài nguyên CPU và Disk, đặc biệt là với dữ liệu time-series đến liên tục từ Kafka.
Bước đầu tiên là tăng số lượng luồng (threads) ClickHouse sử dụng khi thực hiện thao tác INSERT. Chỉnh sửa file /etc/clickhouse-server/config.d/01-write-optimization.xml.
8
16
2
Tham số max_insert_threads cho phép ClickHouse song song hóa việc ghi dữ liệu vào các phần của partition khác nhau. max_threads là giới hạn tổng số luồng cho tất cả các query. Đặt insert_quorum = 2 đảm bảo dữ liệu được ghi vào ít nhất 2 replica trước khi trả về thành công.
Kết quả mong đợi: Thời gian thực hiện lệnh INSERT giảm xuống, và thông số Rows per second trong bảng system.metrics tăng lên.
Để tăng tốc độ merge (gộp các phần dữ liệu nhỏ thành lớn) và giảm load cho disk, chúng ta cần tinh chỉnh các tham số của bảng MergeTree. Giả sử bạn đang sử dụng bảng metrics trong database default.
Thực thi lệnh SQL sau trên ClickHouse Client:
ALTER TABLE metrics ON CLUSTER default_cluster MODIFY SETTINGS
merge_tree_max_bytes_to_merge_insert = 1073741824,
merge_tree_max_bytes_to_merge_at_max_interval = 1073741824,
merge_tree_min_rows_for_concurrent_insert = 1000,
merge_tree_min_bytes_for_concurrent_insert = 10485760,
max_insert_block_size = 10485760;
Tham số merge_tree_max_bytes_to_merge_insert cho phép ClickHouse gộp dữ liệu ngay trong quá trình INSERT nếu kích thước đủ lớn, giảm số lượng file nhỏ (part) trên disk. max_insert_block_size tăng kích thước khối dữ liệu tối đa cho mỗi lần ghi.
Kết quả mong đợi: Số lượng file (parts) trong thư mục dữ liệu của bảng sẽ giảm, và tốc độ merge sẽ nhanh hơn, tránh tình trạng "merge storm" khi dữ liệu đến quá nhanh.
Để verify, kiểm tra bảng system.parts để xem số lượng phần đang ở trạng thái "Mutating" hoặc "Merging".
SELECT count() FROM system.parts WHERE table = 'metrics' AND database = 'default' AND active = 1;
Kết quả mong đợi: Số lượng active parts thấp và ổn định, không tăng vọt theo thời gian.
Triển khai Cluster ClickHouse để Scale Horizontal
Khi dữ liệu vượt quá khả năng của một node, chúng ta cần mở rộng theo chiều ngang (horizontal scaling) bằng cách thêm các node mới vào cluster và cấu hình sharding.
Giả sử chúng ta có 3 node ClickHouse với IP: 192.168.1.10, 192.168.1.11, 192.168.1.12. Chúng ta sẽ cấu hình cluster sử dụng ZooKeeper.
Trên từng node ClickHouse, chỉnh sửa file /etc/clickhouse-server/config.d/02-cluster-configuration.xml để định nghĩa cluster.
192.168.1.10
9000
192.168.1.11
9000
192.168.1.12
9000
192.168.1.5
2181
File này định nghĩa một cluster tên default_cluster với 3 shards (mỗi shard 1 replica). Nếu muốn replica cho HA, hãy thêm nhiều vào cùng một . Tham số zookeeper trỏ đến node ZooKeeper đang chạy.
Kết quả mong đợi: ClickHouse có thể nhận biết các node khác trong cluster để phân phối dữ liệu và query.
Tiếp theo, tạo lại bảng dữ liệu với clause ON CLUSTER để đảm bảo cấu trúc bảng đồng bộ trên tất cả các node.
CREATE TABLE IF NOT EXISTS metrics ON CLUSTER default_cluster
(
timestamp DateTime,
metric_name String,
value Float64,
tags Map(String, String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/metrics', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (metric_name, timestamp);
Tham số ReplicatedMergeTree yêu cầu đường dẫn ZooKeeper /clickhouse/tables/{shard}/metrics để quản lý replication. toYYYYMM giúp phân vùng dữ liệu theo tháng để dễ quản lý.
Kết quả mong đợi: Lệnh chạy thành công trên một node sẽ tự động tạo bảng giống hệt trên 2 node còn lại.
Cấu hình Vector để gửi dữ liệu vào cluster này. Thay vì gửi vào một IP cụ thể, chúng ta gửi vào một topic Kafka và để ClickHouse đọc từ Kafka, hoặc cấu hình Vector gửi trực tiếp vào một node và ClickHouse sẽ tự balance (nếu dùng Distributed table). Ở đây, chúng ta tạo một Distributed table để query toàn cluster.
CREATE TABLE IF NOT EXISTS metrics_distributed ON CLUSTER default_cluster
(
timestamp DateTime,
metric_name String,
value Float64,
tags Map(String, String)
)
ENGINE = Distributed(default_cluster, default, metrics, rand());
Bảng metrics_distributed không chứa dữ liệu thực tế, nó chỉ là một proxy để phân phối query hoặc insert đến các bảng metrics trên các shards khác nhau dựa trên hàm hash rand() (hoặc cityHash64(metric_name) để dữ liệu cùng tên luôn nằm cùng shard).
Kết quả mong đợi: Khi bạn INSERT vào bảng metrics_distributed, dữ liệu sẽ tự động phân tán đều lên 3 node.
Để verify, thực hiện INSERT dữ liệu mẫu vào bảng distributed và kiểm tra số lượng dòng trên từng node.
INSERT INTO metrics_distributed VALUES (now(), 'cpu_usage', 85.5, {'host', 'node1'});
SELECT count() FROM metrics;
Thực thi lệnh SELECT count() FROM metrics trên từng node riêng biệt (hoặc dùng SELECT count() FROM metrics ON CLUSTER default_cluster).
Kết quả mong đợi: Tổng số dòng trên 3 node bằng tổng số dòng đã insert, và dữ liệu được phân bổ đều (hoặc theo hash key) giữa các node.
Đánh giá hiệu năng với Benchmark
Sau khi tinh chỉnh và mở rộng cluster, cần có một quy trình benchmark đơn giản để so sánh hiệu năng trước và sau tối ưu.
Sử dụng công cụ clickhouse-benchmark hoặc script shell đơn giản để tạo tải. Dưới đây là script tạo tải giả lập dòng dữ liệu từ Vector.
Tạo file /tmp/generate-load.sh:
#!/bin/bash
NUM_ROWS=1000000
CLICKHOUSE_HOST="192.168.1.10"
echo "Generating $NUM_ROWS records..."
time seq 1 $NUM_ROWS | parallel -j 4 "
clickhouse-client --host $CLICKHOUSE_HOST --query \"
INSERT INTO metrics_distributed VALUES (now(), 'metric_$RANDOM', $RANDOM, {'tag', 'val'})
\"
"
Script này sử dụng parallel để chạy 4 luồng song song, mỗi luồng thực hiện INSERT vào ClickHouse. Thay đổi CLICKHOUSE_HOST để test trên các node khác nhau.
Kết quả mong đợi: Bạn sẽ thấy tổng thời gian (real) để insert 1 triệu dòng. Thời gian này nên giảm đáng kể so với cấu hình mặc định.
Đồng thời, trên máy chủ ClickHouse, theo dõi các metric thời gian thực bằng cách chạy query song song trong terminal khác.
watch -n 1 "clickhouse-client --query \"SELECT metric_name, sum(value) FROM system.metrics WHERE metric_name IN ('InsertRows', 'InsertBytes') AND event_time > now() - 60\""
Query này hiển thị tổng số dòng và bytes đã được insert trong 60 giây qua lại.
Kết quả mong đợi: Giá trị InsertRows và InsertBytes tăng liên tục với tốc độ cao (ví dụ: hàng triệu rows/giây) mà không có dấu hiệu của lỗi hoặc chậm trễ.
Để kiểm tra khả năng mở rộng (scaling), hãy chạy cùng một script trên 2 node khác nhau cùng lúc và quan sát tổng throughput của cluster.
clickhouse-client --query "SELECT sum(value) FROM system.metrics ON CLUSTER default_cluster WHERE metric_name = 'InsertRows' AND event_time > now() - 60"
Kết quả mong đợi: Tổng số dòng insert trên cluster sẽ xấp xỉ bằng tổng của các node riêng lẻ, chứng tỏ việc scale horizontal đã thành công.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Streaming Time-Series với Kafka, ClickHouse và Vector
« Phần 7: Xử lý dữ liệu trễ (Late Data) và đảm bảo tính toàn vẹn
Phần 9: Troubleshooting và các mẹo vận hành nâng cao »