Cấu hình Dynamic Allocation cho Spark Executor trên Kubernetes
Bật tính năng Dynamic Allocation giúp Spark tự động thêm hoặc giảm Executor dựa trên tải công việc (backlog) thay vì chạy cố định số lượng pod, giúp tiết kiệm tài nguyên cụm Kubernetes khi không có job đang chạy.
Tạo file cấu hình spark-defaults.conf tại đường dẫn /etc/spark/conf/spark-defaults.conf trong container Driver. Nội dung file bao gồm các tham số bật tính năng và giới hạn số lượng Executor tối đa/tối thiểu.
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.initialExecutors=5
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=10s
spark.dynamicAllocation.shuffleTracking.enabled=true
Kết quả mong đợi: Khi submit job, Spark sẽ chỉ khởi tạo 5 Executor ban đầu. Nếu job cần thêm tài nguyên, Kubernetes sẽ tự động tạo thêm pod Executor lên đến 20. Khi job xong, các pod thừa sẽ bị loại bỏ sau 10s.
Verify kết quả bằng cách quan sát số lượng Pod trong namespace của Spark trên Kubernetes Dashboard hoặc dùng lệnh kubectl để theo dõi sự thay đổi số lượng pod trong quá trình chạy job.
kubectl get pods -n spark-namespace -w
Sử dụng Spark Pod Presets và Resource Quotas
Sử dụng Resource Quotas để giới hạn tổng tài nguyên CPU và RAM mà namespace Spark được phép sử dụng, tránh tình trạng Spark chiếm hết tài nguyên của cụm Kubernetes.
Tạo file YAML định nghĩa ResourceQuota tại đường dẫn /k8s-config/spark-quota.yaml. File này giới hạn tổng CPU và Memory cho namespace, cũng như giới hạn số lượng Pod.
apiVersion: v1
kind: ResourceQuota
metadata:
name: spark-resource-quota
namespace: spark-namespace
spec:
hard:
requests.cpu: "50"
requests.memory: 200Gi
limits.cpu: "100"
limits.memory: 400Gi
pods: "50"
Áp dụng cấu hình bằng lệnh kubectl. Kết quả mong đợi là Kubernetes sẽ từ chối (Reject) việc tạo Pod mới nếu tổng tài nguyên đã vượt quá giới hạn đã đặt trong file.
kubectl apply -f /k8s-config/spark-quota.yaml
Định nghĩa Pod Preset để tự động gắn các label, annotation hoặc environment variables vào mọi Pod Executor được tạo bởi Spark, giúp đồng nhất cấu hình mà không cần sửa code Spark.
Tạo file YAML định nghĩa PodPreset tại đường dẫn /k8s-config/spark-pod-preset.yaml. Preset này sẽ thêm secret chứa credentials và label monitoring vào mọi pod có label spark-role: executor.
apiVersion: settings.k8s.io/v1alpha1
kind: PodPreset
metadata:
name: spark-executor-preset
spec:
selector:
matchLabels:
spark-role: executor
env:
- name: SPARK_LOCAL_DIRS
value: "/mnt/local-disk"
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: secret-key
volumes:
- name: local-storage
emptyDir:
medium: Memory
volumeMounts:
- name: local-storage
mountPath: "/mnt/local-disk"
Áp dụng Preset. Kết quả mong đợi: Khi Spark tạo Executor, Kubernetes sẽ tự động mount volume local và inject biến môi trường AWS vào pod đó trước khi pod khởi động.
kubectl apply -f /k8s-config/spark-pod-preset.yaml
Tối ưu Memory Management để tránh OOM trên Kubernetes
Cấu hình chính xác tỷ lệ memory dành cho Executor để tránh lỗi OOM (Out Of Memory) do container bị Kubernetes kill khi vượt quá limit, đặc biệt là khi Spark sử dụng nhiều bộ nhớ cho Shuffle và Off-heap.
Chỉnh sửa file spark-defaults.conf tại /etc/spark/conf/spark-defaults.conf để tăng cường quản lý bộ nhớ. Cần thiết lập spark.memory.fraction và spark.memory.storageFraction phù hợp với công việc, đồng thời bật spark.memory.offHeap.enabled nếu cần dùng bộ nhớ ngoài JVM.
spark.executor.memory=8g
spark.executor.memoryOverhead=2g
spark.memory.fraction=0.6
spark.memory.storageFraction=0.3
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
spark.kubernetes.container.image.pullPolicy=IfNotPresent
Giải thích tham số: spark.executor.memory là bộ nhớ dành cho Heap (JVM), spark.executor.memoryOverhead là bộ nhớ dành cho Native, Thread stack và Off-heap. Tổng bộ nhớ yêu cầu của container là tổng của 2 giá trị này.
Kết quả mong đợi: Spark sẽ không bị Crash do OOM Killer của Kubernetes. Bộ nhớ Off-heap giúp giảm áp lực lên Garbage Collector, cải thiện hiệu năng cho các job xử lý dữ liệu lớn.
Verify kết quả bằng cách xem logs của Executor và metrics của container. Nếu không có dòng "Killed" trong logs và container không bị restart liên tục do OOM, cấu hình đã thành công.
kubectl logs -l spark-role=executor -n spark-namespace --tail=100 | grep -i "killed\|oom"
Cấu hình Shuffle Partition và Broadcast Join cho các job lớn
Tối ưu hóa số lượng partition trong quá trình Shuffle để giảm lượng dữ liệu truyền qua mạng (network shuffle) giữa các Executor, tránh tình trạng nghẽn cổ chai (bottleneck).
Thêm cấu hình vào file spark-defaults.conf tại /etc/spark/conf/spark-defaults.conf. Giá trị spark.sql.shuffle.partitions mặc định là 200, thường quá nhỏ cho data lakehouse. Cần tăng lên tương ứng với số lượng Executor và số core.
spark.sql.shuffle.partitions=2000
spark.network.timeout=800s
spark.executor.heartbeatInterval=60s
Kết quả mong đợi: Khi thực hiện các phép toán nhóm (groupby) hoặc join lớn, dữ liệu được phân chia đều hơn, thời gian chạy giảm và tránh lỗi timeout do mất kết nối mạng giữa các node.
Kích hoạt Broadcast Join để gửi toàn bộ bảng nhỏ (small table) về tất cả các Executor, thay vì thực hiện Shuffle Join tốn kém cho cả hai bảng. Điều này cực kỳ hiệu quả khi join bảng lớn với bảng tham chiếu nhỏ.
Thêm cấu hình vào file spark-defaults.conf tại /etc/spark/conf/spark-defaults.conf. Thiết lập ngưỡng kích thước bảng để Spark tự động quyết định broadcast.
spark.sql.autoBroadcastJoinThreshold=104857600
spark.sql.broadcastTimeout=300s
Giải thích: 104857600 bytes tương đương 100MB. Nếu bảng join nhỏ hơn 100MB, Spark sẽ tự động broadcast. Bạn có thể tăng con số này nếu muốn broadcast bảng lớn hơn.
Kết quả mong đợi: Trong Spark UI (tab SQL), các query join sẽ hiển thị "BroadcastExchange" thay vì "ShuffleExchange" đối với bảng nhỏ, giảm đáng kể thời gian thực thi.
Verify kết quả bằng cách mở Spark UI, vào tab "SQL" -> "Details" của một query join. Quan sát dòng execution plan, tìm kiếm từ khóa "BroadcastExchange" để xác nhận cơ chế này đã được kích hoạt.
curl -s http://spark-master-service:8080/ui/jobs | grep -o "BroadcastExchange"
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 6: Tích hợp Catalog trung tâm: Apache Hudi vs Glue Catalog vs Nessie
Phần 8: Xử lý dữ liệu stream: Real-time Ingestion vào Lakehouse »