Cơ chế Dead Letter Queue (DLQ) để xử lý lỗi Consumer
Trong mô hình Event Sourcing, việc một event bị lỗi khi xử lý không được phép làm dừng toàn bộ luồng Consumer. Chiến lược an toàn nhất là chuyển các event lỗi sang một topic riêng biệt gọi là Dead Letter Queue.
Bước 1: Tạo topic DLQ để lưu trữ các event không thể xử lý.
Mục đích: Tạo nơi chứa "xác" của các event lỗi, tránh mất dữ liệu và ngăn Consumer bị treo (poison pill).
Kết quả mong đợi: Topic order-dlq được tạo với 3 phân vùng và replication factor 3.
kafka-topics --bootstrap-server localhost:9092 --create --topic order-dlq --partitions 3 --replication-factor 3
Bước 2: Cấu hình Consumer để tự động chuyển event lỗi sang DLQ.
Mục đích: Trong ứng dụng Java/Spring, ta cần bắt exception khi xử lý event và ghi lại nguyên nhân vào header của message trước khi đẩy sang DLQ.
Kết quả mong đợi: Consumer không còn bị crash, message lỗi được ghi log và đẩy sang topic DLQ.
// Trong lớp EventConsumer (Java/Spring Kafka)
@KafkaListener(topics = "orders", groupId = "order-consumer-group", containerFactory = "kafkaListenerContainerFactory")
public void consumeOrder(OrderEvent event) {
try {
processEvent(event); // Logic nghiệp vụ
} catch (Exception e) {
// Ghi nguyên nhân lỗi vào header
ProducerRecord<String, String> errorRecord = new ProducerRecord(
"order-dlq",
event.getPartition(),
event.getKey(),
event.getValue()
);
errorRecord.headers().add("error-reason", e.getMessage().getBytes());
errorRecord.headers().add("original-topic", "orders".getBytes());
kafkaTemplate.send(errorRecord);
log.error("Event sent to DLQ: {}", event.getEventId(), e);
}
}
Bước 3: Cấu hình Factory để xử lý lỗi ở mức container (bắt buộc nếu dùng Spring Kafka).
Mục đích: Đảm bảo cơ chế retry và DLQ hoạt động đúng ngay cả khi lỗi ở cấp độ infra.
Kết quả mong đợi: Bean errorHandler được khởi tạo, hỗ trợ cơ chế RetryTemplate và DeadLetterPublishingRecoveryHandler.
// Trong lớp KafkaConfig (Java/Spring)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory,
KafkaTemplate<String, String> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Cấu hình Retry
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setMaxAttempts(3);
retryTemplate.setRetryOn(RetryableException.class);
// Cấu hình DLQ Handler
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoveryHandler(kafkaTemplate),
retryTemplate
));
return factory;
}
Verify kết quả: Chạy consumer, gửi một event có dữ liệu không hợp lệ (ví dụ: thiếu trường bắt buộc), sau đó check topic DLQ.
kafka-console-consumer --bootstrap-server localhost:9092 --topic order-dlq --from-beginning --property print.key=true --property print.value=true --property print.headers=true
Giám sát hiệu suất với Kafka Consumer Lag
Cấu hình và sử dụng Kafka Lag Monitor
Consumer Lag là số lượng message chưa được xử lý giữa Offset hiện tại của Consumer và Offset cuối cùng của Topic. Đây là chỉ số quan trọng nhất để đánh giá sức khỏe hệ thống Event Sourcing.
Bước 1: Cài đặt công cụ giám sát Lag (kafka-lag-monitor) hoặc sử dụng command line.
Mục đích: Đo lường độ trễ xử lý theo thời gian thực để phát hiện nghẽn cổ chai.
Kết quả mong đợi: Xuất ra bảng hiển thị lag của từng partition.
# Cài đặt kafka-lag-monitor (nếu chưa có)
# Hoặc sử dụng script shell đơn giản để check lag thủ công
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic orders | awk -F: '{print $1, $2}' > /tmp/end_offsets.txt
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --group order-consumer-group --topic orders | awk -F: '{print $1, $2}' > /tmp/current_offsets.txt
paste /tmp/end_offsets.txt /tmp/current_offsets.txt | awk '{print "Partition "$1" Lag: "$2-$3}'
Bước 2: Tích hợp metric Lag vào Prometheus/Grafana (cho môi trường Production).
Mục đích: Tự động cảnh báo khi lag vượt ngưỡng an toàn (ví dụ: > 10,000 events).
Kết quả mong đợi: Grafana hiển thị biểu đồ Lag tăng dần nếu Consumer quá tải.
# Thêm vào file prometheus.yml
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['localhost:9093'] # JMX Exporter port
metrics_path: '/metrics'
scheme: http
Bước 3: Điều chỉnh số lượng instance Consumer dựa trên Lag.
Mục đích: Nếu Lag tăng cao và CPU thấp, thêm instance Consumer. Nếu CPU cao, tối ưu code xử lý.
Kết quả mong đợi: Lag giảm xuống về 0 hoặc mức ổn định sau khi scale-out.
# Kiểm tra số lượng partition hiện tại để đảm bảo không scale Consumer vượt quá số partition
kafka-topics --bootstrap-server localhost:9092 --describe --topic orders | grep "Partitions:"
Verify kết quả: Tạo tải giả (flood messages) vào topic, quan sát lag tăng lên, sau đó thêm instance consumer và quan sát lag giảm xuống.
# Flood test (gửi 1000 message nhanh)
for i in {1..1000}; do echo "{\"id\":$i,\"data\":\"test\"}" | kafka-console-producer --bootstrap-server localhost:9092 --topic orders; done
Tối ưu cấu hình Partition và Replication
Chiến lược đặt Partition cho Event Sourcing
Trong Event Sourcing, thứ tự sự kiện (ordering) là tuyệt đối quan trọng. Việc chọn sai số lượng partition có thể phá vỡ tính nhất quán của Aggregate.
Bước 1: Thiết lập Partitioner dựa trên Key (Aggregate ID).
Mục đích: Đảm bảo tất cả các event của cùng một Entity (ví dụ: Order #123) luôn nằm trong cùng một partition để giữ nguyên thứ tự thời gian.
Kết quả mong đợi: Các event của cùng một ID luôn được Consumer xử lý tuần tự, không bị race condition.
# Trong Producer (Java)
public void sendEvent(String aggregateId, String eventJson) {
// Key phải là ID của Aggregate để đảm bảo ordering
ProducerRecord<String, String> record = new ProducerRecord(
"orders",
aggregateId, // Key quan trọng nhất
eventJson
);
kafkaTemplate.send(record);
}
Bước 2: Tối ưu số lượng Partition và Replication Factor.
Mục đích: Balance giữa độ trễ (latency) và khả năng chịu lỗi (fault tolerance). Rule of thumb: Partition = Số lượng Consumer tối đa cần thiết; Replication = 3 cho Production.
Kết quả mong đợi: Topic có cấu hình phù hợp với mô hình scaling.
# Tạo topic với 12 partitions (cho 12 worker nodes tối đa) và 3 replicas
kafka-topics --bootstrap-server localhost:9092 --create --topic orders --partitions 12 --replication-factor 3 --config cleanup.policy=delete
Bước 3: Cấu hình tối ưu cho Broker (server.properties).
Mục đích: Giảm latency khi ghi event và tăng throughput đọc.
Kết quả mong đợi: Kafka ghi log nhanh hơn, giảm độ trễ khi commit transaction.
# File: /etc/kafka/server.properties
# Tăng bộ nhớ đệm cho log
log.segment.bytes=1073741824
# Giảm độ trễ commit (chờ 1 replica thay vì toàn bộ ISR cho write nhanh hơn, nhưng trade-off với durability)
num.replica.fetchers=4
# Tăng buffer cho producer
message.max.bytes=1000000
# Tối ưu network
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
Verify kết quả: Chạy benchmark đơn giản gửi 100k events và đo thời gian hoàn thành.
time (for i in {1..100000}; do echo "{\"id\":$i}" | kafka-console-producer --bootstrap-server localhost:9092 --topic orders; done)
Xử lý xung đột dữ liệu trong Event Sourcing
Case Study: Giải quyết Race Condition khi cập nhật trạng thái
Vấn đề: Hai consumer xử lý event cho cùng một Order cùng lúc, dẫn đến trạng thái cuối cùng bị ghi đè (Lost Update).
Bước 1: Áp dụng cơ chế Optimistic Concurrency Control (OCC) với Versioning.
Mục đích: Mỗi event phải mang theo version của Aggregate tại thời điểm nó được tạo. Khi lưu vào DB, kiểm tra version hiện tại có khớp không.
Kết quả mong đợi: Nếu version không khớp, transaction bị rollback và event được đưa vào DLQ để xử lý lại sau (Retry).
-- SQL: Tạo bảng Aggregate State với cột version
CREATE TABLE order_state (
order_id VARCHAR(50) PRIMARY KEY,
status VARCHAR(20),
current_version INT NOT NULL,
payload JSONB
);
-- SQL: Cập nhật có điều kiện (Optimistic Locking)
UPDATE order_state
SET status = 'PAID',
current_version = current_version + 1,
payload = '{"event": "Paid", "amount": 100}'
WHERE order_id = 'ORDER-123'
AND current_version = 5; -- Kiểm tra version từ event
Bước 2: Xử lý Conflict khi Update trả về 0 rows.
Mục đích: Khi DB báo không cập nhật được, nghĩa là có xung đột. Lúc này cần lấy lại state mới nhất, tái tạo (rebuild) từ Event Store, và apply lại event.
Kết quả mong đợi: Hệ thống tự động hòa giải xung đột mà không mất dữ liệu.
// Logic trong Consumer (Java)
public void processEvent(OrderEvent event) {
int expectedVersion = event.getVersion();
// Bước 1: Thử update với version hiện tại
int rowsAffected = orderRepository.updateOrder(
event.getOrderId(),
event.getStatus(),
expectedVersion + 1,
event.getPayload()
);
// Bước 2: Xử lý xung đột
if (rowsAffected == 0) {
// Lấy lại state mới nhất từ DB
OrderState currentState = orderRepository.findById(event.getOrderId());
// Kiểm tra xem version mới có khớp không (tránh infinite loop)
if (currentState.getVersion() > expectedVersion) {
// Có xung đột: Cần tái tạo state từ Event Store
// Logic: Lấy toàn bộ events của Order này từ Kafka/DB -> Apply lại -> Apply event hiện tại
rebuildAndApply(event, currentState);
} else {
throw new ConcurrencyException("Version mismatch, retry failed");
}
}
}
Bước 3: Sử dụng Transactional Producer để đảm bảo Atomicity.
Mục đích: Đảm bảo việc ghi Event vào Kafka và cập nhật DB (nếu cần) diễn ra đồng bộ hoặc rollback cả hai.
Kết quả mong đợi: Không có trường hợp Event được ghi vào Kafka nhưng DB không cập nhật (hoặc ngược lại).
// Cấu hình TransactionalProducerFactory trong Spring Kafka
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Bật transaction
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-tx-id");
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return new DefaultKafkaProducerFactory(configProps);
}
Verify kết quả: Chạy 2 instances của Consumer cùng lúc xử lý event cho cùng một Order ID, quan sát log và kiểm tra DB xem version có tăng đúng thứ tự hay không.
SELECT order_id, status, current_version FROM order_state WHERE order_id = 'ORDER-123';
Điều hướng series:
Mục lục: Series: Triển khai Database Event Sourcing với Apache Kafka và Ubuntu 24.04
« Phần 6: Xây dựng Consumer và tái tạo trạng thái (Replay)