1. Thiết lập môi trường Kafka và cấu hình Spark Streaming
Bước đầu tiên là đảm bảo nguồn dữ liệu thực thời (real-time) đã sẵn sàng trên Kubernetes để Spark có thể tiêu thụ. Chúng ta sẽ triển khai Kafka qua Helm chart và cấu hình Spark để kết nối.
Mục tiêu: Tạo topic Kafka và chuẩn bị thư viện connector để Spark đọc dữ liệu.
Triển khai Kafka với 3 replica và 1 partition để đảm bảo tính sẵn sàng cao.
helm install kafka bitnami/kafka --set replicaCount=3 --set persistence.enabled=true --set persistence.size=10Gi --set zookeeper.replicaCount=1
Kết quả mong đợi: Pod Kafka và Zookeeper ở trạng thái Running, expose port 9092.
Tạo topic mẫu 'user-events' với retention 24h và compression gzip để giảm băng thông.
kubectl exec -it kafka-0 -- /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic user-events --bootstrap-server kafka-0:9092 --partitions 4 --replication-factor 3 --config retention.ms=86400000 --config compression.type=gzip
Kết quả mong đợi: Thông báo "Topic 'user-events' created".
Cấu hình file `spark-defaults.conf` trong Spark Operator hoặc Spark pod để chỉ định các thư viện Kafka connector cần thiết cho Structured Streaming.
Đường dẫn: `/opt/spark/conf/spark-defaults.conf`
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
spark.jars.repositories https://repo1.maven.org/maven2/
spark.sql.streaming.checkpointLocation /tmp/checkpoints/
spark.sql.streaming.fileCheckpointLocation /tmp/checkpoints/files/
Kết quả mong đợi: Spark session sẽ tự động tải các JAR cần thiết khi khởi động, không cần thêm vào classpath thủ công.
2. Cấu hình Spark Structured Streaming để đọc từ Kafka
Chúng ta sẽ viết code Spark (Scala hoặc PySpark) để thiết lập nguồn đọc (source) từ Kafka. Cấu hình này cần xử lý việc ánh xạ schema từ JSON string sang DataFrame.
Mục tiêu: Tạo DataFrame Streaming từ Kafka, parse JSON và chuẩn bị dữ liệu cho bước viết.
Đoạn code PySpark để đọc từ Kafka. Lưu ý cấu hình `kafka.bootstrap.servers` và `subscribe` chính xác với topic đã tạo.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
spark = SparkSession.builder \
.appName("RealTimeIngestion") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints/user-events") \
.getOrCreate()
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-0:9092,kafka-1:9092,kafka-2:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.option("maxOffsetsPerTrigger", 1000) \
.load()
# Parse JSON trong trường value thành struct
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema_json).alias("data")) \
.select("data.*") \
.withColumn("event_time", to_timestamp(col("event_time"))) \
.withColumn("processing_time", col("timestamp"))
parsed_df.printSchema()
Kết quả mong đợi: Schema hiển thị các trường dữ liệu thực (user_id, event_type, event_time) thay vì chỉ có value là string.
3. Viết dữ liệu Stream vào Apache Iceberg và Delta Lake
Đây là bước cốt lõi: chuyển đổi dữ liệu streaming thành bảng Lakehouse có ACID. Chúng ta sẽ sử dụng cơ chế Write-Ahead Log (WAL) hoặc checkpoint để đảm bảo tính nhất quán.
Mục tiêu: Tạo query write vào bảng Iceberg và Delta, kích hoạt auto-commit.
Viết dữ liệu vào bảng Apache Iceberg. Iceberg yêu cầu một catalog (ví dụ: Hadoop Catalog hoặc REST Catalog) và đường dẫn storage (S3/HDFS).
iceberg_query = parsed_df.writeStream \
.format("iceberg") \
.option("checkpointLocation", "/tmp/checkpoints/iceberg/user-events") \
.option("table", "demo_catalog.db1.user_events_iceberg") \
.option("path", "s3a://my-bucket/iceberg/user-events") \
.option("write.mode", "append") \
.trigger(processingTime="5 seconds") \
.start()
iceberg_query.awaitTermination()
Kết quả mong đợi: Query chạy liên tục, xuất hiện log "Batch 0 completed" mỗi 5 giây khi có dữ liệu mới.
Viết dữ liệu vào bảng Delta Lake. Delta Lake sử dụng DeltaLog để quản lý metadata và hỗ trợ native integration tốt hơn trên S3.
delta_query = parsed_df.writeStream \
.format("delta") \
.option("checkpointLocation", "/tmp/checkpoints/delta/user-events") \
.option("path", "s3a://my-bucket/delta/user-events") \
.option("mergeSchema", "true") \
.trigger(processingTime="5 seconds") \
.start()
delta_query.awaitTermination()
Kết quả mong đợi: Log hiển thị "Trigger: AvailableNow" hoặc thời gian trigger, các file Parquet được ghi vào S3 và file commit log (_delta_log) được cập nhật.
Verify kết quả: Chạy query Spark SQL để đọc lại dữ liệu đã ghi.
spark.sql("SELECT COUNT(*) FROM demo_catalog.db1.user_events_iceberg").show()
spark.sql("SELECT COUNT(*) FROM delta.`s3a://my-bucket/delta/user-events`").show()
Kết quả mong đợi: Số lượng dòng tăng lên sau mỗi trigger của stream.
4. Quản lý Watermark và xử lý Late Data
Trong môi trường stream, dữ liệu có thể đến trễ (late data) do mạng hoặc hệ thống nguồn. Nếu không xử lý, các phép join hoặc aggregation sẽ bị sai lệch hoặc không bao giờ hoàn thành.
Mục tiêu: Áp dụng watermark để loại bỏ dữ liệu quá cũ và thực hiện window aggregation an toàn.
Áp dụng watermark vào cột `event_time` với độ trễ cho phép là 10 phút. Dữ liệu nào đến muộn hơn 10 phút so với thời gian sự kiện sẽ bị loại bỏ khỏi state.
from pyspark.sql.functions import window, count
watermarked_df = parsed_df.withWatermark("event_time", "10 minutes")
windowed_counts = watermarked_df \
.groupBy(window(col("event_time"), "5 minutes"), "user_id") \
.count()
watermark_query = windowed_counts.writeStream \
.format("console") \
.option("checkpointLocation", "/tmp/checkpoints/watermark") \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.start()
Kết quả mong đợi: Khi dữ liệu late (vượt quá 10 phút) đến, nó sẽ không xuất hiện trong kết quả aggregation, tránh việc tính toán lại toàn bộ window cũ.
Xử lý Late Data bằng cách ghi vào bảng "Late Data" thay vì loại bỏ hoàn toàn, giúp audit sau này.
from pyspark.sql.functions import current_timestamp
late_data_df = parsed_df.filter(col("processing_time") > (col("event_time") + interval(10, "minutes")))
late_query = late_data_df.writeStream \
.format("delta") \
.option("path", "s3a://my-bucket/delta/late-data") \
.option("checkpointLocation", "/tmp/checkpoints/late-data") \
.trigger(processingTime="10 seconds") \
.start()
Kết quả mong đợi: Dữ liệu bị trễ hơn 10 phút được ghi riêng vào bảng `late-data` thay vì làm hỏng bảng chính.
5. Triển khai Streaming Job với Auto-scaling trên Kubernetes
Để xử lý tải cao, chúng ta cần Spark Executor tự động mở rộng (dynamic allocation) và Kubernetes tự động scale số pod (HPA) dựa trên độ trễ (lag) của Kafka.
Mục tiêu: Cấu hình Dynamic Allocation trong Spark và HPA trong Kubernetes dựa trên metric Kafka lag.
Cấu hình Dynamic Allocation trong `spark-defaults.conf` hoặc code để Spark tự thêm/bớt executor.
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.initialExecutors 3
spark.dynamicAllocation.cachedExecutors true
spark.dynamicAllocation.executorIdleTimeout 60s
Kết quả mong đợi: Spark UI hiển thị số lượng executor thay đổi theo thời gian thực khi lượng data vào tăng/giảm.
Triển khai Spark Job trên Kubernetes sử dụng Spark Operator với tài nguyên giới hạn (limit) và yêu cầu (request) hợp lý để HPA hoạt động.
File: `spark-streaming-job.yaml`
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: user-events-streaming
namespace: default
spec:
type: Scala
mode: cluster
sparkVersion: 3.5.0
restartPolicy:
type: Never
driver:
cores: 2
memory: 2Gi
labels:
version: 1.0
executor:
cores: 2
instances: 2
memory: 2Gi
dynamicAllocation:
enabled: true
minExecutors: 2
maxExecutors: 20
initialExecutors: 3
schedulerBacklog: 1000
volumes:
- name: checkpoint-storage
persistentVolumeClaim:
claimName: iceberg-storage-pvc
volumeCloudStorage:
- name: s3-storage
s3:
bucket: my-bucket
path: /
dependencies:
- mavenCoordinate: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
Kết quả mong đợi: Pod Spark Driver và Executor chạy trên K8s, Executor tự động scale từ 2 lên 20 khi cần.
Cấu hình Horizontal Pod Autoscaler (HPA) cho Executor Pod dựa trên metric Kafka Consumer Lag. Cần cài đặt Prometheus Adapter và Kafka Exporter trước.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: spark-executor-hpa
namespace: default
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: StatefulSet
name: spark-executor # Tên pod template của executor
minReplicas: 2
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: kafka_consumer_lag
target:
type: AverageValue
averageValue: "10000" # Scale up nếu lag trung bình > 10,000 message
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 60
- type: Pods
value: 5
periodSeconds: 60
selectPolicy: Max
Kết quả mong đợi: Khi Kafka lag tăng, K8s tự động tạo thêm Executor pod để giảm lag xuống dưới ngưỡng 10,000.
Verify kết quả: Quan sát log của Spark Driver và số lượng pod Executor trong Kubernetes.
kubectl get pods -l app=spark-executor -w
kubectl describe hpa spark-executor-hpa
Kết quả mong đợi: Số lượng pod Executor tăng lên khi có traffic lớn và giảm xuống khi traffic hết, đồng thời log Spark hiển thị "Dynamic allocation added X executors".
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 7: Quản lý tài nguyên và tối ưu hiệu năng trên Kubernetes
Phần 9: An toàn và kiểm soát truy cập: RBAC, Encryption và Audit »