1. Lựa chọn Database cho Event Store
Trong kiến trúc Event Sourcing, yêu cầu cốt lõi là ghi (append-only) cực nhanh và đọc theo chuỗi sự kiện (stream) hiệu quả. PostgreSQL được lựa chọn vì tính ACID mạnh mẽ, khả năng xử lý transaction và hỗ trợ JSONB linh hoạt cho metadata, đảm bảo tính toàn vẹn dữ liệu tuyệt đối khi lưu trữ Event Stream.
Chúng ta sẽ sử dụng PostgreSQL 16 (mặc định trên Ubuntu 24.04) với extension pg_trgm để tối ưu hóa việc tìm kiếm và uuid-ossp để tạo ID sự kiện phân tán (Distributed UUID).
Cài đặt PostgreSQL và các extension cần thiết:
sudo apt update
sudo apt install -y postgresql postgresql-contrib postgresql-client
sudo systemctl enable postgresql
sudo systemctl start postgresql
Kết quả mong đợi: Dịch vụ PostgreSQL chạy và sẵn sàng chấp nhận kết nối trên port 5432.
2. Thiết kế bảng lưu trữ sự kiện (Event Store)
Bảng event_store sẽ đóng vai trò là nguồn chân lý (Source of Truth). Thiết kế bảng cần tối ưu cho thao tác INSERT liên tục và SELECT theo stream_id kết hợp position.
Cấu trúc bảng bao gồm:
id: UUID chính xác cho từng sự kiện.
stream_id: Khóa để nhóm các sự kiện của cùng một Aggregate (ví dụ: Order-123).
event_type: Tên loại sự kiện (ví dụ: OrderCreated, OrderPaid).
metadata: JSONB lưu trữ các thuộc tính động, timestamp, correlation_id.
data: JSONB lưu trữ payload thực tế của sự kiện.
version: Số thứ tự trong stream (position) để phát hiện conflict (Optimistic Locking).
Chạy lệnh SQL sau để tạo database, extension và bảng:
sudo -u postgres psql -c "CREATE DATABASE event_store_db;"
sudo -u postgres psql -d event_store_db -c "CREATE EXTENSION IF NOT EXISTS 'uuid-ossp';"
sudo -u postgres psql -d event_store_db -c "CREATE EXTENSION IF NOT EXISTS 'pg_trgm';"
Kết quả mong đợi: Database event_store_db được tạo thành công cùng 2 extension.
Tạo bảng lưu trữ sự kiện với cấu trúc tối ưu:
sudo -u postgres psql -d event_store_db -c "
CREATE TABLE event_store (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
stream_id VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_event_in_stream UNIQUE (stream_id, version)
);"
Kết quả mong đợi: Bảng event_store được tạo với ràng buộc duy nhất đảm bảo không có hai sự kiện cùng version trong một stream.
2.1. Tạo Index để tối ưu hóa truy vấn
Trong Event Sourcing, hai pattern truy vấn phổ biến nhất là: (1) Lấy toàn bộ lịch sử của một Aggregate (bằng stream_id), và (2) Lấy toàn bộ sự kiện trong một khoảng thời gian (bằng created_at).
Chúng ta cần tạo chỉ mục composite để tối ưu hóa việc đọc theo stream_id và version (sắp xếp thời gian):
sudo -u postgres psql -d event_store_db -c "
CREATE INDEX idx_event_store_stream_id ON event_store (stream_id, version);
CREATE INDEX idx_event_store_created_at ON event_store (created_at);
CREATE INDEX idx_event_store_event_type ON event_store (event_type);"
Kết quả mong đợi: Ba chỉ mục được tạo. Index idx_event_store_stream_id là quan trọng nhất, giúp việc tái tạo trạng thái (Replay) của một Aggregate đạt hiệu suất O(log N).
Tạo GIN index cho trường JSONB metadata để hỗ trợ tìm kiếm nhanh theo các attribute bên trong (ví dụ: tìm sự kiện có correlation_id cụ thể):
sudo -u postgres psql -d event_store_db -c "
CREATE INDEX idx_event_store_metadata ON event_store USING GIN (metadata jsonb_path_ops);"
Kết quả mong đợi: Index GIN được tạo, hỗ trợ truy vấn JSONB hiệu quả.
3. Tích hợp Producer với Database
Bước này mô phỏng hành vi của Producer sau khi đã publish sự kiện lên Kafka. Producer cần lưu log sự kiện vào Database để đảm bảo tính nhất quán (Dual Write Pattern) hoặc sử dụng Database làm backup cho Kafka.
Chúng ta sẽ tạo một script Python đơn giản sử dụng thư viện psycopg2 để thực hiện việc này. Script này sẽ nhận dữ liệu sự kiện, tạo ID, xác định version hiện tại của stream và INSERT vào DB.
Trước tiên, cài đặt thư viện Python cần thiết:
sudo apt install -y python3-pip python3-venv
mkdir -p /opt/event-sourcing && cd /opt/event-sourcing
python3 -m venv venv
source venv/bin/activate
pip install psycopg2-binary
Kết quả mong đợi: Môi trường ảo Python được tạo và thư viện psycopg2 được cài đặt.
Tạo file cấu hình kết nối database (/opt/event-sourcing/config.py):
cat > /opt/event-sourcing/config.py
Kết quả mong đợi: File cấu hình được tạo. Lưu ý: Trong môi trường production thực tế, hãy sử dụng biến môi trường hoặc Secret Manager thay vì hardcode password.
Tạo script tích hợp lưu sự kiện (/opt/event-sourcing/save_event.py):
cat > /opt/event-sourcing/save_event.py
Kết quả mong đợi: Script save_event.py được tạo với logic xác định version tự động và lưu sự kiện.
3.1. Chạy thử nghiệm và Verify
Chạy script để lưu sự kiện đầu tiên vào database:
cd /opt/event-sourcing
python3 save_event.py
Kết quả mong đợi: Xuất hiện thông báo Event saved successfully. Stream: order-1001, Version: 0.
Chạy thêm một lần nữa để lưu sự kiện thứ hai cho cùng stream (ví dụ: OrderPaid):
python3 -c "
from save_event import save_event
save_event('order-1001', 'OrderPaid', {'payment_method': 'credit_card', 'status': 'paid'}, {'correlation_id': 'corr-xyz-123'})
"
Kết quả mong đợi: Thông báo Event saved successfully. Stream: order-1001, Version: 1.
Verify dữ liệu trong PostgreSQL để đảm bảo cấu trúc bảng, index và dữ liệu JSONB đúng:
sudo -u postgres psql -d event_store_db -c "
SELECT id, stream_id, event_type, version, created_at, data->>'order_id' as order_id
FROM event_store
WHERE stream_id = 'order-1001'
ORDER BY version ASC;"
Kết quả mong đợi: Xuất hiện 2 dòng dữ liệu với version 0 và 1, sắp xếp theo thứ tự thời gian, và trường order_id được trích xuất từ JSONB.
Kiểm tra hiệu suất của index bằng lệnh EXPLAIN ANALYZE:
sudo -u postgres psql -d event_store_db -c "
EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF)
SELECT * FROM event_store WHERE stream_id = 'order-1001' ORDER BY version;"
Kết quả mong đợi: Output hiển thị Index Scan using idx_event_store_stream_id on event_store, chứng tỏ index đã được sử dụng để truy vấn nhanh.
4. Kiểm tra tính toàn vẹn và xử lý lỗi
Để đảm bảo cơ chế Event Store hoạt động đúng, chúng ta cần thử nghiệm trường hợp lỗi khi cố gắng ghi đè một sự kiện đã tồn tại (duplicate version) hoặc ghi sự kiện bị nhảy version (gap version).
Thử nghiệm ghi đè sự kiện version 0 (lỗi do ràng buộc UNIQUE):
python3 -c "
from save_event import save_event
# Cố gắng lưu lại version 0 (đã tồn tại)
try:
# Giả lập bằng cách gọi hàm với logic version cố định (ở đây script tự tính nên cần hack test)
# Thực tế: Nếu code logic tính version sai, DB sẽ chặn nhờ constraint UNIQUE
import psycopg2
from config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
cur = conn.cursor()
cur.execute(\"\"\"
INSERT INTO event_store (id, stream_id, event_type, metadata, data, version)
VALUES ('00000000-0000-0000-0000-000000000001', 'order-1001', 'OrderCreated', '{}', '{}', 0)
\"\"\")
conn.commit()
except Exception as e:
print(f'Expected Error (Duplicate Key): {e}')
conn.rollback()
conn.close()
"
Kết quả mong đợi: Lỗi ERROR: duplicate key value violates unique constraint "unique_event_in_stream". Điều này chứng tỏ Database đã bảo vệ dữ liệu khỏi bị ghi đè.
Xóa dữ liệu test để làm sạch môi trường sau khi hoàn thành bài hướng dẫn:
sudo -u postgres psql -d event_store_db -c "DELETE FROM event_store WHERE stream_id = 'order-1001';"
Kết quả mong đợi: Dòng lệnh trả về DELETE 2, xác nhận dữ liệu test đã được xóa.
Điều hướng series:
Mục lục: Series: Triển khai Database Event Sourcing với Apache Kafka và Ubuntu 24.04
« Phần 4: Xây dựng ứng dụng Producer với mô hình Event Sourcing
Phần 6: Xây dựng Consumer và tái tạo trạng thái (Replay) »