1. Cấu hình Vector để xuất dữ liệu vào Kafka
Bước này xác định đường dẫn xuất (sink) của Vector, chuyển đổi định dạng dòng (line-based) sang JSON và đẩy vào topic Kafka đã tạo ở phần trước.
Tại sao: Vector cần biết địa chỉ bootstrap của Kafka, chủ đề (topic) đích, và cách định dạng payload để ClickHouse có thể đọc đúng.
Kết quả mong đợi: Vector khởi động thành công, log cho thấy dòng dữ liệu được gửi đến Kafka, và topic Kafka nhận được message.
Sửa file cấu hình /etc/vector/vector.toml (hoặc file config tương ứng của bạn) để thêm sink kafka:
[sources.metrics]
type = "file"
include = ["/var/log/app/*.log"]
file_data_dir = "/var/lib/vector/file_data"
[transforms.enrich]
type = "remap"
inputs = ["metrics"]
source = '''
. = parse_json!(string!(.message))
.ingestion_time = now()
'''
[sinks.kafka]
type = "kafka"
inputs = ["enrich"]
format = { codec = "json" }
topic = "app_metrics"
brokers = ["192.168.1.100:9092"]
key_field = "service_id"
Sau khi lưu file, khởi động lại Vector để áp dụng thay đổi:
systemctl restart vector
Kiểm tra trạng thái hoạt động của Vector:
journalctl -u vector -f --no-pager | grep -i "kafka\|sent"
Xác minh dữ liệu đã vào Kafka bằng công cụ kafka-console-consumer (chạy trên máy chứa Kafka hoặc máy khác có kết nối):
kafka-console-consumer --bootstrap-server 192.168.1.100:9092 --topic app_metrics --from-beginning --max-messages 5
2. Cấu hình ClickHouse để tiêu thụ dữ liệu từ Kafka
Bước này tạo bảng nguồn (source table) trong ClickHouse để đọc dữ liệu trực tiếp từ Kafka, và bảng đích (target table) để lưu trữ bền vững.
Tại sao: ClickHouse không đọc Kafka theo cách thông thường mà cần một engine đặc biệt là Kafka để liên tục poll dữ liệu, sau đó dùng Materialized View để chuyển đổi tự động vào bảng lưu trữ.
Kết quả mong đợi: Bảng metrics_kafka_source xuất hiện, dữ liệu bắt đầu chảy vào, và bảng metrics_final tăng số lượng dòng.
Truy cập vào shell của ClickHouse (client CLI) để thực thi các lệnh SQL:
clickhouse-client
Tạo database nếu chưa tồn tại:
CREATE DATABASE IF NOT EXISTS metrics_db;
Tạo bảng nguồn (source table) với engine Kafka. Lưu ý cấu trúc cột phải khớp với JSON mà Vector đã gửi:
CREATE TABLE metrics_db.metrics_kafka_source
ON CLUSTER default
(
timestamp DateTime,
service_id String,
metric_name String,
value Float64,
ingestion_time DateTime,
message String
)
ENGINE = Kafka()
SETTINGS
kafka_brokers = '192.168.1.100:9092',
kafka_topic_list = 'app_metrics',
kafka_group_name = 'clickhouse_consumer_group',
kafka_format = 'JSONEachRow',
kafka_max_block_size = 1048576;
Tạo bảng đích (target table) để lưu trữ dữ liệu cuối cùng (sử dụng engine MergeTree phù hợp cho Time-Series):
CREATE TABLE metrics_db.metrics_final
ON CLUSTER default
(
timestamp DateTime,
service_id String,
metric_name String,
value Float64,
ingestion_time DateTime
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service_id, timestamp);
Tạo Materialized View để tự động chuyển dữ liệu từ bảng Kafka sang bảng MergeTree:
CREATE MATERIALIZED VIEW metrics_db.metrics_mv
TO metrics_db.metrics_final
AS SELECT
timestamp,
service_id,
metric_name,
value,
ingestion_time
FROM metrics_db.metrics_kafka_source;
Để kiểm tra xem dữ liệu đã vào bảng đích chưa:
SELECT count() FROM metrics_db.metrics_final;
Chờ 5-10 giây, chạy lại lệnh trên. Kết quả phải lớn hơn 0 nếu có dữ liệu từ Vector.
3. Xác minh luồng dữ liệu và kiểm tra tính toàn vẹn
Bước này kiểm tra toàn bộ chuỗi: Vector -> Kafka -> ClickHouse bằng cách so sánh dữ liệu gốc và dữ liệu đã lưu.
Tại sao: Đảm bảo không có mất mát dữ liệu (data loss) hoặc sai lệch định dạng trong quá trình truyền tải.
Kết quả mong đợi: Số lượng dòng trong ClickHouse tương đương với số log đã sinh ra, các trường dữ liệu khớp nhau.
Tạo một log mẫu từ ứng dụng hoặc máy chủ để kích hoạt luồng:
echo '{"service_id": "web-01", "metric_name": "cpu_usage", "value": 85.5}' >> /var/log/app/app.log
Chờ Vector xử lý và đẩy vào Kafka (khoảng 1-2 giây).
Truy vấn ClickHouse để xem dữ liệu mới nhất:
SELECT service_id, metric_name, value, timestamp, ingestion_time FROM metrics_db.metrics_final ORDER BY timestamp DESC LIMIT 5;
So sánh giá trị value và service_id trong kết quả SQL với nội dung file log bạn vừa tạo.
Kiểm tra offset của Kafka để đảm bảo ClickHouse đã đọc hết:
kafka-consumer-groups --bootstrap-server 192.168.1.100:9092 --describe --group clickhouse_consumer_group
Quan sát cột Lag. Nếu Lag = 0, nghĩa là ClickHouse đã đọc kịp dữ liệu. Nếu Lag tăng dần, ClickHouse đang bị tụt hậu.
4. Xử lý dữ liệu bị lỗi và định dạng không khớp
Bước này cấu hình ClickHouse để xử lý các dòng JSON bị lỗi từ Vector mà không làm sập toàn bộ luồng dữ liệu.
Tại sao: Trong môi trường thực tế, log có thể bị lỗi định dạng (malformed JSON). Nếu không xử lý, ClickHouse sẽ báo lỗi và dừng tiêu thụ, gây tắc nghẽn Kafka.
Kết quả mong đợi: Dòng dữ liệu lỗi được ghi vào bảng riêng (dead-letter queue), dữ liệu hợp lệ vẫn tiếp tục được lưu vào bảng chính.
Tạo bảng để chứa dữ liệu lỗi (Error Table):
CREATE TABLE metrics_db.metrics_error_log
ON CLUSTER default
(
timestamp DateTime,
raw_message String,
error_message String,
service_id String
)
ENGINE = MergeTree()
ORDER BY timestamp;
Thay đổi cấu hình của Materialized View để bắt lỗi bằng ON ERROR (chỉ áp dụng cho phiên bản ClickHouse mới hỗ trợ, hoặc dùng logic REPLACE trong transform). Tuy nhiên, cách phổ biến và an toàn nhất là cấu hình format trong bảng nguồn Kafka để bỏ qua lỗi hoặc sử dụng REMAP.
Đối với trường hợp JSON bị lỗi, ta sẽ sửa lại bảng nguồn Kafka để thêm tham số kafka_skip_broken_messages (bỏ qua N dòng đầu bị lỗi nếu không thể parse) hoặc cấu hình Materialized View để catch lỗi.
Tạo lại Materialized View với cơ chế xử lý lỗi đơn giản: nếu parse JSON thất bại, chuyển vào bảng lỗi (cần dùng try trong logic SQL nếu dùng transform, nhưng với engine Kafka, ta dùng tham số):
ALTER TABLE metrics_db.metrics_kafka_source MODIFY SETTING kafka_skip_broken_messages = 10;
Lệnh trên cho phép ClickHouse bỏ qua tối đa 10 dòng liên tiếp bị lỗi định dạng để không bị treo. Để ghi lại các dòng này, ta cần tạo một View khác chỉ lấy dữ liệu thô:
CREATE MATERIALIZED VIEW metrics_db.metrics_error_mv
TO metrics_db.metrics_error_log
AS SELECT
now() as timestamp,
message as raw_message,
'Parse Error' as error_message,
'' as service_id
FROM metrics_db.metrics_kafka_source
WHERE message NOT LIKE '{%' OR message = '';
Để test lỗi, tạo một dòng log không hợp lệ:
echo 'This is not valid JSON data at all' >> /var/log/app/app.log
Chờ Vector đẩy lên Kafka.
Kiểm tra bảng lỗi:
SELECT * FROM metrics_db.metrics_error_log ORDER BY timestamp DESC LIMIT 1;
Xác nhận dòng dữ liệu lỗi đã được ghi lại, trong khi bảng metrics_final không bị ảnh hưởng bởi dòng này.
Quản trị viên cần xem log lỗi của ClickHouse để debug nguyên nhân:
clickhouse-client --query "SELECT * FROM system.query_log WHERE query LIKE '%metrics_kafka_source%' ORDER BY query_start_time DESC LIMIT 10;"
Cuối cùng, để sửa lỗi tại nguồn, kiểm tra lại cấu hình remap trong Vector để đảm bảo nó chỉ gửi JSON hợp lệ:
grep -A 10 "transforms.enrich" /etc/vector/vector.toml
Đảm bảo logic parse_json! hoặc parse_json (không có dấu !) được sử dụng đúng ngữ cảnh để tránh gửi dữ liệu thô vào Kafka.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Streaming Time-Series với Kafka, ClickHouse và Vector
« Phần 4: Cài đặt ClickHouse và cấu hình bảng Time-Series
Phần 6: Viết các truy vấn hiệu quả và phân tích dữ liệu Time-Series »