Cài đặt Vector Agent và cấu hình Source
Chúng ta sẽ cài đặt Vector agent trực tiếp trên máy chủ Linux để bắt đầu quá trình thu thập dữ liệu.
Việc cài đặt thông qua script chính thức của Vector đảm bảo phiên bản mới nhất và các thư viện phụ thuộc được xử lý tự động.
curl -L https://github.com/vectordotdev/vector/releases/download/v0.35.0/vector-v0.35.0-x86_64-unknown-linux-musl.tar.gz | tar -xz -C /usr/bin/ vector
Kết quả mong đợi: Command chạy thành công, không có lỗi, và file binary vector xuất hiện trong thư mục /usr/bin/.
Tiếp theo, tạo cấu trúc thư mục để chứa file cấu hình của Vector.
mkdir -p /etc/vector
touch /etc/vector/vector.toml
Kết quả mong đợi: Thư mục /etc/vector được tạo và file vector.toml trống sẵn sàng để cấu hình.
Cấu hình source đầu tiên là stdin để mô phỏng dữ liệu từ ứng dụng gửi trực tiếp vào Vector qua pipe.
Bạn cần định nghĩa source với tên "stdin-source" và loại "stdin" để Vector lắng nghe luồng dữ liệu từ console.
[sources.stdin-source]
type = "stdin"
encoding.codec = "json"
Kết quả mong đợi: Vector sẽ chờ nhận dữ liệu JSON từ bàn phím khi khởi động, dữ liệu này sẽ được parse thành JSON object.
Để thu thập log từ file thực tế (ví dụ: log của ứng dụng), chúng ta thêm source file.
Source này sẽ đọc các dòng mới được ghi vào file log giả lập tại /var/log/app/application.log.
[sources.file-source]
type = "file"
include = ["/var/log/app/application.log"]
encoding.codec = "utf8"
multiline = { start_pattern = "^\d{4}-\d{2}-\d{2}" }
Kết quả mong đợi: Vector sẽ đọc file log, tự động nhận diện dòng mới bắt đầu bằng ngày tháng (YYYY-MM-DD) để gộp nhiều dòng log thành một event nếu cần.
Verify kết quả bằng cách chạy Vector ở chế độ debug và gõ dữ liệu vào terminal.
vector -c /etc/vector/vector.toml -e debug
Kết quả mong đợi: Console hiện log "Listening on stdin" hoặc "Reading from file", và khi bạn nhập một dòng JSON hợp lệ, bạn thấy log "Received event" kèm nội dung JSON đã parse.
Cấu hình Transform để Parse JSON và chuẩn hóa Timestamp
Sau khi dữ liệu vào Source, chúng ta cần một bước Transform để làm sạch và chuẩn hóa dữ liệu trước khi gửi đi.
Việc này đảm bảo dữ liệu về Kafka và ClickHouse luôn có cấu trúc đồng nhất, đặc biệt là trường timestamp phải ở định dạng Unix Epoch (giây hoặc mili giây).
Tạo transform "parse-json" để xử lý dữ liệu thô từ source file (nếu dữ liệu chưa phải JSON) hoặc để tái cấu trúc lại object JSON.
[transforms.parse-json]
type = "remap"
inputs = ["file-source"]
source = '''
. = parse_json!(.message)
'''
Kết quả mong đợi: Nếu source file gửi chuỗi string JSON, transform này sẽ chuyển đổi nó thành object JSON có các trường riêng biệt.
Tiếp theo, tạo transform "add-timestamp" để đảm bảo mọi event đều có trường "timestamp" chuẩn cho Time-Series Database.
Chúng ta sử dụng hàm "now()" để gán thời gian hiện tại nếu dữ liệu gốc không có timestamp, hoặc map trường "ts" từ dữ liệu vào thành "timestamp".
[transforms.add-timestamp]
type = "remap"
inputs = ["stdin-source", "parse-json"]
source = '''
if .timestamp == null {
.timestamp = now()
}
# Đảm bảo timestamp là integer (giây) nếu cần cho ClickHouse
if .timestamp != null {
.timestamp = to_unix_timestamp(.timestamp)
}
'''
Kết quả mong đợi: Dữ liệu đi ra khỏi transform này luôn có trường "timestamp" là số nguyên (Unix timestamp), không còn dạng string.
Thêm transform "rename-fields" để chuẩn hóa tên trường theo yêu cầu của schema Avro hoặc ClickHouse Table.
Ví dụ: đổi tên trường "user_id" thành "user_id" (nếu cần mapping) hoặc thêm trường "environment" cố định.
[transforms.rename-fields]
type = "remap"
inputs = ["add-timestamp"]
source = '''
.environment = "production"
.event_type = "metrics"
'''
Kết quả mong đợi: Dữ liệu đầu ra của transform này có thêm các trường metadata cố định "environment" và "event_type".
Verify kết quả bằng cách xem log debug của Vector khi có dữ liệu đi qua transform.
vector -c /etc/vector/vector.toml -e debug 2>&1 | grep "Event after transform"
Kết quả mong đợi: Log hiển thị cấu trúc JSON đã được sửa đổi, có trường "timestamp" là số và các trường metadata mới.
Sử dụng Vector Sink để gửi dữ liệu vào Kafka
Bây giờ chúng ta cấu hình Sink để đẩy dữ liệu đã xử lý từ Vector vào Kafka Topic đã được tạo ở Phần 2.
Vector hỗ trợ cả định dạng JSON (nhẹ, dễ debug) và Avro (nặng, cần Schema Registry). Ở đây chúng ta sẽ cấu hình gửi JSON trước, sau đó mở rộng sang Avro.
Cấu hình sink "kafka-json-sink" để gửi dữ liệu ở định dạng JSON thuần.
Chúng ta chỉ định topic là "time-series-raw" và chỉ định cluster Kafka đã chạy ở local hoặc remote.
[sinks.kafka-json-sink]
type = "kafka"
inputs = ["rename-fields"]
topic = "time-series-raw"
key_field = "user_id"
compression = "lz4"
brokers = ["kafka:9092"]
format = { encoding = "json" }
tls.enabled = false
Kết quả mong đợi: Vector kết nối thành công với Kafka broker, dữ liệu được gửi vào topic "time-series-raw" với key là "user_id" để đảm bảo ordering.
Cấu hình sink "kafka-avro-sink" để gửi dữ liệu theo định dạng Avro (tùy chọn nâng cao).
Để dùng Avro, bạn cần có Confluent Schema Registry hoặc dùng schema đơn giản trong Vector. Ở đây giả định dùng schema động hoặc JSON schema mapping.
[sinks.kafka-avro-sink]
type = "kafka"
inputs = ["rename-fields"]
topic = "time-series-avro"
brokers = ["kafka:9092"]
format = { encoding = "avro" }
# Nếu dùng schema registry
# schema_registry_url = "http://schema-registry:8081"
# subject_name_strategy = "topic"
compression = "zstd"
tls.enabled = false
Kết quả mong đợi: Dữ liệu được mã hóa thành Avro binary trước khi gửi vào Kafka, tiết kiệm băng thông nhưng cần consumer hiểu schema.
Verify kết quả bằng cách kiểm tra topic trong Kafka xem đã có dữ liệu chưa.
kafka-console-consumer --bootstrap-server kafka:9092 --topic time-series-raw --from-beginning --max-messages 5
Kết quả mong đợi: Console hiển thị 5 dòng dữ liệu JSON đã được Vector gửi đến, có đầy đủ các trường "timestamp", "environment", "event_type".
Mô phỏng việc gửi dữ liệu từ ứng dụng giả lập
Để hoàn tất luồng dữ liệu, chúng ta cần một script giả lập (mock application) để tạo dữ liệu và gửi vào Vector.
Script này sẽ chạy một vòng lặp vô tận, tạo các record metrics ngẫu nhiên và đẩy vào stdin của Vector hoặc ghi vào file log.
Tạo script shell "generate-metrics.sh" để sinh dữ liệu JSON ngẫu nhiên mô phỏng metrics CPU/Memory.
#!/bin/bash
while true; do
ts=$(date +%s)
cpu=$((RANDOM % 100))
mem=$((RANDOM % 100))
user_id="user_$((RANDOM % 1000))"
echo "{\"timestamp\": $ts, \"user_id\": \"$user_id\", \"cpu_usage\": $cpu, \"memory_usage\": $mem, \"message\": \"System metrics updated\"}"
sleep 1
done
Kết quả mong đợi: Script tạo ra các dòng JSON hợp lệ với timestamp hiện tại và các giá trị ngẫu nhiên cho CPU, Memory.
Chạy script và pipe trực tiếp vào Vector đang lắng nghe stdin.
Cách này mô phỏng ứng dụng gửi log trực tiếp qua socket hoặc pipe vào agent.
./generate-metrics.sh | vector -c /etc/vector/vector.toml -e info
Kết quả mong đợi: Vector nhận dữ liệu, chạy qua các transform (parse, add timestamp, rename), và đẩy vào Kafka. Bạn thấy log "Sent X events" trong terminal.
Phương án thay thế: Ghi dữ liệu vào file để test source file.
mkdir -p /var/log/app
./generate-metrics.sh >> /var/log/app/application.log
Kết quả mong đợi: File /var/log/app/application.log tăng lên theo thời gian thực. Vector (nếu cấu hình file-source) sẽ đọc và gửi dữ liệu này vào Kafka.
Verify toàn bộ luồng bằng cách kiểm tra dữ liệu trong Kafka từ cả hai nguồn (stdin và file).
kafka-console-consumer --bootstrap-server kafka:9092 --topic time-series-raw --from-beginning --max-messages 10 --property print.key=true
Kết quả mong đợi: Hiển thị 10 dòng dữ liệu, mỗi dòng có Key (user_id) và Value (JSON object) đã được chuẩn hóa timestamp, chứng tỏ Vector đã xử lý thành công từ nguồn giả lập đến Kafka.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Streaming Time-Series với Kafka, ClickHouse và Vector
« Phần 2: Triển khai và cấu hình nền tảng Apache Kafka
Phần 4: Cài đặt ClickHouse và cấu hình bảng Time-Series »