1. Xử lý dữ liệu trễ (Late Data) trong ClickHouse
Trong hệ thống streaming, dữ liệu thường không đến đúng thời gian thực (real-time) do network latency, backpressure hoặc sự cố phần cứng. Dữ liệu này gọi là Late Data.
ClickHouse không tự động cập nhật các giá trị cũ trong bảng MergeTree nếu không cấu hình. Chúng ta sẽ sử dụng cơ chế ReplacingMergeTree kết hợp với logic thời gian để đảm bảo dữ liệu mới nhất luôn được giữ lại, bất kể nó đến muộn như thế nào.
Cấu hình bảng ClickHouse hỗ trợ Late Data
Bước này tạo một bảng mới với engine ReplacingMergeTree. Engine này cho phép lưu nhiều phiên bản của cùng một key (trong trường hợp này là timestamp + device_id) và tự động loại bỏ bản cũ khi thực hiện merge, giữ lại bản có timestamp lớn nhất.
Đường dẫn: /etc/clickhouse-server/config.d/late-data-config.xml (Tùy chọn, không bắt buộc nếu dùng query trực tiếp, nhưng tốt hơn là tạo bảng mới).
Tạo bảng metrics_late_safe để xử lý dữ liệu trễ:
CREATE TABLE IF NOT EXISTS metrics_late_safe (
timestamp DateTime,
device_id String,
metric_name String,
value Float64,
version DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (device_id, timestamp)
TTL timestamp + INTERVAL 90 DAY;
Kết quả mong đợi: ClickHouse trả về Ok.. Bảng được tạo với engine ReplacingMergeTree, sử dụng cột version (mặc định là thời gian hiện tại) để xác định phiên bản mới nhất. Khi dữ liệu trễ đến, nó sẽ có version lớn hơn dữ liệu cũ, do đó sẽ được giữ lại sau khi merge.
Vận hành và Verify dữ liệu
Để kiểm tra xem cơ chế xử lý late data hoạt động, ta sẽ chèn dữ liệu "cũ" sau dữ liệu "mới".
-- Chèn dữ liệu thời gian T1 (giả sử 10 phút trước)
INSERT INTO metrics_late_safe (timestamp, device_id, metric_name, value)
VALUES (now() - INTERVAL 10 MINUTE, 'sensor-A', 'temp', 25.0);
-- Chèn dữ liệu thời gian T2 (hiện tại)
INSERT INTO metrics_late_safe (timestamp, device_id, metric_name, value)
VALUES (now(), 'sensor-A', 'temp', 26.0);
-- Chèn dữ liệu TRỄ (về thời gian T1, nhưng gửi sau T2)
-- Giả sử hệ thống gửi lại dữ liệu 10 phút trước với giá trị cập nhật là 27.0
INSERT INTO metrics_late_safe (timestamp, device_id, metric_name, value)
VALUES (now() - INTERVAL 10 MINUTE, 'sensor-A', 'temp', 27.0);
Kết quả mong đợi: Cả 3 lệnh INSERT đều thành công. Dữ liệu chưa được merge ngay lập tức.
Thực hiện truy vấn để xem dữ liệu thô (có thể thấy 3 dòng) và dữ liệu sau khi optimize (chỉ thấy 2 dòng, trong đó dòng 10 phút trước là 27.0).
-- Xem dữ liệu thô (có thể thấy trùng lặp)
SELECT count() FROM metrics_late_safe;
-- Thực hiện Optimize để kích hoạt merge logic
OPTIMIZE TABLE metrics_late_safe FINAL;
-- Truy vấn để xem dữ liệu đã được xử lý (chỉ giữ bản mới nhất)
SELECT device_id, timestamp, value
FROM metrics_late_safe
WHERE device_id = 'sensor-A'
ORDER BY timestamp;
Kết quả mong đợi: Sau khi OPTIMIZE ... FINAL, kết quả query chỉ trả về 2 dòng: một dòng tại now() (giá trị 26.0) và một dòng tại now() - 10 MIN (giá trị 27.0). Dòng giá trị 25.0 đã bị loại bỏ.
2. Áp dụng cơ chế Deduplication để tránh dữ liệu trùng lặp
Trong môi trường streaming, Vector hoặc Kafka có thể gửi cùng một record nhiều lần do retry mechanism hoặc network glitch. Để đảm bảo tính toàn vẹn (idempotency), chúng ta cần cơ chế deduplication.
Chúng ta sẽ sử dụng một cột dedup_key (thường là hash của nội dung hoặc unique ID) và cấu hình bảng để tự động loại bỏ bản sao.
Cấu hình bảng với Deduplication tự động
Tạo bảng metrics_dedup sử dụng ReplacingMergeTree dựa trên một key duy nhất. Nếu không có unique ID trong dữ liệu nguồn, ta dùng cityHash64 của các trường quan trọng.
CREATE TABLE IF NOT EXISTS metrics_dedup (
timestamp DateTime,
device_id String,
metric_name String,
value Float64,
dedup_key UInt64 DEFAULT cityHash64(device_id, metric_name, timestamp)
) ENGINE = ReplacingMergeTree(dedup_key)
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (device_id, timestamp)
TTL timestamp + INTERVAL 90 DAY;
Kết quả mong đợi: Bảng được tạo. Cột dedup_key được tự động tính toán bằng hàm cityHash64 trên các trường đầu vào. Engine sẽ so sánh dedup_key để loại bỏ bản trùng.
Inject dữ liệu trùng lặp và Verify
Chèn cùng một dòng dữ liệu 3 lần để mô phỏng lỗi trùng lặp từ Vector/Kafka.
INSERT INTO metrics_dedup (timestamp, device_id, metric_name, value)
VALUES ('2023-10-27 10:00:00', 'sensor-B', 'pressure', 101.5);
INSERT INTO metrics_dedup (timestamp, device_id, metric_name, value)
VALUES ('2023-10-27 10:00:00', 'sensor-B', 'pressure', 101.5);
INSERT INTO metrics_dedup (timestamp, device_id, metric_name, value)
VALUES ('2023-10-27 10:00:00', 'sensor-B', 'pressure', 101.5);
Kết quả mong đợi: 3 lệnh INSERT thành công. Nếu query ngay lập tức (SELECT count()), bạn có thể thấy 3 dòng.
Thực hiện optimize và kiểm tra kết quả cuối cùng.
OPTIMIZE TABLE metrics_dedup FINAL;
SELECT count() as total_rows, value
FROM metrics_dedup
WHERE device_id = 'sensor-B';
Kết quả mong đợi: total_rows trả về 1. ClickHouse đã tự động merge và loại bỏ 2 bản sao trùng lặp, chỉ giữ lại một bản duy nhất với giá trị 101.5.
3. Thiết lập Alert cho các khoảng thời gian thiếu dữ liệu
Ngay cả khi xử lý late data, nếu một thiết bị ngừng gửi dữ liệu hoàn toàn trong một khoảng thời gian dài, đây là sự cố nghiêm trọng cần báo động. Chúng ta sẽ tạo một view hoặc query để phát hiện các "khoảng trống" (gaps) trong chuỗi thời gian.
Tạo View để phát hiện khoảng trống dữ liệu
Sử dụng hàm arrayJoin kết hợp với generateSeries để tạo một chuỗi thời gian đầy đủ (theo phút hoặc giây), sau đó join với bảng dữ liệu thực tế để tìm những khoảng trống.
Đường dẫn: Tạo view trực tiếp trong ClickHouse.
CREATE VIEW gaps_detector AS
WITH
-- Tạo chuỗi thời gian mẫu (ví dụ: từ 1h trước đến nay, theo phút)
series AS (
SELECT now() - INTERVAL 1 HOUR AS start_time
),
time_range AS (
SELECT
start_time + INTERVAL (number * 1) MINUTE as expected_minute
FROM series, numbers(60)
),
actual_data AS (
SELECT
toStartOfMinute(timestamp) as actual_minute,
device_id
FROM metrics_late_safe
WHERE timestamp > now() - INTERVAL 1 HOUR
)
SELECT
tr.expected_minute,
tr.expected_minute - INTERVAL 1 MINUTE as gap_start,
tr.expected_minute as gap_end,
'Missing data for sensor' as alert_message
FROM time_range tr
LEFT JOIN actual_data ad ON tr.expected_minute = ad.actual_minute AND ad.device_id = 'sensor-A'
WHERE ad.actual_minute IS NULL;
Kết quả mong đợi: View gaps_detector được tạo. View này sẽ trả về danh sách các phút mà sensor-A không có dữ liệu trong vòng 1 giờ qua.
Verify kết quả phát hiện gap
Thực hiện query view để kiểm tra các khoảng trống hiện tại.
SELECT * FROM gaps_detector LIMIT 10;
Kết quả mong đợi: Nếu sensor-A có khoảng thời gian không gửi dữ liệu (ví dụ: mất mạng 5 phút), query sẽ trả về 5 dòng tương ứng với 5 phút đó. Nếu dữ liệu liên tục, kết quả sẽ rỗng.
Tích hợp Alert vào Vector (Optional)
Để tự động hóa, ta có thể cấu hình Vector để chạy query này định kỳ và gửi alert nếu count > 0.
File config: /etc/vector/vector.toml (Thêm đoạn này vào phần sources hoặc sinks nếu dùng k8s, hoặc thêm vào sources để fetch từ ClickHouse).
Tuy nhiên, cách phổ biến nhất là dùng ClickHouse INTERVAL kết hợp với Prometheus exporter hoặc đơn giản là cron job gọi query.
Dưới đây là lệnh shell để test logic alert:
clickhouse-client --query "SELECT count() FROM gaps_detector" | awk '{if ($1 > 0) print "ALERT: Data gap detected! Count: " $1; else print "OK: No gaps detected."}'
Kết quả mong đợi: Nếu có gap, màn hình hiển thị ALERT: Data gap detected! Count: N. Nếu không, hiển thị OK.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng Data Streaming Time-Series với Kafka, ClickHouse và Vector
« Phần 6: Viết các truy vấn hiệu quả và phân tích dữ liệu Time-Series
Phần 8: Tối ưu hóa hiệu năng và scaling hệ thống »