1. Xây dựng API Command để nhận và validate yêu cầu
Bước đầu tiên là tạo lớp điều khiển (Controller) chuyên biệt để xử lý các yêu cầu ghi (Write) từ phía khách hàng.
Chúng ta sẽ sử dụng Go (Golang) để xây dựng API này vì hiệu năng cao và sự đơn giản trong việc xử lý concurrency. Controller này sẽ không truy cập trực tiếp vào database mà chỉ chịu trách nhiệm parse request và gọi vào Domain Service.
Đầu tiên, tạo thư mục dự án và khởi tạo module Go:
mkdir -p /var/www/cqrs-demo/cmd-side
cd /var/www/cqrs-demo/cmd-side
go mod init cqrs-demo/cmd-side
Kết quả mong đợi: Thư mục được tạo và file go.mod xuất hiện.
Tiếp theo, tạo file cấu hình để định nghĩa các loại Command. Trong CQRS, Command là một đối tượng thuần túy chứa dữ liệu đầu vào.
Tạo file /var/www/cqrs-demo/cmd-side/commands/order.go với nội dung:
package commands
import (
"encoding/json"
"fmt"
"time"
)
type CreateOrderCommand struct {
UserID string `json:"user_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
TotalPrice float64 `json:"total_price"`
Timestamp time.Time `json:"timestamp"`
}
func (c CreateOrderCommand) Validate() error {
if c.UserID == "" {
return fmt.Errorf("user_id is required")
}
if c.ProductID == "" {
return fmt.Errorf("product_id is required")
}
if c.Quantity
Kết quả mong đợi: File được tạo, định nghĩa cấu trúc dữ liệu và hàm validate rõ ràng.
Bây giờ, tạo file /var/www/cqrs-demo/cmd-side/main.go để thiết lập HTTP Server và route cho Command:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"cqrs-demo/cmd-side/commands"
)
func handleCreateOrder(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var cmd commands.CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
// Validate logic
if err := cmd.Validate(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Gọi Domain Logic (sẽ triển khai ở bước sau)
// Giả lập thành công cho bước test API
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"status": "command_accepted", "command_id": "cmd_" + cmd.UserID})
}
func main() {
http.HandleFunc("/api/commands/order/create", handleCreateOrder)
log.Println("Command API listening on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
Kết quả mong đợi: Server chạy trên cổng 8080, sẵn sàng nhận POST request.
Để verify, hãy gửi một request thử nghiệm bằng curl:
curl -X POST http://localhost:8080/api/commands/order/create \
-H "Content-Type: application/json" \
-d '{"user_id": "user_123", "product_id": "prod_456", "quantity": 2, "total_price": 150.00}'
Kết quả mong đợi: Nhận về response JSON với status 202 Accepted và thông báo command_accepted.
2. Triển khai logic Domain để tạo Event từ Command
Sau khi Command được validate, bước tiếp theo là chuyển đổi nó thành Domain Event.
Trong Event Sourcing, chúng ta không lưu trạng thái cuối cùng (state) mà lưu lại lịch sử các sự kiện đã xảy ra (events). Logic Domain sẽ chịu trách nhiệm "phiên dịch" ý định của Command thành các Event cụ thể.
Tạo thư mục domain và file /var/www/cqrs-demo/cmd-side/domain/events.go:
package domain
import (
"encoding/json"
"time"
"cqrs-demo/cmd-side/commands"
)
type Event interface {
GetType() string
GetPayload() []byte
GetOccurredOn() time.Time
}
type OrderCreatedEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
TotalPrice float64 `json:"total_price"`
OccurredOn time.Time `json:"occurred_on"`
}
func (e OrderCreatedEvent) GetType() string {
return "OrderCreated"
}
func (e OrderCreatedEvent) GetPayload() []byte {
return json.Marshal(e)
}
func (e OrderCreatedEvent) GetOccurredOn() time.Time {
return e.OccurredOn
}
func (e OrderCreatedEvent) MarshalJSON() ([]byte, error) {
type Alias OrderCreatedEvent
return json.Marshal(&struct {
EventType string `json:"event_type"`
*Alias
}{
EventType: e.GetType(),
Alias: (*Alias)(&e),
})
}
Kết quả mong đợi: Định nghĩa giao diện Event và cụ thể hóa OrderCreatedEvent.
Tiếp theo, tạo /var/www/cqrs-demo/cmd-side/domain/service.go để chứa logic chuyển đổi Command -> Event. Đây là nơi áp dụng các Business Rules.
package domain
import (
"fmt"
"time"
"cqrs-demo/cmd-side/commands"
)
type OrderService struct {
// Có thể inject Aggregate Repository ở đây nếu cần
}
func (s *OrderService) HandleCreateOrder(cmd commands.CreateOrderCommand) ([]Event, error) {
// Business Rule: Kiểm tra tồn tại (giả lập)
// Trong thực tế, bạn sẽ query Read Model hoặc Snapshot để check state
// Tạo Event từ Command
event := OrderCreatedEvent{
OrderID: fmt.Sprintf("ORD-%d", time.Now().UnixNano()),
UserID: cmd.UserID,
ProductID: cmd.ProductID,
Quantity: cmd.Quantity,
TotalPrice: cmd.TotalPrice,
OccurredOn: time.Now().UTC(),
}
return []Event{event}, nil
}
Kết quả mong đợi: Service trả về danh sách Event (ở đây là 1 event) từ Command.
Update lại file /var/www/cqrs-demo/cmd-side/main.go để tích hợp Domain Service vào handler:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"cqrs-demo/cmd-side/commands"
"cqrs-demo/cmd-side/domain"
)
var orderService = &domain.OrderService{}
func handleCreateOrder(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var cmd commands.CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
if err := cmd.Validate(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Gọi Domain Logic để tạo Event
events, err := orderService.HandleCreateOrder(cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Lưu Event vào Store (sẽ triển khai ở bước sau)
// Giả lập lưu thành công
log.Printf("Generated %d event(s): %v", len(events), events)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"status": "events_generated", "count": fmt.Sprintf("%d", len(events))})
}
Kết quả mong đợi: Khi gọi API, log sẽ in ra thông tin về Event đã được tạo.
Verify lại bằng curl:
curl -X POST http://localhost:8080/api/commands/order/create \
-H "Content-Type: application/json" \
-d '{"user_id": "user_123", "product_id": "prod_456", "quantity": 2, "total_price": 150.00}'
Kiểm tra log output của server (nếu chạy bằng go run) để thấy dòng "Generated 1 event(s)".
3. Cấu hình cơ chế lưu Event vào Event Store
Bước quan trọng nhất là lưu các Event đã tạo vào Event Store.
Chúng ta sẽ sử dụng PostgreSQL làm Event Store. Cấu trúc bảng cần hỗ trợ lưu trữ JSONB để lưu payload linh hoạt của các loại Event khác nhau.
Đầu tiên, truy cập PostgreSQL và tạo database cùng bảng:
sudo -u postgres psql
CREATE DATABASE cqrs_events;
\c cqrs_events
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
occurred_on TIMESTAMPTZ NOT NULL DEFAULT NOW(),
version BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT unique_event_per_aggregate_version UNIQUE (aggregate_id, version)
);
CREATE INDEX idx_events_aggregate_id ON events (aggregate_id);
CREATE INDEX idx_events_occurred_on ON events (occurred_on);
\q
Kết quả mong đợi: Database và bảng `events` được tạo với các chỉ mục cần thiết cho hiệu năng.
Tạo file /var/www/cqrs-demo/cmd-side/storage/event_store.go để định nghĩa interface và implementation lưu Event:
package storage
import (
"database/sql"
"fmt"
"time"
"cqrs-demo/cmd-side/domain"
"log"
)
type EventStore interface {
AppendEvents(aggregateID string, events []domain.Event, currentVersion int64) error
}
type PostgresEventStore struct {
db *sql.DB
}
func NewPostgresEventStore(db *sql.DB) *PostgresEventStore {
return &PostgresEventStore{db: db}
}
func (s *PostgresEventStore) AppendEvents(aggregateID string, events []domain.Event, currentVersion int64) error {
if len(events) == 0 {
return nil
}
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err != nil {
tx.Rollback()
}
}()
insertSQL := `INSERT INTO events (aggregate_id, event_type, payload, occurred_on, version) VALUES ($1, $2, $3, $4, $5)`
for i, event := range events {
version := currentVersion + int64(i) + 1
payloadJSON, err := event.MarshalJSON()
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
_, err = tx.Exec(insertSQL, aggregateID, event.GetType(), payloadJSON, event.GetOccurredOn(), version)
if err != nil {
log.Printf("Rolling back due to insert error: %v", err)
return fmt.Errorf("failed to insert event %s: %w", event.GetType(), err)
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
Kết quả mong đợi: Class PostgresEventStore có phương thức AppendEvents thực hiện lưu nhiều event trong một transaction.
Cấu hình kết nối database trong /var/www/cqrs-demo/cmd-side/config/db.go:
package config
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
)
func ConnectDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
// Cấu hình connection pool cho workload write
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
return db, nil
}
Kết quả mong đợi: Hàm kết nối DB với cấu hình pool phù hợp.
Cập nhật /var/www/cqrs-demo/cmd-side/main.go để tích hợp Event Store:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"cqrs-demo/cmd-side/commands"
"cqrs-demo/cmd-side/domain"
"cqrs-demo/cmd-side/storage"
"cqrs-demo/cmd-side/config"
)
var orderService = &domain.OrderService{}
var eventStore storage.EventStore
func handleCreateOrder(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var cmd commands.CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
if err := cmd.Validate(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := orderService.HandleCreateOrder(cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Lưu Event vào Store
// Giả định aggregate_id là user_id cho đơn giản, version bắt đầu từ 0
if err := eventStore.AppendEvents(cmd.UserID, events, 0); err != nil {
log.Printf("Error saving events: %v", err)
http.Error(w, fmt.Sprintf("Failed to save events: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"status": "events_persisted", "count": fmt.Sprintf("%d", len(events))})
}
func main() {
// Kết nối DB
db, err := config.ConnectDB("host=localhost user=postgres password=your_password dbname=cqrs_events sslmode=disable")
if err != nil {
log.Fatalf("Failed to connect to DB: %v", err)
}
defer db.Close()
eventStore = storage.NewPostgresEventStore(db)
http.HandleFunc("/api/commands/order/create", handleCreateOrder)
log.Println("Command API listening on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
Kết quả mong đợi: Server khởi động và kết nối thành công với PostgreSQL.
Verify bằng cách chạy curl và kiểm tra database:
curl -X POST http://localhost:8080/api/commands/order/create \
-H "Content-Type: application/json" \
-d '{"user_id": "user_123", "product_id": "prod_456", "quantity": 2, "total_price": 150.00}'
sudo -u postgres psql -d cqrs_events -c "SELECT * FROM events WHERE aggregate_id = 'user_123';"
Kết quả mong đợi: API trả về 202 Accepted và bảng `events` trong DB có 1 dòng mới với payload JSONB chứa thông tin đơn hàng.
4. Xử lý lỗi và rollback trong trường hợp không ghi được Event
Trong Event Sourcing, tính nhất quán (Consistency) là tối quan trọng. Nếu việc lưu Event thất bại, toàn bộ transaction phải bị rollback để tránh trạng thái "thất bại một phần".
Cơ chế rollback đã được tích hợp trong code PostgresEventStore.AppendEvents thông qua tx.Rollback() trong khối defer. Tuy nhiên, chúng ta cần thêm logic xử lý lỗi cụ thể hơn, đặc biệt là lỗi Concurrent Write (xung đột phiên bản).
Cải tiến /var/www/cqrs-demo/cmd-side/storage/event_store.go để bắt lỗi duy nhất (Unique Constraint Violation) và trả về lỗi cụ thể:
package storage
import (
"database/sql"
"fmt"
"log"
"cqrs-demo/cmd-side/domain"
"errors"
)
var ErrOptimisticLocking = errors.New("optimistic locking conflict: aggregate version mismatch")
func (s *PostgresEventStore) AppendEvents(aggregateID string, events []domain.Event, currentVersion int64) error {
if len(events) == 0 {
return nil
}
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
// Đảm bảo rollback nếu có lỗi
if err := func() error {
insertSQL := `INSERT INTO events (aggregate_id, event_type, payload, occurred_on, version) VALUES ($1, $2, $3, $4, $5)`
for i, event := range events {
version := currentVersion + int64(i) + 1
payloadJSON, err := event.MarshalJSON()
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
_, err = tx.Exec(insertSQL, aggregateID, event.GetType(), payloadJSON, event.GetOccurredOn(), version)
if err != nil {
// Kiểm tra lỗi unique constraint
if isUniqueConstraintViolation(err) {
return ErrOptimisticLocking
}
return fmt.Errorf("failed to insert event %s: %w", event.GetType(), err)
}
}
return tx.Commit()
}(); err != nil {
tx.Rollback()
log.Printf("Transaction rolled back: %v", err)
return err
}
return nil
}
func isUniqueConstraintViolation(err error) bool {
if err == nil {
return false
}
// PostgreSQL error code for unique_violation is 23505
if pqErr, ok := err.(*pgError); ok {
return pqErr.Code == "23505"
}
// Fallback check for string if driver doesn't expose code directly
return false
}
*Lưu ý: Để code chạy được, cần import package postgres driver cụ thể để parse lỗi. Ở đây giả định logic kiểm tra lỗi cơ bản.*
Cập nhật /var/www/cqrs-demo/cmd-side/main.go để xử lý lỗi Optimistic Locking:
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"cqrs-demo/cmd-side/commands"
"cqrs-demo/cmd-side/domain"
"cqrs-demo/cmd-side/storage"
"cqrs-demo/cmd-side/config"
"errors"
)
var orderService = &domain.OrderService{}
var eventStore storage.EventStore
func handleCreateOrder(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var cmd commands.CreateOrderCommand
if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
if err := cmd.Validate(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
events, err := orderService.HandleCreateOrder(cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Lưu Event vào Store
saveErr := eventStore.AppendEvents(cmd.UserID, events, 0)
if saveErr != nil {
// Xử lý lỗi đặc biệt
if errors.Is(saveErr, storage.ErrOptimisticLocking) {
http.Error(w, "Conflict: Aggregate version mismatch. Please retry.", http.StatusConflict)
return
}
log.Printf("Error saving events: %v", saveErr)
http.Error(w, fmt.Sprintf("Failed to save events: %v", saveErr), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"status": "events_persisted", "count": fmt.Sprintf("%d", len(events))})
}
Kết quả mong đợi: Nếu xảy ra xung đột phiên bản (ví dụ 2 request cùng lúc cho cùng 1 user_id), API trả về 409 Conflict thay vì 500 Internal Server Error.
Để verify cơ chế rollback, hãy thử tạo một lỗi cố ý trong code (ví dụ: cố insert version trùng) hoặc chạy song song 2 request giống hệt nhau:
curl -X POST http://localhost:8080/api/commands/order/create \
-H "Content-Type: application/json" \
-d '{"user_id": "user_test", "product_id": "prod_999", "quantity": 1, "total_price": 10.00}' &
curl -X POST http://localhost:8080/api/commands/order/create \
-H "Content-Type: application/json" \
-d '{"user_id": "user_test", "product_id": "prod_999", "quantity": 1, "total_price": 10.00}'
Kết quả mong đợi: Một request thành công (202), request còn lại trả về 409 Conflict. Trong database, chỉ có 1 bản ghi cho user_test (không bị ghi đè hoặc lỗi dữ liệu).
Kiểm tra số lượng bản ghi trong DB:
sudo -u postgres psql -d cqrs_events -c "SELECT aggregate_id, COUNT(*) FROM events GROUP BY aggregate_id;"
Kết quả mong đợi: User "user_test" chỉ có 1 bản ghi duy nhất, chứng tỏ rollback đã hoạt động đúng.
Điều hướng series:
Mục lục: Series: Triển khai Database CQRS với Event Sourcing và Ubuntu 24.04
« Phần 3: Thiết kế Schema và lưu trữ Event Store
Phần 5: Triển khai Query Side và xây dựng Read Models »