Thiết kế Spark Job để đọc dữ liệu thô từ Object Storage
Để bắt đầu pipeline ETL, chúng ta cần cấu hình Spark để truy cập trực tiếp vào Object Storage (S3, MinIO, GCS) nơi chứa dữ liệu thô (raw data) dưới dạng Parquet hoặc ORC.
Việc này yêu cầu Spark phải có quyền truy cập (credentials) và biết chính xác đường dẫn (URI) của bucket chứa dữ liệu nguồn.
Trên Kubernetes, cách tốt nhất là lưu trữ credentials dưới dạng Kubernetes Secret và inject vào Spark Driver/Executor qua biến môi trường, tránh hardcode vào code.
1. Chuẩn bị Secret cho Object Storage
Tạo Kubernetes Secret chứa access key và secret key của Object Storage (ví dụ MinIO hoặc AWS S3).
kubectl create secret generic s3-credentials \
--from-literal=AWS_ACCESS_KEY_ID="minioadmin" \
--from-literal=AWS_SECRET_ACCESS_KEY="minioadmin" \
--namespace=spark-ns
Kết quả: Secret được tạo trong namespace `spark-ns`, sẵn sàng để mount vào Pod.
2. Cấu hình Spark Submission với Credentials
Chúng ta sẽ sử dụng `spark-submit` để gửi job vào Kubernetes. Cần map các biến môi trường từ Secret vào trong Spark Application.
spark-submit \
--master=k8s://https://kubernetes.default.svc \
--deploy-mode=cluster \
--name=etl-raw-to-iceberg \
--class org.apache.spark.sql.streaming.StreamingQuery \
--conf spark.driver.memory=4g \
--conf spark.executor.memory=4g \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY \
--conf spark.sql.catalogImplementation=in-memory \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
/opt/spark/jars/iceberg-spark-3.5_2.12-1.4.0.jar \
--source-uri s3a://my-lakehouse-bucket/raw-data/sales-2023-10 \
--target-table spark_catalog.default.sales_iceberg
Kết quả: Pod Spark được tạo, Driver và Executor đều có thể đọc dữ liệu từ `s3a://...` nhờ secret được inject tự động.
Chuyển đổi dữ liệu từ Parquet/ORC sang Iceberg/Delta
Yêu cầu cốt lõi của ETL là chuyển đổi định dạng lưu trữ từ file-based (Parquet/ORC) sang table-based (Iceberg/Delta) để hỗ trợ ACID transactions và Time Travel.
Chúng ta sẽ viết một Spark Job đơn giản thực hiện đọc nguồn (source) và ghi đích (sink) với cơ chế `append` hoặc `overwrite`.
1. Mã nguồn Spark Job (Python/PySpark)
File Python sẽ được lưu trong container hoặc được upload lên Object Storage để Spark load. Dưới đây là nội dung file `etl_pipeline.py`.
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
def create_spark_session():
spark = (SparkSession.builder
.appName("ETL_Raw_to_Iceberg")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "s3a://my-lakehouse-bucket/warehouse")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio.svc.cluster.local:9000")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.getOrCreate())
return spark
def main():
if len(sys.argv) != 3:
print("Usage: etl_pipeline.py <source_uri> <target_table_name>")
sys.exit(1)
source_uri = sys.argv[1]
target_table = sys.argv[2]
spark = create_spark_session()
# Đọc dữ liệu thô (Parquet/ORC)
# Spark tự động detect schema từ file Parquet/ORC
df_source = spark.read.format("parquet").load(source_uri)
# Xử lý Schema Evolution (thêm cột metadata)
df_processed = df_source.withColumn("etl_timestamp", current_timestamp())
# Ghi vào Iceberg Table (Sử dụng Append mode để không xóa dữ liệu cũ nếu có)
# Nếu muốn Overwrite toàn bộ, dùng .mode("overwrite").saveAsTable()
df_processed.write \
.mode("append") \
.format("iceberg") \
.option("write.merge.schema", "true") \
.saveAsTable(target_table)
spark.stop()
print(f"ETL completed. Data written to table: {target_table}")
if __name__ == "__main__":
main()
Kết quả: File Python xử lý việc đọc Parquet, thêm cột thời gian ETL và ghi vào bảng Iceberg với tính năng tự động mở rộng schema.
2. Triển khai Job vào Kubernetes
Upload file script lên Object Storage và chạy job với tham số cụ thể.
# Upload script lên bucket (giả sử dùng aws cli hoặc mc)
aws s3 cp etl_pipeline.py s3a://my-lakehouse-bucket/scripts/etl_pipeline.py
# Chạy job
spark-submit \
--master=k8s://https://kubernetes.default.svc \
--deploy-mode=cluster \
--name=etl-conversion-job \
--class org.apache.spark.sql.SparkSession \
--conf spark.driver.memory=4g \
--conf spark.executor.memory=4g \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions \
--conf spark.sql.catalogImplementation=in-memory \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
s3a://my-lakehouse-bucket/scripts/etl_pipeline.py \
s3a://my-lakehouse-bucket/raw-data/sales-2023-10 \
spark_catalog.default.sales_iceberg
Kết quả: Pod chạy, đọc dữ liệu từ raw folder, chuyển đổi sang Iceberg và ghi vào warehouse.
Verify kết quả chuyển đổi
Kiểm tra xem bảng Iceberg đã được tạo và chứa dữ liệu chưa bằng Spark SQL.
spark-submit \
--master=k8s://https://kubernetes.default.svc \
--deploy-mode=client \
--name=verify-iceberg-table \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse \
--conf spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY \
--conf spark.sql.catalogImplementation=in-memory \
--conf spark.kubernetes.container.image=spark:3.5.0-scala2.12 \
--conf spark.driver.memory=1g \
--conf spark.executor.instances=1 \
--conf spark.executor.memory=1g \
--class org.apache.spark.sql.SparkSession \
-- \
"SELECT * FROM spark_catalog.default.sales_iceberg LIMIT 5"
Kết quả mong đợi: In ra 5 dòng đầu tiên của bảng, xác nhận dữ liệu đã được chuyển đổi thành công và có thêm cột `etl_timestamp`.
Triển khai pipeline ETL dưới dạng Kubernetes Job hoặc CronJob
Để tự động hóa quá trình ETL, chúng ta không nên chạy thủ công `spark-submit`. Hãy đóng gói logic vào Kubernetes Job (chạy 1 lần) hoặc CronJob (chạy định kỳ).
Sử dụng Kubernetes Job là cách chuẩn để chạy batch processing trong môi trường Cloud Native.
1. Tạo Kubernetes Job Manifest (YAML)
Tạo file `spark-etl-job.yaml` định nghĩa Pod Spark chạy dưới dạng Job.
apiVersion: batch/v1
kind: Job
metadata:
name: spark-etl-conversion
namespace: spark-ns
labels:
app: spark-etl
spec:
ttlSecondsAfterFinished: 300
template:
spec:
containers:
- name: spark-driver
image: spark:3.5.0-scala2.12
command:
- spark-submit
args:
- --master
- k8s://https://kubernetes.default.svc
- --deploy-mode
- cluster
- --name
- etl-conversion-job
- --class
- org.apache.spark.sql.SparkSession
- --conf
- spark.driver.memory=4g
- --conf
- spark.executor.memory=4g
- --conf
- spark.executor.instances=2
- --conf
- spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY
- --conf
- spark.sql.extensions=org.apache.iceberg.spark.extensions
- --conf
- spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog
- --conf
- spark.sql.catalog.spark_catalog.type=hadoop
- --conf
- spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse
- --conf
- spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
- --conf
- spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000
- --conf
- spark.hadoop.fs.s3a.path.style.access=true
- s3a://my-lakehouse-bucket/scripts/etl_pipeline.py
- s3a://my-lakehouse-bucket/raw-data/sales-2023-10
- spark_catalog.default.sales_iceberg
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: s3-credentials
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: s3-credentials
key: AWS_SECRET_ACCESS_KEY
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
restartPolicy: Never
backoffLimit: 4
Kết quả: Manifest file được tạo sẵn sàng để áp dụng vào cluster.
2. Áp dụng Job vào Cluster
Sử dụng `kubectl` để tạo Job từ file YAML.
kubectl apply -f spark-etl-job.yaml -n spark-ns
Kết quả: Kubernetes tạo Pod, chạy job, sau khi xong (thành công hoặc lỗi) Pod sẽ tự xóa (nếu cấu hình ttlSecondsAfterFinished).
3. Chuyển sang CronJob cho ETL định kỳ
Nếu cần chạy ETL mỗi giờ, chuyển đổi Job thành CronJob.
apiVersion: batch/v1
kind: CronJob
metadata:
name: spark-etl-cron
namespace: spark-ns
spec:
schedule: "0 * * * *" # Chạy vào phút 0 của mỗi giờ
jobTemplate:
spec:
template:
spec:
containers:
- name: spark-driver
image: spark:3.5.0-scala2.12
command:
- spark-submit
args:
- --master
- k8s://https://kubernetes.default.svc
- --deploy-mode
- cluster
- --name
- etl-conversion-cron
- --class
- org.apache.spark.sql.SparkSession
- --conf
- spark.driver.memory=4g
- --conf
- spark.executor.memory=4g
- --conf
- spark.executor.instances=2
- --conf
- spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY
- --conf
- spark.sql.extensions=org.apache.iceberg.spark.extensions
- --conf
- spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog
- --conf
- spark.sql.catalog.spark_catalog.type=hadoop
- --conf
- spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse
- --conf
- spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
- --conf
- spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000
- --conf
- spark.hadoop.fs.s3a.path.style.access=true
- s3a://my-lakehouse-bucket/scripts/etl_pipeline.py
- s3a://my-lakehouse-bucket/raw-data/sales-2023-10
- spark_catalog.default.sales_iceberg
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: s3-credentials
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: s3-credentials
key: AWS_SECRET_ACCESS_KEY
resources:
requests:
memory: "4Gi"
cpu: "2"
limits:
memory: "8Gi"
cpu: "4"
restartPolicy: Never
concurrencyPolicy: Forbid # Chặn việc chạy song song nếu job trước chưa xong
startingDeadlineSeconds: 10
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 3
Kết quả: CronJob được tạo, tự động kích hoạt Spark Job theo lịch trình đã định.
Verify kết quả CronJob
Kiểm tra lịch sử chạy của CronJob để đảm bảo pipeline hoạt động định kỳ.
kubectl get cronjobs -n spark-ns
kubectl describe cronjob spark-etl-cron -n spark-ns
kubectl get jobs -n spark-ns | grep etl-conversion
Kết quả mong đợi: Thấy CronJob đang chạy, danh sách các Job con được tạo theo lịch, trạng thái `SUCCEEDED`.
Xử lý Schema Evolution trong quá trình ETL
Trong môi trường Data Lakehouse, dữ liệu nguồn (raw data) thường thay đổi cấu trúc (thêm cột, đổi kiểu) theo thời gian. Spark cần xử lý các thay đổi này mà không làm sập pipeline.
Iceberg và Delta Lake hỗ trợ Schema Evolution tự động, nhưng cần cấu hình đúng để Spark chấp nhận các thay đổi.
1. Cấu hình Spark cho Schema Evolution
Cần bật tính năng `mergeSchema` trong Spark SQL để khi ghi vào Iceberg/Delta, hệ thống tự động thêm cột mới từ nguồn vào đích.
--conf spark.sql.sources.mergeSchema.enabled=true
--conf spark.sql.catalog.spark_catalog.write.merge.schema=true
Kết quả: Spark sẽ không throw lỗi khi phát hiện cột mới trong source mà chưa có trong target table.
2. Xử lý trường hợp thay đổi kiểu dữ liệu (Type Change)
Iceberg cho phép thay đổi kiểu dữ liệu an toàn (ví dụ: từ Integer sang Long) nhưng không cho phép thay đổi không tương thích (ví dụ: String sang Integer). Để xử lý, cần logic cast thủ công trong code PySpark nếu kiểu thay đổi quá lớn.
Ví dụ xử lý thêm cột mới `customer_segment` (String) vào bảng cũ.
# Trong file etl_pipeline.py, thêm logic kiểm tra schema
from pyspark.sql.types import ArrayType, StringType
def handle_schema_evolution(df_source, target_table_name):
spark = SparkSession.builder.getOrCreate()
# Kiểm tra xem bảng đích đã tồn tại chưa
if spark.catalog.tableExists(target_table_name):
target_table = spark.table(target_table_name)
target_schema = target_table.schema
# So sánh schema nguồn và đích
source_fields = {f.name: f for f in df_source.schema.fields}
target_fields = {f.name: f for f in target_schema.fields}
# Xử lý cột mới
new_columns = [name for name in source_fields if name not in target_fields]
if new_columns:
print(f"Detected new columns: {new_columns}")
# Iceberg tự động merge nếu bật write.merge.schema=true
# Nhưng nếu cần xử lý logic đặc biệt cho cột mới, làm ở đây
for col_name in new_columns:
# Ví dụ: cast hoặc transform dữ liệu trước khi ghi
# df_source = df_source.withColumn(col_name, col(col_name).cast(StringType()))
pass
else:
print("Table does not exist, creating new table with full schema.")
return df_source
# Gọi hàm trong main
df_processed = handle_schema_evolution(df_source, target_table)
df_processed.write.mode("append").format("iceberg").option("write.merge.schema", "true").saveAsTable(target_table)
Kết quả: Pipeline không bị lỗi khi nguồn dữ liệu có thêm cột mới, Iceberg tự động cập nhật metadata của bảng.
3. Xử lý Schema Conflict (Cột bị xóa)
Nếu cột trong nguồn bị xóa, Iceberg vẫn giữ lại cột đó trong bảng đích (vì lịch sử dữ liệu cũ cần cột đó). Dữ liệu mới ghi vào sẽ có giá trị `null` cho cột đã bị xóa ở nguồn.
Để tránh lỗi khi đọc, Spark cần cấu hình `spark.sql.sources.schema.checking`.
--conf spark.sql.sources.schema.checking=false
Kết quả: Spark bỏ qua việc kiểm tra nghiêm ngặt schema, cho phép ghi dữ liệu vào bảng đích ngay cả khi source thiếu một số cột cũ (các cột đó sẽ là null).
Verify Schema Evolution
Thêm một cột mới vào file Parquet nguồn và chạy lại ETL, sau đó kiểm tra schema bảng đích.
spark-submit \
--master=k8s://https://kubernetes.default.svc \
--deploy-mode=client \
--name=verify-schema-evolution \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=s3a://my-lakehouse-bucket/warehouse \
--conf spark.hadoop.fs.s3a.endpoint=http://minio.minio.svc.cluster.local:9000 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.kubernetes.secret.s3-credentials=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY \
--conf spark.sql.catalogImplementation=in-memory \
--conf spark.driver.memory=1g \
--conf spark.executor.instances=1 \
--conf spark.executor.memory=1g \
--class org.apache.spark.sql.SparkSession \
-- \
"DESCRIBE EXTENDED spark_catalog.default.sales_iceberg"
Kết quả mong đợi: Output hiển thị danh sách cột của bảng, trong đó có cả cột mới vừa được thêm từ lần chạy ETL gần nhất, và cột `etl_timestamp` vẫn còn.
Điều hướng series:
Mục lục: Series: Series: Xây dựng nền tảng Data Lakehouse hiện đại với Apache Iceberg, Delta Lake và Spark trên Kubernetes
« Phần 4: Triển khai Delta Lake: Tạo bảng và tối ưu hóa lưu trữ
Phần 6: Tích hợp Catalog trung tâm: Apache Hudi vs Glue Catalog vs Nessie »