Cơ chế tạo Snapshot định kỳ để giảm tải khi đọc
Snapshotting là kỹ thuật lưu trữ trạng thái hiện tại của Aggregate tại một điểm thời gian cụ thể. Thay vì phải replay toàn bộ lịch sử event từ đầu (0) đến hiện tại (N) để tính toán state, hệ thống chỉ cần load Snapshot tại điểm M và replay các event từ M+1 đến N. Điều này giảm đáng kể chi phí CPU và thời gian I/O.
Chúng ta sẽ cấu hình một cơ chế tự động tạo Snapshot sau mỗi 100 sự kiện (Event) cho một Aggregate. Logic này sẽ được triển khai dưới dạng một service chạy trên Ubuntu 24.04, lưu Snapshot vào Redis (Key-Value Store) để truy xuất nhanh.
Đầu tiên, cài đặt Redis Server và khởi động service:
sudo apt update && sudo apt install redis-server -y
sudo systemctl enable redis-server
sudo systemctl start redis-server
Kết quả mong đợi: Redis chạy trên port 6379, có thể kết nối qua `redis-cli ping` trả về "PONG".
Tiếp theo, tạo thư mục chứa script Snapshot Generator. Script này sẽ đọc Event Store, tính toán state, và lưu vào Redis khi đạt ngưỡng.
sudo mkdir -p /opt/cqrs-app/snapshot-service
sudo nano /opt/cqrs-app/snapshot-service/config.yaml
Dưới đây là nội dung file cấu hình hoàn chỉnh cho cơ chế Snapshot:
snapshot:
enabled: true
interval_events: 100
storage:
type: redis
host: 127.0.0.1
port: 6379
db: 0
key_prefix: "snapshot:"
compression: gzip
retention_days: 30
Kết quả mong đợi: File config.yaml được lưu tại đường dẫn trên, định dạng YAML chuẩn.
Bây giờ, triển khai script Python để thực thi logic tạo Snapshot. Script này giả lập việc đọc events và áp dụng logic business để tạo state mới.
sudo nano /opt/cqrs-app/snapshot-service/snapshot_generator.py
Nội dung hoàn chỉnh của file script:
#!/usr/bin/env python3
import redis
import gzip
import json
import os
import yaml
# Load config
with open('/opt/cqrs-app/snapshot-service/config.yaml', 'r') as f:
config = yaml.safe_load(f)
redis_client = redis.Redis(
host=config['snapshot']['storage']['host'],
port=config['snapshot']['storage']['port'],
db=config['snapshot']['storage']['db'],
decode_responses=True
)
def calculate_state_from_events(aggregate_id, events):
"""
Logic business: Replay events để tính state.
Đây là phần cần implement theo nghiệp vụ cụ thể của bạn.
"""
state = {"id": aggregate_id, "version": 0, "data": {}}
for event in events:
# Giả lập áp dụng event
state['version'] = event.get('version', 0)
state['data'].update(event.get('data', {}))
return state
def create_snapshot(aggregate_id, events, snapshot_version):
state = calculate_state_from_events(aggregate_id, events)
state['snapshot_version'] = snapshot_version
state['snapshot_at'] = events[-1].get('timestamp', 'now')
# Serialize
json_str = json.dumps(state)
# Compress
compressed = gzip.compress(json_str.encode('utf-8'))
# Save to Redis
key = f"{config['snapshot']['storage']['key_prefix']}{aggregate_id}:{snapshot_version}"
redis_client.setex(key, config['snapshot']['retention_days'] * 86400, compressed)
print(f"[INFO] Snapshot created for {aggregate_id} at version {snapshot_version}")
if __name__ == "__main__":
# Giả lập trigger từ Event Bus hoặc Cron
# Trong thực tế, hàm này được gọi khi version % interval == 0
test_aggregate = "ORDER-12345"
test_events = [{"version": i, "type": "OrderPlaced", "data": {"item": f"Item-{i}"}} for i in range(1, 101)]
create_snapshot(test_aggregate, test_events, 100)
Kết quả mong đợi: Script chạy thành công, tạo ra key Redis với giá trị đã nén.
Thiết lập quyền thực thi và cấu hình Cron để chạy định kỳ (ví dụ: mỗi 5 phút) để quét các Aggregate cần snapshot:
sudo chmod +x /opt/cqrs-app/snapshot-service/snapshot_generator.py
(crontab -u root 2>/dev/null; echo "*/5 * * * * /usr/bin/python3 /opt/cqrs-app/snapshot-service/snapshot_generator.py >> /var/log/cqrs/snapshot.log 2>&1") | crontab -
Kết quả mong đợi: Cron job được thêm vào, log file được tạo khi script chạy.
Verify kết quả bằng cách kiểm tra key trong Redis:
redis-cli keys "snapshot:*"
redis-cli get "snapshot:ORDER-12345:100"
Kết quả mong đợi: Trả về danh sách key snapshot và dữ liệu đã nén (byte string).
Triển khai logic tái cấu trúc (Replay) từ Snapshot đến trạng thái hiện tại
Logic Replay là bước quan trọng nhất để đảm bảo tính nhất quán. Khi một Aggregate được yêu cầu, hệ thống phải: (1) Tìm Snapshot mới nhất, (2) Load state từ Snapshot, (3) Tìm các Event mới hơn Snapshot trong Event Store, (4) Áp dụng lần lượt các Event mới này vào state đã load.
Chúng ta sẽ tạo một service "State Rebuilder" để minh họa quy trình này. Service này nhận Aggregate ID và trả về state hiện tại.
Tạo file script Rebuilder:
sudo nano /opt/cqrs-app/snapshot-service/state_rebuilder.py
Nội dung hoàn chỉnh của file script:
#!/usr/bin/env python3
import redis
import gzip
import json
import yaml
from datetime import datetime
# Load config
with open('/opt/cqrs-app/snapshot-service/config.yaml', 'r') as f:
config = yaml.safe_load(f)
redis_client = redis.Redis(
host=config['snapshot']['storage']['host'],
port=config['snapshot']['storage']['port'],
db=config['snapshot']['storage']['db'],
decode_responses=True
)
# Giả lập Event Store (trong thực tế là Database SQL/NoSQL)
def fetch_events_after(aggregate_id, from_version):
"""
Lấy danh sách event có version > from_version.
Giả lập: Trả về events từ 101 đến 120.
"""
# Trong thực tế: SELECT * FROM events WHERE aggregate_id = ? AND version > ?
events = []
for i in range(from_version + 1, from_version + 21):
events.append({
"version": i,
"type": "OrderStatusChanged",
"data": {"status": "Shipped", "timestamp": datetime.now().isoformat()}
})
return events
def load_snapshot(aggregate_id, snapshot_version):
key = f"{config['snapshot']['storage']['key_prefix']}{aggregate_id}:{snapshot_version}"
data = redis_client.get(key)
if not data:
return None
# Decompress
decompressed = gzip.decompress(data.encode('utf-8'))
return json.loads(decompressed)
def rebuild_state(aggregate_id):
# 1. Tìm snapshot mới nhất (giả lập logic tìm max version)
# Trong thực tế: Query Redis hoặc DB để tìm snapshot version cao nhất
snapshot_version = 100
state = load_snapshot(aggregate_id, snapshot_version)
if not state:
# Fallback: Replay từ đầu (version 0)
state = {"id": aggregate_id, "version": 0, "data": {}}
start_version = 0
else:
start_version = state.get('version', 0)
print(f"[DEBUG] Loaded snapshot at version {start_version} for {aggregate_id}")
# 2. Fetch events mới hơn snapshot
new_events = fetch_events_after(aggregate_id, start_version)
print(f"[DEBUG] Found {len(new_events)} new events to replay")
# 3. Replay events
for event in new_events:
# Logic apply event tương tự như trong snapshot_generator
state['version'] = event['version']
state['data'].update(event['data'])
state['last_event'] = event['type']
return state
if __name__ == "__main__":
final_state = rebuild_state("ORDER-12345")
print(f"[SUCCESS] Final State: {json.dumps(final_state, indent=2)}")
Kết quả mong đợi: Script in ra log chi tiết về việc load snapshot, số lượng event cần replay và state cuối cùng.
Chạy thử script để verify:
sudo chmod +x /opt/cqrs-app/snapshot-service/state_rebuilder.py
/usr/bin/python3 /opt/cqrs-app/snapshot-service/state_rebuilder.py
Kết quả mong đợi: In ra JSON của state với version = 120 (100 từ snapshot + 20 từ replay).
Tối ưu hiệu năng lưu trữ Snapshot trên Ubuntu
Việc lưu trữ Snapshot hiệu quả trên Ubuntu đòi hỏi tối ưu cả về cấu hình Redis, dung lượng Disk và chính sách làm sạch (GC) để tránh tràn ổ cứng. Chúng ta sẽ tập trung vào việc nén dữ liệu (đã làm ở phần trước) và cấu hình giới hạn bộ nhớ cho Redis.
Cấu hình file redis.conf để giới hạn bộ nhớ và chính sách xóa khi đầy:
sudo nano /etc/redis/redis.conf
Tìm và sửa các dòng sau (hoặc thêm vào cuối file nếu chưa có):
# Giới hạn bộ nhớ tối đa (ví dụ: 512MB cho Snapshot DB)
maxmemory 512mb
# Chính sách xóa khi hết bộ nhớ: xóa các key ít được dùng nhất (volatile-lru)
# Chỉ xóa các key có set TTL (expire time), phù hợp với snapshot có retention_days
maxmemory-policy volatile-lru
# Bật AOF để đảm bảo persistence
appendonly yes
appendfsync everysec
Kết quả mong đợi: Redis sẽ tự động xóa các snapshot cũ (đã hết TTL) khi bộ nhớ đạt 512MB, đảm bảo service không bị crash.
Tối ưu thêm bằng cách sử dụng ZFS hoặc LVM với compression nếu lưu Snapshot trên Filesystem thay vì Redis (tùy chọn nâng cao). Ở đây ta dùng Redis nên cần cấu hình cron để xóa các key quá hạn thủ công nếu Redis không làm tốt:
sudo nano /opt/cqrs-app/snapshot-service/cleanup_old_snapshots.sh
Nội dung script cleanup:
#!/bin/bash
# Script để xóa các snapshot đã hết hạn (giả lập logic kiểm tra TTL)
# Redis tự động xóa dựa trên TTL, script này dùng để log hoặc xử lý backup
redis-cli KEYS "snapshot:*" | while read key; do
# Kiểm tra TTL
ttl=$(redis-cli TTL "$key")
if [ "$ttl" -lt 0 ]; then
echo "Removing expired snapshot: $key"
redis-cli DEL "$key"
fi
done
Cấu hình cron để chạy cleanup mỗi ngày:
sudo chmod +x /opt/cqrs-app/snapshot-service/cleanup_old_snapshots.sh
(crontab -u root 2>/dev/null; echo "0 2 * * * /opt/cqrs-app/snapshot-service/cleanup_old_snapshots.sh >> /var/log/cqrs/cleanup.log 2>&1") | crontab -
Kết quả mong đợi: Script cleanup chạy lúc 2h sáng mỗi ngày, log các key đã xóa.
Verify hiệu năng bằng cách kiểm tra memory usage của Redis:
redis-cli INFO memory | grep used_memory_human
Kết quả mong đợi: Memory usage ổn định dưới mức maxmemory đã cấu hình.
Xử lý các vấn đề về phiên bản Snapshot khi upgrade
Khi upgrade ứng dụng (ví dụ: thêm trường mới vào Aggregate), các Snapshot cũ có thể không tương thích với logic business mới. Nếu không xử lý, việc Replay sẽ bị lỗi hoặc state bị sai lệch. Giải pháp là sử dụng cơ chế Migration Snapshot.
Chiến lược: Khi load một Snapshot cũ, kiểm tra `schema_version`. Nếu nhỏ hơn `current_version`, chạy hàm migration để update state trước khi Replay các event mới.
Cập nhật file cấu hình để theo dõi version schema:
sudo nano /opt/cqrs-app/snapshot-service/config.yaml
Thêm phần version vào file config (nội dung cập nhật hoàn chỉnh):
snapshot:
enabled: true
interval_events: 100
storage:
type: redis
host: 127.0.0.1
port: 6379
db: 0
key_prefix: "snapshot:"
compression: gzip
retention_days: 30
current_schema_version: 2
Cập nhật script state_rebuilder.py để xử lý migration:
sudo nano /opt/cqrs-app/snapshot-service/state_rebuilder.py
Thay thế hàm `load_snapshot` và thêm hàm `migrate_snapshot` (chỉ hiển thị phần thay đổi quan trọng):
...
# Thêm import thêm
import copy
def migrate_snapshot(state, from_version, to_version):
"""
Logic migration: Cập nhật state từ schema cũ sang mới.
"""
if from_version < 2 and to_version >= 2:
# Migration V1 -> V2: Thêm trường 'total_amount' mặc định là 0
if 'total_amount' not in state:
state['total_amount'] = 0
print(f"[MIGRATION] Upgraded snapshot {state['id']} from V1 to V2")
# Lưu lại state đã migrate (nếu cần lưu lại snapshot mới)
return state
def load_snapshot(aggregate_id, snapshot_version):
key = f"{config['snapshot']['storage']['key_prefix']}{aggregate_id}:{snapshot_version}"
data = redis_client.get(key)
if not data:
return None
decompressed = gzip.decompress(data.encode('utf-8'))
state = json.loads(decompressed)
# Kiểm tra version schema trong state (giả lập state có trường 'schema_version')
state_schema = state.get('schema_version', 1)
current_schema = config['snapshot']['current_schema_version']
if state_schema < current_schema:
print(f"[WARN] Snapshot {aggregate_id} has old schema version {state_schema}. Migrating...")
state = migrate_snapshot(state, state_schema, current_schema)
state['schema_version'] = current_schema
# Optional: Lưu lại snapshot đã migrate để lần sau load nhanh hơn
# redis_client.setex(key, ... state mới ...)
return state
...
Kết quả mong đợi: Khi load snapshot cũ, script tự động thêm các trường mới cần thiết trước khi tính toán state.
Tạo script kiểm tra (dry-run) để đảm bảo migration chạy đúng trước khi deploy:
sudo nano /opt/cqrs-app/snapshot-service/test_migration.py
Nội dung script test:
#!/usr/bin/env python3
import json
# Giả lập snapshot cũ (V1)
old_snapshot_data = {
"id": "ORDER-OLD",
"version": 50,
"schema_version": 1,
"data": {"item": "OldItem"}
}
# Import logic migrate từ file chính (hoặc copy lại logic)
# Ở đây giả định logic đã có sẵn
def migrate_snapshot(state, from_version, to_version):
if from_version < 2 and to_version >= 2:
if 'total_amount' not in state:
state['total_amount'] = 0
print("Migration V1->V2 applied: Added total_amount")
return state
# Test
print("Before migration:", json.dumps(old_snapshot_data))
migrated = migrate_snapshot(old_snapshot_data, 1, 2)
print("After migration:", json.dumps(migrated))
Chạy test:
sudo chmod +x /opt/cqrs-app/snapshot-service/test_migration.py
/usr/bin/python3 /opt/cqrs-app/snapshot-service/test_migration.py
Kết quả mong đợi: In ra state với trường `total_amount` mới được thêm vào, chứng tỏ migration hoạt động.
Verify kết quả cuối cùng bằng cách chạy lại state_rebuilder với một aggregate giả lập có snapshot cũ:
/usr/bin/python3 /opt/cqrs-app/snapshot-service/state_rebuilder.py
Kết quả mong đợi: Log hiển thị thông báo migration và state cuối cùng chứa đầy đủ các trường mới.
Điều hướng series:
Mục lục: Series: Triển khai Database CQRS với Event Sourcing và Ubuntu 24.04
« Phần 6: Cấu hình Event Bus và đồng bộ hóa dữ liệu
Phần 8: Bảo mật, giám sát và tối ưu hóa hệ thống »