Cấu hình Kafka Consumer và Offset Management
Thiết lập nhóm Consumer và chiến lược offset
Ta cần cấu hình ứng dụng Consumer để tham gia vào một Consumer Group cụ thể, đảm bảo sự kiện được phân phối cân bằng và xử lý đúng thứ tự.
Mục đích là thiết lập group.id để Kafka biết ai là người đọc, và enable.auto.commit để kiểm soát việc ghi offset thủ công, tránh việc mất sự kiện khi consumer gặp lỗi giữa chừng.
Kết quả mong đợi là consumer sẽ tự động join vào broker và sẵn sàng nhận dữ liệu từ topic đã định.
Trước tiên, tạo file cấu hình cho consumer tại đường dẫn /opt/kafka-consumer/config.properties.
bootstrap.servers=localhost:9092
group.id=event-sourcing-group-01
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.records=100
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
File cấu hình này sẽ được đọc bởi ứng dụng Java/Go/Python. Lưu ý auto.offset.reset=earliest để khi replay, consumer đọc từ đầu log.
Kết quả: Consumer đã có cấu hình tĩnh để kết nối vào cluster Kafka đã setup ở phần trước.
Verify kết cấu Consumer Group
Trước khi chạy code, hãy kiểm tra xem consumer có thể join group được không bằng lệnh dòng lệnh.
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Kết quả mong đợi: Nếu chưa có group nào, danh sách sẽ rỗng. Nếu đã có, bạn sẽ thấy event-sourcing-group-01.
Triển khai Logic Consumer bằng Python
Ta sẽ viết một script Python đơn giản để đọc sự kiện từ topic user-events và in ra console, đồng thời xử lý commit offset thủ công.
Mục đích là minh họa luồng xử lý: Poll -> Deserialize -> Process -> Commit.
Kết quả mong đợi: Script chạy, nhận được các event JSON và in ra màn hình, offset được cập nhật sau khi xử lý thành công.
Trước khi chạy, cài đặt thư viện confluent-kafka cần thiết.
pip install confluent-kafka
Kết quả: Thư viện được cài đặt vào môi trường Python hiện tại.
Tạo file script /app/consumer.py với nội dung đầy đủ dưới đây.
import json
import signal
import sys
from confluent_kafka import Consumer, KafkaError, KafkaException
# Đọc config từ file
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'event-sourcing-group-01',
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000,
'max.poll.records': 100,
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer'
}
consumer = Consumer(config)
topics = ['user-events']
consumer.subscribe(topics)
def signal_handler(sig, frame):
print('\nĐang đóng consumer...')
consumer.close()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("Consumer đang lắng nghe sự kiện...")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError.NO_ERROR:
continue
raise KafkaException(msg.error())
# Xử lý sự kiện
key = msg.key().decode('utf-8')
value = json.loads(msg.value().decode('utf-8'))
print(f"[RECEIVED] Partition: {msg.partition()}, Offset: {msg.offset()}, Key: {key}")
print(f"Payload: {json.dumps(value, indent=2, ensure_ascii=False)}")
# Logic giả lập xử lý
# Ở đây ta sẽ call hàm apply_event(value, current_state)
# Sau khi xử lý xong mới commit
try:
# Giả lập logic Apply Event (sẽ chi tiết ở phần sau)
print(f"-> Applying event type: {value.get('type')}")
# Commit offset thủ công sau khi xử lý thành công
consumer.commit(msg)
print(f"-> Committed offset: {msg.offset()}")
except Exception as e:
print(f"-> Lỗi khi xử lý event: {str(e)}")
# Không commit offset, lần sau sẽ đọc lại sự kiện này
raise
except KeyboardInterrupt:
pass
finally:
consumer.close()
print("Consumer đã dừng.")
Kết quả: Script chạy, in ra từng sự kiện từ topic và xác nhận offset đã commit.
Logic Apply Event và Cập nhật State
Thiết kế State Machine đơn giản
Trong Event Sourcing, State không được lưu trực tiếp dưới dạng bảng quan hệ truyền thống mà được tái tạo từ chuỗi Event.
Mục đích là xây dựng hàm apply nhận một Event và một State hiện tại, trả về State mới.
Kết quả mong đợi: State của entity (ví dụ User) thay đổi chính xác theo từng loại sự kiện (Created, Updated, Deleted).
Ta sẽ tạo một class UserState và các hàm xử lý sự kiện trong file /app/state_machine.py.
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class UserState:
id: str
email: str = ""
full_name: str = ""
is_active: bool = True
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
version: int = 0
def apply(self, event: Dict[str, Any]) -> 'UserState':
"""
Áp dụng sự kiện vào state hiện tại để tạo state mới.
Đây là hàm cốt lõi của Event Sourcing.
"""
event_type = event.get("type")
event_data = event.get("data", {})
# Clone state hiện tại để đảm bảo immutability (không thay đổi trực tiếp)
new_state = UserState(
id=self.id,
email=self.email,
full_name=self.full_name,
is_active=self.is_active,
created_at=self.created_at,
updated_at=self.updated_at,
version=self.version
)
if event_type == "UserCreated":
new_state.id = event_data.get("id", self.id)
new_state.email = event_data.get("email")
new_state.full_name = event_data.get("full_name")
new_state.created_at = datetime.now()
new_state.version = 1
elif event_type == "UserProfileUpdated":
if "email" in event_data:
new_state.email = event_data["email"]
if "full_name" in event_data:
new_state.full_name = event_data["full_name"]
new_state.updated_at = datetime.now()
new_state.version += 1
elif event_type == "UserDeleted":
new_state.is_active = False
new_state.updated_at = datetime.now()
new_state.version += 1
elif event_type == "UserActivated":
new_state.is_active = True
new_state.updated_at = datetime.now()
new_state.version += 1
else:
# Nếu là sự kiện không nhận biết, bỏ qua hoặc throw error
raise ValueError(f"Unknown event type: {event_type}")
return new_state
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"email": self.email,
"full_name": self.full_name,
"is_active": self.is_active,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"version": self.version
}
# Hàm khởi tạo state rỗng
def create_empty_state(user_id: str) -> UserState:
return UserState(id=user_id)
Kết quả: Class UserState được định nghĩa với phương thức apply xử lý logic business.
Tích hợp Logic Apply vào Consumer
Bây giờ ta sẽ sửa file consumer.py để sử dụng class UserState trên, thay vì chỉ in ra console.
Mục đích là tái tạo state trong bộ nhớ (In-Memory State) của consumer dựa trên luồng sự kiện.
Kết quả mong đợi: Khi chạy, console sẽ in ra trạng thái hiện tại của user sau mỗi sự kiện.
Sửa đổi phần logic trong vòng lặp while True của file /app/consumer.py như sau (thay thế phần comment "Logic giả lập xử lý").
import sys
sys.path.append('/app')
from state_machine import UserState, create_empty_state
# Dictionary để lưu state hiện tại của các user trong bộ nhớ
# Trong thực tế, đây sẽ là cache hoặc snapshot được load từ DB
user_states: Dict[str, UserState] = {}
# ... (giữ nguyên phần import và setup consumer ở trên)
# Trong vòng lặp while:
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError.NO_ERROR:
continue
raise KafkaException(msg.error())
key = msg.key().decode('utf-8')
value = json.loads(msg.value().decode('utf-8'))
# Lấy state hiện tại hoặc tạo mới nếu chưa có
# Lưu ý: Trong môi trường production, bạn nên load state snapshot từ DB
# để không phải đọc lại toàn bộ lịch sử mỗi khi restart
if key not in user_states:
user_states[key] = create_empty_state(key)
current_state = user_states[key]
# Áp dụng sự kiện
try:
new_state = current_state.apply(value)
user_states[key] = new_state
print(f"[STATE UPDATE] User: {key}, New Version: {new_state.version}")
print(f"Current State: {json.dumps(new_state.to_dict(), indent=2, ensure_ascii=False)}")
consumer.commit(msg)
except Exception as e:
print(f"[ERROR] Failed to apply event for user {key}: {str(e)}")
# Không commit, sẽ retry ở lần poll sau
raise
except KeyboardInterrupt:
pass
finally:
consumer.close()
print("Consumer đã dừng.")
Kết quả: Consumer không chỉ đọc event mà còn cập nhật state trong bộ nhớ, phản ánh chính xác trạng thái hiện tại của user.
Tính năng Replay: Tái tạo trạng thái từ lịch sử
Cấu hình chế độ Replay
Để thực hiện replay, ta cần reset offset của consumer group về điểm bắt đầu (earliest) của topic.
Mục đích là buộc consumer đọc lại toàn bộ lịch sử sự kiện từ ngày đầu tiên để tái tạo state hiện tại chính xác.
Kết quả mong đợi: Consumer sẽ đọc lại tất cả event cũ và in ra trạng thái cuối cùng giống hệt như trạng thái hiện tại trong DB.
Sử dụng lệnh kafka-consumer-groups.sh để reset offset của group về earliest.
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group event-sourcing-group-01 --reset-offsets --to-earliest --execute --topic user-events
Kết quả: Thông báo xác nhận reset offset thành công cho topic user-events.
Chạy lại Consumer để Replay
Khởi động lại script consumer sau khi đã reset offset.
Mục đích là quan sát quá trình tái tạo state từ đầu.
Kết quả mong đợi: Console sẽ in ra toàn bộ lịch sử sự kiện theo thứ tự thời gian, và trạng thái cuối cùng của user sẽ khớp với dữ liệu hiện tại.
Chạy script với cờ --replay (giả định ta thêm logic này vào script, hoặc đơn giản là chạy lại sau khi reset offset như trên).
python3 /app/consumer.py
Kết quả: Bạn sẽ thấy log in ra từ đầu (UserCreated) đến các sự kiện update sau đó.
Verify kết quả Replay
So sánh trạng thái cuối cùng sau khi replay với trạng thái thực tế trong Database (nếu có) hoặc với snapshot.
Mục đích là đảm bảo tính nhất quán của dữ liệu (Data Consistency).
Kết quả mong đợi: Giá trị version và các trường dữ liệu (email, full_name) sau khi replay phải giống hệt trạng thái hiện tại.
Để kiểm tra nhanh, ta có thể dùng lệnh kafka-console-consumer.sh để đọc toàn bộ log và đếm số lượng sự kiện.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user-events --from-beginning --timeout-ms 5000 | wc -l
Kết quả: Số dòng in ra tương ứng với tổng số sự kiện đã được replay. Nếu số lượng sự kiện khớp với log của script consumer, việc replay thành công.
Verify Tổng thể
Để đảm bảo toàn bộ quy trình từ Consumer, Apply Event đến Replay hoạt động trơn tru, hãy thực hiện các bước kiểm tra sau.
1. Kiểm tra offset hiện tại của consumer group.
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group event-sourcing-group-01 --topic user-events
Kết quả mong đợi: Cột LAG phải bằng 0 nếu consumer đã bắt kịp với producer.
2. Kiểm tra state cuối cùng của một user cụ thể trong console của script.
Quan sát dòng log cuối cùng nhất cho key user đó.
Kết quả mong đợi: State hiển thị phải chứa các giá trị đã được cập nhật lần cuối.
3. Thực hiện thêm một sự kiện mới và quan sát consumer phản hồi ngay lập tức.
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic user-events --property "parse.key=true"
Nhập vào console producer (đã mở song song):
user-001|{"type": "UserProfileUpdated", "data": {"full_name": "Nguyen Van A Updated"}}
Kết quả mong đợi: Script consumer ngay lập tức in ra sự kiện mới và cập nhật state của user-001 với tên mới.
Điều hướng series:
Mục lục: Series: Triển khai Database Event Sourcing với Apache Kafka và Ubuntu 24.04
« Phần 5: Triển khai Database để lưu trữ Event Store
Phần 7: Xử lý lỗi, Debugging và các mẹo tối ưu hiệu suất »