Thiết kế Domain Model và Aggregate Root
Bước đầu tiên là xác định Aggregate Root trong mô hình Event Sourcing. Aggregate Root đóng vai trò là cổng duy nhất để truy cập vào một tập hợp các đối tượng, đảm bảo tính nhất quán của dữ liệu.
Chúng ta sẽ tạo một ứng dụng Python đơn giản để quản lý tài khoản ngân hàng (Account). Mỗi lần thay đổi trạng thái (nạp tiền, rút tiền) sẽ tạo ra một sự kiện (Event), không phải cập nhật trực tiếp vào database.
Tạo thư mục dự án và file cấu trúc Domain Model:
mkdir -p /opt/event-sourcing-app/src/domain
cd /opt/event-sourcing-app/src/domain
touch __init__.py account.py
File account.py sẽ chứa định nghĩa Aggregate Root "Account" và danh sách các Event.
Viết nội dung đầy đủ cho file /opt/event-sourcing-app/src/domain/account.py:
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional
from enum import Enum
class AccountStatus(Enum):
ACTIVE = "ACTIVE"
FROZEN = "FROZEN"
@dataclass
class DomainEvent:
"""Cấu trúc cơ bản cho mọi sự kiện trong hệ thống."""
event_id: str
aggregate_id: str
event_type: str
timestamp: datetime
data: Dict[str, Any]
@dataclass
class AccountCreatedEvent(DomainEvent):
"""Sự kiện khi tài khoản được tạo mới."""
pass
@dataclass
class AccountDepositedEvent(DomainEvent):
"""Sự kiện khi tiền được nạp vào tài khoản."""
pass
@dataclass
class AccountWithdrewEvent(DomainEvent):
"""Sự kiện khi tiền được rút ra từ tài khoản."""
pass
@dataclass
class AccountFrozenEvent(DomainEvent):
"""Sự kiện khi tài khoản bị đóng băng."""
pass
class Account:
"""
Aggregate Root cho tài khoản ngân hàng.
Chứa logic nghiệp vụ và danh sách lịch sử sự kiện.
"""
def __init__(self, account_id: str, initial_balance: float = 0.0):
self.account_id = account_id
self.balance = initial_balance
self.status = AccountStatus.ACTIVE
self.version = 0
self.uncommitted_events: List[DomainEvent] = []
def create_account(self, account_id: str, initial_balance: float):
"""
Command: Tạo tài khoản mới.
Chỉ được gọi khi version == 0.
"""
if self.version != 0:
raise ValueError("Account already exists")
self.account_id = account_id
self.balance = initial_balance
self.version += 1
event = AccountCreatedEvent(
event_id=f"{account_id}-created-{self.version}",
aggregate_id=account_id,
event_type="AccountCreated",
timestamp=datetime.utcnow(),
data={"initial_balance": initial_balance}
)
self.uncommitted_events.append(event)
def deposit(self, amount: float):
"""
Command: Nạp tiền.
"""
if self.status == AccountStatus.FROZEN:
raise ValueError("Account is frozen")
if amount List[DomainEvent]:
"""Trả về danh sách sự kiện chưa được lưu vào Kafka."""
return self.uncommitted_events
def clear_uncommitted_events(self):
"""Xóa danh sách sự kiện sau khi đã publish thành công."""
self.uncommitted_events.clear()
def apply_event(self, event: DomainEvent):
"""
Replay: Áp dụng sự kiện để tái tạo trạng thái.
Dùng trong phần Consumer (Phần 6).
"""
if event.event_type == "AccountCreated":
self.balance = event.data["initial_balance"]
self.status = AccountStatus.ACTIVE
elif event.event_type == "AccountDeposited":
self.balance = event.data["new_balance"]
elif event.event_type == "AccountWithdrew":
self.balance = event.data["new_balance"]
elif event.event_type == "AccountFrozen":
self.status = AccountStatus.FROZEN
self.version = int(event.event_id.split('-')[-1])
Kết quả mong đợi: File Python hợp lệ, không lỗi cú pháp. Khi import, các class Account và DomainEvent đã sẵn sàng để sử dụng trong logic nghiệp vụ.
Verify kết quả thiết kế Domain
Chạy script kiểm tra nhanh để đảm bảo logic nghiệp vụ hoạt động trước khi tích hợp Kafka:
cd /opt/event-sourcing-app/src
python3 -c "
from domain.account import Account
acc = Account('ACC-001')
acc.create_account('ACC-001', 1000)
acc.deposit(500)
acc.withdraw(200)
print(f'State: Balance={acc.balance}, Version={acc.version}')
print(f'Events count: {len(acc.get_uncommitted_events())}')
for e in acc.get_uncommitted_events():
print(f' - {e.event_type}: {e.data}')
"
Kết quả mong đợi: Console in ra trạng thái Balance = 1300, Version = 3, và danh sách 3 sự kiện tương ứng.
Triển khai Kafka Producer và Publish Sự kiện
Phần này sẽ tích hợp Kafka Producer vào ứng dụng Python để gửi các sự kiện đã tạo từ Aggregate Root vào Kafka Topic.
Chúng ta sẽ sử dụng thư viện kafka-python (kafka-python-ng) hoặc confluent-kafka. Ở đây chọn kafka-python cho tính đơn giản và tương thích cao với môi trường Ubuntu 24.04 không cần cài thêm thư viện C.
Cài đặt thư viện Kafka Client cho Python:
pip install kafka-python
Tạo module Producer trong thư mục src:
mkdir -p /opt/event-sourcing-app/src/infrastructure
cd /opt/event-sourcing-app/src/infrastructure
touch kafka_producer.py
Viết nội dung file /opt/event-sourcing-app/src/infrastructure/kafka_producer.py:
from kafka import KafkaProducer
from kafka.errors import KafkaError
from typing import List, Dict, Any
import json
import logging
import time
# Cấu hình logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EventSourcingProducer:
def __init__(self, bootstrap_servers: str = "localhost:9092"):
"""
Khởi tạo Kafka Producer.
Cấu hình serializer để chuyển đổi đối tượng Python sang JSON.
"""
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else b'',
acks='all', # Chờ tất cả các follower xác nhận (Strong consistency)
retries=3,
retry_backoff_ms=100,
max_in_flight_requests_per_connection=1
)
logger.info(f"Kafka Producer connected to {bootstrap_servers}")
def publish_events(self, events: List[Dict[str, Any]], topic: str = "account-events"):
"""
Gửi danh sách sự kiện vào Kafka.
Sử dụng partition key là aggregate_id để đảm bảo thứ tự sự kiện cho cùng một tài khoản.
"""
if not events:
return
logger.info(f"Publishing {len(events)} events to topic '{topic}'...")
failed_events = []
for event in events:
try:
# Chuyển đối tượng Event thành dict để serialize
event_dict = {
"event_id": event.event_id,
"aggregate_id": event.aggregate_id,
"event_type": event.event_type,
"timestamp": event.timestamp.isoformat(),
"data": event.data,
"version": event.aggregate_id.split('-')[0] # Placeholder logic, thực tế lấy từ version
}
# Gửi sự kiện, key là aggregate_id
future = self.producer.send(topic, key=event.aggregate_id, value=event_dict)
# Đợi kết quả gửi (blocking) để đảm bảo thứ tự
metadata = future.get(timeout=10)
logger.debug(f"Event {event.event_id} sent to partition {metadata.partition}")
except Exception as e:
logger.error(f"Failed to send event {event.event_id}: {str(e)}")
failed_events.append(event)
if failed_events:
raise Exception(f"{len(failed_events)} events failed to send")
self.producer.flush()
logger.info(f"Successfully published {len(events)} events.")
def close(self):
self.producer.close()
logger.info("Kafka Producer closed.")
Kết quả mong đợi: Class EventSourcingProducer đã sẵn sàng, cấu hình acks='all' đảm bảo dữ liệu không bị mất.
Viết Script Demo để Test Producer
Tạo script chính để kích hoạt logic nghiệp vụ và gửi sự kiện:
touch /opt/event-sourcing-app/src/main.py
Viết nội dung file /opt/event-sourcing-app/src/main.py:
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from domain.account import Account
from infrastructure.kafka_producer import EventSourcingProducer
import uuid
def run_demo():
# Khởi tạo Producer
producer = EventSourcingProducer(bootstrap_servers="localhost:9092")
try:
# Tạo tài khoản mới
account_id = f"ACC-{uuid.uuid4().hex[:8].upper()}"
print(f"Creating account: {account_id}")
acc = Account(account_id)
acc.create_account(account_id, 1000.0)
# Thực hiện giao dịch
acc.deposit(500.0)
acc.withdraw(200.0)
# Lấy các sự kiện chưa commit
events = acc.get_uncommitted_events()
# Publish lên Kafka
producer.publish_events(events)
# Xóa bộ nhớ đệm sự kiện sau khi publish thành công
acc.clear_uncommitted_events()
print(f"Success! Account {account_id} state: Balance={acc.balance}, Version={acc.version}")
print("Events published to Kafka topic 'account-events'")
except Exception as e:
print(f"Error during demo: {e}")
# Logic xử lý lỗi sẽ được mở rộng ở phần sau
sys.exit(1)
finally:
producer.close()
if __name__ == "__main__":
run_demo()
Kết quả mong đợi: Script chạy thành công, in ra thông báo "Events published to Kafka topic 'account-events'".
Xử lý lỗi và Retry Logic khi gửi sự kiện
Trong môi trường sản xuất, Kafka có thể bị tạm thời không khả dụng (network timeout, leader election). Producer cần cơ chế Retry để đảm bảo sự kiện được gửi ít nhất một lần (At-least-once delivery).
Chúng ta sẽ mở rộng class Producer để thêm cơ chế Retry thông minh với Exponential Backoff.
Cập nhật file /opt/event-sourcing-app/src/infrastructure/kafka_producer.py để thêm hàm retry logic:
import time
import random
from typing import List, Dict, Any
import json
import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError, TimeoutError as KafkaTimeoutError
# ... (Giữ nguyên phần import và class definition ban đầu, thêm phương thức mới dưới đây)
class EventSourcingProducer:
def __init__(self, bootstrap_servers: str = "localhost:9092", max_retries: int = 5):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else b'',
acks='all',
retries=0, # Tắt retry mặc định của Kafka để tự quản lý
request_timeout_ms=30000,
max_in_flight_requests_per_connection=1
)
self.max_retries = max_retries
logger.info(f"Kafka Producer initialized with max_retries={max_retries}")
def _exponential_backoff(self, attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
"""
Tính thời gian chờ theo hàm mũ: delay = base * (2 ^ attempt) + random jitter
"""
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
return delay + jitter
def publish_events_with_retry(self, events: List[Dict[str, Any]], topic: str = "account-events"):
"""
Gửi sự kiện với cơ chế Retry.
Nếu một sự kiện thất bại, chỉ retry sự kiện đó, không ảnh hưởng đến sự kiện khác.
"""
if not events:
return
logger.info(f"Attempting to publish {len(events)} events to '{topic}' with retry logic...")
pending_events = list(events)
max_attempts = self.max_retries
while pending_events:
attempt = 0
current_batch = list(pending_events)
failed_batch = []
for event in current_batch:
try:
event_dict = {
"event_id": event.event_id,
"aggregate_id": event.aggregate_id,
"event_type": event.event_type,
"timestamp": event.timestamp.isoformat(),
"data": event.data,
"version": event.aggregate_id.split('-')[0]
}
future = self.producer.send(topic, key=event.aggregate_id, value=event_dict)
metadata = future.get(timeout=10)
logger.debug(f"Attempt {attempt+1}: Event {event.event_id} sent successfully.")
except Exception as e:
logger.warning(f"Attempt {attempt+1}: Failed to send event {event.event_id} - {str(e)}")
failed_batch.append(event)
if not failed_batch:
break # Tất cả thành công
pending_events = failed_batch
attempt += 1
if attempt >= max_attempts:
# Đã vượt quá số lần retry tối đa
raise Exception(f"Failed to publish {len(pending_events)} events after {max_attempts} attempts. Events: {[e.event_id for e in pending_events]}")
# Tính toán thời gian chờ và sleep
wait_time = self._exponential_backoff(attempt)
logger.warning(f"Retrying {len(pending_events)} failed events in {wait_time:.2f} seconds...")
time.sleep(wait_time)
self.producer.flush()
logger.info(f"Successfully published all events after {attempt + 1} attempts.")
# ... (Giữ nguyên phương thức close)
Kết quả mong đợi: Producer giờ có khả năng tự động retry nếu gặp lỗi tạm thời, giảm thiểu việc mất sự kiện.
Verify cơ chế Retry
Để kiểm tra, ta có thể giả lập lỗi bằng cách ngắt kết nối Kafka hoặc thay đổi config. Tuy nhiên, trong môi trường ổn định, ta chạy lại script demo nhưng thay đổi method gọi:
sed -i 's/producer.publish_events/events_with_retry/' /opt/event-sourcing-app/src/main.py
sed -i 's/producer.publish_events(events)/producer.publish_events_with_retry(events)/' /opt/event-sourcing-app/src/main.py
Cập nhật main.py để dùng hàm mới:
cd /opt/event-sourcing-app/src
python3 main.py
Kết quả mong đợi: Script chạy thành công. Nếu Kafka bị lỗi tạm thời, bạn sẽ thấy log "Retrying X failed events..." trước khi in "Successfully published all events".
Verify toàn bộ luồng dữ liệu với Kafka Console Consumer
Sau khi Producer chạy, cần xác minh dữ liệu đã vào Kafka Topic chưa bằng công cụ Console Consumer có sẵn trong Kafka.
Chạy lệnh consumer để đọc topic account-events từ đầu (from beginning):
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic account-events \
--from-beginning \
--property print.key=true \
--property key.separator=" | "
Kết quả mong đợi: Bạn sẽ thấy dòng output tương tự như sau (số liệu có thể khác tùy demo):
ACC-XXXX | {"event_id": "ACC-XXXX-created-1", "aggregate_id": "ACC-XXXX", "event_type": "AccountCreated", "timestamp": "...", "data": {"initial_balance": 1000.0}, ...}
ACC-XXXX | {"event_id": "ACC-XXXX-deposit-2", "aggregate_id": "ACC-XXXX", "event_type": "AccountDeposited", "timestamp": "...", "data": {"amount": 500.0, "new_balance": 1500.0}, ...}
ACC-XXXX | {"event_id": "ACC-XXXX-withdraw-3", "aggregate_id": "ACC-XXXX", "event_type": "AccountWithdrew", "timestamp": "...", "data": {"amount": 200.0, "new_balance": 1300.0}, ...}
Quan sát: Key (Aggregate ID) luôn giống nhau cho cùng một tài khoản, đảm bảo các sự kiện của cùng một tài khoản nằm trong cùng một Partition, duy trì tính thứ tự (Ordering).
Điều hướng series:
Mục lục: Series: Triển khai Database Event Sourcing với Apache Kafka và Ubuntu 24.04
« Phần 3: Thiết kế Schema và quản lý dữ liệu với Schema Registry
Phần 5: Triển khai Database để lưu trữ Event Store »