Triển khai và Cấu hình Message Broker (RabbitMQ)
Cài đặt RabbitMQ trên Ubuntu 24.04
Chúng ta sẽ sử dụng RabbitMQ làm Event Bus vì tính ổn định và cơ chế xác nhận (acknowledgement) mạnh mẽ, phù hợp cho mô hình CQRS yêu cầu độ tin cậy cao trong việc phân phối sự kiện.
Cài đặt RabbitMQ từ kho chính thức của Canonical để đảm bảo tương thích tốt nhất với Ubuntu 24.04.
sudo apt update
sudo apt install rabbitmq-server -y
sudo systemctl enable --now rabbitmq-server
Kết quả mong đợi: Dịch vụ RabbitMQ khởi động thành công và lắng nghe trên cổng 5672 (AMQP) và 15672 (Management UI).
Khởi tạo Management Plugin và Tạo User
Bật plugin quản lý để có thể cấu hình Queue/Exchange qua giao diện web hoặc API, đồng thời tạo user với quyền hạn cần thiết cho ứng dụng.
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmqctl add_user eventbus eventbus_pass
sudo rabbitmqctl set_permissions -p / eventbus ".*" ".*" ".*"
sudo rabbitmqctl set_user_tags eventbus administrator
Kết quả mong đợi: User 'eventbus' được tạo, có quyền truy cập toàn bộ vhost mặc định '/', và bạn có thể truy cập http://localhost:15672.
Cấu hình Vhost và Exchange cho Event Sourcing
Tách biệt môi trường sản xuất bằng cách tạo vhost riêng. Sau đó, tạo một Exchange loại 'topic' để phân phối sự kiện dựa trên pattern matching, cho phép nhiều Read Model (Projection) lắng nghe cùng một luồng sự kiện.
sudo rabbitmqctl add_vhost eventstore
sudo rabbitmqctl set_permissions -p eventstore eventbus ".*" ".*" ".*"
Kết quả mong đợi: Vhost 'eventstore' được tạo. Exchange sẽ được tạo động bởi Command Side khi phát sự kiện, nhưng ta sẽ cấu hình sẵn Queue bên dưới.
Cấu hình Topic/Queue và Đồng bộ hóa Dữ liệu
Thiết lập Queue cho Read Models (Projection)
Trong CQRS, mỗi Read Model (ví dụ: OrderView, InventoryView) cần một Queue riêng để xử lý sự kiện liên quan. Điều này giúp các Projection hoạt động độc lập, tránh tắc nghẽn khi một Projection gặp lỗi.
Tạo các Queue cụ thể cho từng loại sự kiện hoặc từng Projection.
sudo rabbitmqadmin declare queue name="order-events" durable=true vhost=eventstore
sudo rabbitmqadmin declare queue name="inventory-events" durable=true vhost=eventstore
Kết quả mong đợi: Hai Queue 'order-events' và 'inventory-events' được tạo với tính chất 'durable' (bền vững), đảm bảo dữ liệu không mất khi Broker khởi động lại.
Cấu hình Binding giữa Exchange và Queue
Liên kết Exchange (nguồn phát sự kiện) với các Queue (đích nhận) thông qua Routing Key. Pattern 'event.order.*' sẽ khớp với tất cả sự kiện liên quan đến Order, cho phép mở rộng linh hoạt.
sudo rabbitmqadmin declare exchange name="event-exchange" type=topic vhost=eventstore
sudo rabbitmqadmin declare binding source=event-exchange destination=order-events routing_key="event.order.#" vhost=eventstore
sudo rabbitmqadmin declare binding source=event-exchange destination=inventory-events routing_key="event.inventory.#" vhost=eventstore
Kết quả mong đợi: Khi Command Side publish một sự kiện với routing key 'event.order.created', nó sẽ tự động được chuyển đến Queue 'order-events' mà không cần biết trước về sự tồn tại của Queue đó.
Xử lý Mất kết nối và Đảm bảo Tính nhất quán
Cấu hình Persistence (Durable Queue và Message)
Để đảm bảo không mất sự kiện khi Message Broker bị sập hoặc mất điện, chúng ta phải thiết lập tính chất 'durable' cho cả Queue và Message. Đây là yêu cầu bắt buộc trong Event Sourcing.
Trong code ứng dụng (Command Side), khi publish sự kiện, phải thiết lập DeliveryMode = 2 (Persistent).
# Ví dụ cấu hình trong ứng dụng .NET (RabbitMQ.Client) hoặc Java
# Đảm bảo Queue được tạo với durable=true
# Đảm bảo Message được publish với deliveryMode=2
Kết quả mong đợi: Các sự kiện được lưu vào disk của RabbitMQ. Nếu Broker restart, các sự kiện chưa được xử lý vẫn còn nguyên vẹn trong Queue.
Cơ chế Acknowledgement (Manual Ack)
Sử dụng cơ chế 'Manual Acknowledgement' thay vì 'Auto Ack'. Consumer chỉ gửi tín hiệu xác nhận (ACK) khi Projection đã ghi dữ liệu thành công vào Read Database. Nếu quá trình ghi thất bại, Consumer gửi NACK để RabbitMQ đưa sự kiện lại đầu hàng đợi hoặc chuyển sang Dead Letter Queue.
Logic xử lý trong Projection Handler:
try {
// Xử lý sự kiện và ghi vào Read DB
readModel.SaveEvent(event);
channel.BasicAck(deliveryTag: eventTag, multiple: false);
} catch (Exception ex) {
// Ghi log lỗi
logger.Error(ex, "Projection failed");
// Gửi NACK để tái xử lý hoặc chuyển sang DLQ
channel.BasicNack(deliveryTag: eventTag, multiple: false, requeue: false);
}
Kết quả mong đợi: Đảm bảo tính nhất quán (Consistency) giữa Event Store và Read Model. Sự kiện chỉ được coi là "xử lý xong" khi dữ liệu đã nằm trong Read DB.
Triển khai Cơ chế Retry cho Projection bị lỗi
Thiết lập Dead Letter Exchange (DLX)
Để tránh việc một sự kiện lỗi bị lặp lại vô tận gây tắc nghẽn hệ thống, chúng ta cần cấu hình Dead Letter Queue. Khi một sự kiện bị lỗi quá số lần retry cho phép, nó sẽ được chuyển sang DLQ để con người kiểm tra hoặc xử lý thủ công.
Tạo Exchange và Queue để chứa các sự kiện lỗi.
sudo rabbitmqadmin declare exchange name="dlx-order" type=topic vhost=eventstore
sudo rabbitmqadmin declare queue name="order-events-dlq" durable=true vhost=eventstore
sudo rabbitmqadmin declare binding source=dlx-order destination=order-events-dlq routing_key="order.#" vhost=eventstore
Kết quả mong đợi: Một DLQ 'order-events-dlq' sẵn sàng nhận các sự kiện bị lỗi từ Queue chính.
Cấu hình Retry Policy trong Consumer
Trong code của Projection, thực hiện cơ chế Retry với số lần cố gắng cố định (ví dụ: 3 lần) trước khi từ bỏ. Mỗi lần retry thất bại, gửi NACK với requeue=true. Sau lần thứ 3, gửi NACK với requeue=false để sự kiện bị chuyển sang DLQ đã cấu hình ở trên.
Logic giả lập trong code Projection:
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// Thử thực hiện Projection
processEvent(event);
channel.BasicAck(deliveryTag: eventTag, multiple: false);
break; // Thành công, thoát vòng lặp
} catch (Exception ex) {
retryCount++;
if (retryCount < maxRetries) {
// Chờ một chút trước khi retry (Exponential Backoff)
Thread.Sleep(Math.Pow(2, retryCount) * 1000);
// Requeue để thử lại
channel.BasicNack(deliveryTag: eventTag, multiple: false, requeue: true);
} else {
// Thất bại sau maxRetries, chuyển sang DLQ
channel.BasicNack(deliveryTag: eventTag, multiple: false, requeue: false);
logger.Warn("Event moved to DLQ after max retries");
}
}
}
Kết quả mong đợi: Các sự kiện gặp lỗi tạm thời (ví dụ: mất kết nối DB) sẽ được tự động phục hồi. Các sự kiện lỗi vĩnh viễn sẽ bị đẩy vào DLQ để không làm tắc nghẽn luồng xử lý chính.
Verify Kết quả cuối cùng
Thực hiện kiểm tra toàn bộ luồng đồng bộ hóa:
- Publish một sự kiện hợp lệ vào Exchange 'event-exchange' với routing key 'event.order.created'. Kiểm tra Queue 'order-events' xem sự kiện đã được tiêu thụ và ACK chưa.
- Publish một sự kiện giả lập lỗi (ví dụ: dữ liệu không hợp lệ) vào Exchange. Quan sát console log của Projection để thấy cơ chế Retry hoạt động (thử 3 lần).
- Sau 3 lần retry thất bại, kiểm tra Queue 'order-events-dlq' để xác nhận sự kiện đã được chuyển sang đây.
- Khởi động lại RabbitMQ service, sau đó kiểm tra lại Queue 'order-events' để đảm bảo các sự kiện chưa xử lý vẫn còn đó (Persistence).
sudo systemctl restart rabbitmq-server
sudo rabbitmqadmin list queues name messages vhost=eventstore
Kết quả mong đợi: Số lượng message trong các Queue vẫn được giữ nguyên sau khi restart, và các sự kiện lỗi đã nằm trong DLQ sau khi thử retry.
Điều hướng series:
Mục lục: Series: Triển khai Database CQRS với Event Sourcing và Ubuntu 24.04
« Phần 5: Triển khai Query Side và xây dựng Read Models
Phần 7: Triển khai Snapshotting và khôi phục trạng thái »