Điều chỉnh Resource Request và Limit cho Pod trong Kubernetes
Đầu tiên, bạn cần xác định lại resource request và limit cho các Pod quan trọng để tránh tình trạng OOMKilled hoặc CPU throttling gây gián đoạn pipeline.
Chúng ta sẽ cập nhật YAML định nghĩa StatefulSet của Kafka và Flink TaskManager. Request là mức tài nguyên tối thiểu hệ thống cam kết cung cấp, Limit là mức tối đa Pod được phép tiêu thụ.
File cấu hình: ./kafka-pvc.yaml
Nội dung hoàn chỉnh cho Kafka (Strimzi) với điều chỉnh cho 3 replicas:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-cluster
namespace: data-platform
spec:
kafka:
version: 3.6.0
replicas: 3
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
jvmOptions:
-XX:MaxRAMPercentage=75.0
-XX:+UseG1GC
Kết quả mong đợi: Pod Kafka sẽ khởi động lại với lượng RAM và CPU được cấp phát chính xác, tránh hiện tượng GC (Garbage Collection) quá thường xuyên do thiếu bộ nhớ.
Tiếp theo, tối ưu hóa cho Flink TaskManager. Flink rất nhạy cảm với bộ nhớ Heap. Cần cấu hình rõ ràng để tránh OOM.
File cấu hình: ./flink-deployment.yaml
Nội dung hoàn chỉnh cho Flink TaskManager (Kubernetes Operator):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: analytics-pipeline
namespace: data-platform
spec:
image: flink:1.17-scala_2.12-java11
flinkVersion: v1_17
flinkConfiguration:
taskmanager.memory.process.size: 6144m
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.1
taskManager:
replicas: 3
resources:
requests:
cpu: "3"
memory: "7Gi"
limits:
cpu: "6"
memory: "8Gi"
Kết quả mong đợi: TaskManager chạy ổn định, không bị restart do lỗi OOM, và slot được phân bổ hiệu quả cho các job aggregation.
Verify kết quả
Chạy lệnh kiểm tra trạng thái resource thực tế so với request/limit:
kubectl top pods -n data-platform -l app=kafka-cluster
kubectl top pods -n data-platform -l app=flink
So sánh cột cpu và memory với giá trị đã set. Nếu usage vượt quá limit thường xuyên, bạn cần tăng limit hoặc tối ưu code.
Tối ưu hóa Partitioning và Sharding cho Kafka và ClickHouse
Để đảm bảo throughput cao, số lượng partition của Kafka topic phải tương thích với số lượng Task Slot của Flink và số lượng shard của ClickHouse.
Nguyên tắc vàng: Số partition Kafka >= Số Task Slot Flink >= Số shard ClickHouse.
Giả sử bạn có 3 Kafka Brokers, 3 Flink TaskManagers (mỗi cái 4 slots = 12 slots), và 3 ClickHouse nodes.
File cấu hình: ./kafka-topic-config.yaml
Tạo topic với số partition tối ưu (ví dụ 12 hoặc bội số của 3):
kafka-topics.sh --bootstrap-server kafka-cluster-data-platform:9092 --create --topic analytics-raw-events --partitions 12 --replication-factor 3 --config cleanup.policy=delete --config retention.ms=604800000
Kết quả mong đợi: Topic được tạo với 12 phân vùng, cho phép Flink song song hóa việc đọc dữ liệu lên đến 12 luồng xử lý.
Đối với ClickHouse, bạn cần cấu hình sharding logic để dữ liệu được phân tán đều qua các node. Sử dụng key hash để đảm bảo cùng một entity (ví dụ: user_id) luôn nằm trên cùng một shard.
File cấu hình: /etc/clickhouse-server/config.d/sharding.xml
Nội dung hoàn chỉnh để định nghĩa cấu hình shard và replica:
clickhouse-0
9000
clickhouse-1
9000
clickhouse-2
9000
Kết quả mong đợi: ClickHouse Cluster nhận diện được 3 shard, cho phép phân phối dữ liệu đồng đều.
Trong Flink, cấu hình Sink để gửi dữ liệu vào ClickHouse với sharding key tương ứng. Điều này đảm bảo dữ liệu được định tuyến đúng shard mà không cần ClickHouse tự cân bằng lại (rebalancing) gây tốn tài nguyên.
File code: ./src/main/java/ClickHouseSinkBuilder.java
Đoạn code cấu hình sink (pseudocode logic):
environmentSettings.put("sink.clickhouse.host", "clickhouse-0:9000");
environmentSettings.put("sink.clickhouse.cluster", "analytics_cluster");
environmentSettings.put("sink.clickhouse.database", "analytics_db");
environmentSettings.put("sink.clickhouse.table", "raw_events");
// Key sharding: hash(user_id) % 3
dataStream.addSink(new ClickHouseSinkBuilder()
.withShardingKeyExtractor(record -> record.getUserID())
.build());
Kết quả mong đợi: Dữ liệu từ Flink được phân phối đều vào 3 node ClickHouse dựa trên user_id, tránh hiện tượng "hot spot" trên một node.
Verify kết quả
Kiểm tra sự phân bổ dữ liệu trên các node ClickHouse:
clickhouse-client --query "SELECT shardNum, count() FROM analytics_db.raw_events GROUP BY shardNum"
Số lượng dòng (count) của từng shardNum phải xấp xỉ bằng nhau (sai số < 5%).
Cấu hình Auto-scaling cho Flink TaskManager và ClickHouse nodes
Để xử lý peak load và giảm chi phí khi traffic thấp, bạn cần triển khai Horizontal Pod Autoscaler (HPA) cho Flink và ClickHouse.
Đối với Flink, việc scale TaskManager cần sự hỗ trợ của Kubernetes Operator và HPA dựa trên metric CPU hoặc metric tùy chỉnh (ví dụ: lag của Kafka consumer).
File cấu hình: ./flink-hpa.yaml
Nội dung hoàn chỉnh cho HPA của Flink TaskManager:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: flink-taskmanager-hpa
namespace: data-platform
spec:
scaleTargetRef:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
name: analytics-pipeline
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Kết quả mong đợi: Khi CPU > 70%, Kubernetes sẽ tự động tạo thêm TaskManager. Khi CPU < 50% trong 5 phút, nó sẽ giảm số lượng Pod.
Đối với ClickHouse, việc scale tự động phức tạp hơn vì cần cập nhật cluster configuration. Cách tiếp cận phổ biến nhất là dùng KEDA (Kubernetes Event-driven Autoscaler) kết hợp với ClickHouse Operator hoặc script custom để điều chỉnh số lượng replica trong StatefulSet.
File cấu hình: ./clickhouse-hpa.yaml (Giả sử dùng KEDA với metric queue length của Kafka):
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: clickhouse-scaledobject
namespace: data-platform
spec:
scaleTargetRef:
name: clickhouse-statefulset
minReplicaCount: 3
maxReplicaCount: 9
triggers:
- type: kafka
metadata:
bootstrapServers: kafka-cluster-data-platform:9092
consumerGroup: flink-consumer-group
topic: analytics-raw-events
lagThreshold: "50000"
Kết quả mong đợi: Khi lag của Kafka consumer vượt quá 50,000 message, ClickHouse sẽ tự động tăng số lượng node để tăng sức mạnh tính toán cho Sink.
Verify kết quả
Tạo load giả lập để kích hoạt scaling:
kubectl run load-test --image=busybox --restart=Never -- sleep 3600 &
# Check HPA status
kubectl get hpa flink-taskmanager-hpa -n data-platform -w
kubectl get scaledobject clickhouse-scaledobject -n data-platform
Quan sát cột REPLICAS tăng lên khi bạn tạo load, và giảm xuống khi load hết.
Quản lý chi phí lưu trữ và tính toán trên đám mây
Tối ưu chi phí bắt đầu từ việc giảm lượng dữ liệu lưu trữ không cần thiết và sử dụng các lớp lưu trữ giá rẻ.
Trong Kafka, cấu hình retention policy để tự động xóa dữ liệu cũ. Không lưu log indefinitely trừ khi bắt buộc.
File cấu hình: ./kafka-retention-policy.yaml
Cập nhật config topic để giới hạn lưu trữ 7 ngày (604800000 ms):
kafka-configs.sh --bootstrap-server kafka-cluster-data-platform:9092 --entity-type topics --entity-name analytics-raw-events --alter --add-config cleanup.policy=delete,retention.ms=604800000
Kết quả mong đợi: Kafka tự động xóa các partition segment cũ hơn 7 ngày, giảm dung lượng disk sử dụng.
Trong ClickHouse, sử dụng Disk Tiering để lưu trữ dữ liệu nóng (nhanh, đắt) trên SSD và dữ liệu lạnh (chậm, rẻ) trên S3/HDD. Đây là bước quan trọng nhất để cắt giảm chi phí lưu trữ.
File cấu hình: /etc/clickhouse-server/config.d/storage.xml
Nội dung hoàn chỉnh để định nghĩa Storage Policy:
/var/lib/clickhouse/ssd/
local
s3
https://s3.amazonaws.com
YOUR_ACCESS_KEY
YOUR_SECRET_KEY
clickhouse-cold-storage
ssd
1073741824
0.05
s3
Kết quả mong đợi: ClickHouse tự động di chuyển các data part cũ sang S3 khi dữ liệu trên SSD vượt quá ngưỡng, giảm chi phí lưu trữ lên đến 90%.
Cấu hình Table Engine sử dụng policy này khi tạo bảng mới:
CREATE TABLE analytics_db.raw_events_tiered (
event_time DateTime,
user_id UInt64,
event_type String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time)
SETTINGS storage_policy = 'tiered_storage';
Kết quả mong đợi: Dữ liệu mới nằm trên SSD, dữ liệu cũ tự động migrate sang S3.
Đối với chi phí tính toán, sử dụng Spot Instances (Preemptible VMs) cho các worker node của Kubernetes. Flink TaskManager và ClickHouse nodes (nếu không yêu cầu stateful strict) có thể chạy trên Spot Instances để giảm 60-90% chi phí.
File cấu hình: ./node-pool-spot.yaml
Định nghĩa node pool cho worker (ví dụ AWS EKS):
apiVersion: eks.aws.x-k8s.io/v1alpha1
kind: ManagedNodegroup
metadata:
name: analytics-spot-workers
spec:
instanceTypes:
- m6i.large
- m6i.xlarge
- c6i.large
- c6i.xlarge
minSize: 2
maxSize: 20
desiredSize: 5
instanceType: c6i.large
capacityType: SPOT
subnetIds:
- subnet-12345678
- subnet-87654321
labels:
node-type: worker
tier: spot
Kết quả mong đợi: Kubernetes sẽ tự động provision các node giá rẻ khi cần scale up. Cần kết hợp với Pod Disruption Budget (PDB) để đảm bảo tính sẵn sàng khi Spot Instance bị thu hồi.
Verify kết quả
Kiểm tra chi phí thực tế và trạng thái lưu trữ:
clickhouse-client --query "SELECT disk, formatReadableSize(sum(size)) FROM system.parts WHERE active = 1 GROUP BY disk"
kubectl get nodes -L node-type
Bạn sẽ thấy dữ liệu được phân bổ vào các disk (ssd, s3) và các node được gắn label tier: spot.
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 9: Giám sát hệ thống: Prometheus, Grafana và Alerting
Phần 11: Chiến lược Backup, Recovery và Disaster Recovery »