Cấu hình Kafka Source Connector trong Flink Job
Bước đầu tiên là định nghĩa Source trong mã nguồn Flink để kết nối với topic Kafka đã tạo ở phần trước. Chúng ta sẽ sử dụng KafkaConnector API của Flink để đọc dữ liệu từ topic raw-events.
Việc cấu hình đúng bootstrap server và group ID là bắt buộc để Flink biết nơi lấy dữ liệu và quản lý offset (vị trí đã đọc) của consumer group.
Đầu tiên, hãy tạo file Java source code tại đường dẫn /app/src/main/java/com/example/IngestJob.java với nội dung hoàn chỉnh sau:
package com.example;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class IngestJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Đặt độ song song cho job ingest
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-ingest-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Bắt đầu đọc từ đầu topic
FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer(
"raw-events",
props
);
DataStream rawStream = env.addSource(kafkaSource);
// Chỉ in ra console để verify trong bước này
rawStream.print();
env.execute("Kafka Ingest Pipeline");
}
}
Kết quả mong đợi: File IngestJob.java được lưu thành công. Khi biên dịch, không có lỗi import package nào.
Để verify cấu hình, hãy biên dịch và đóng gói job thành file JAR bằng Maven hoặc Gradle. Đảm bảo thư mục /app/target xuất hiện file flink-ingest-job-1.0.jar.
Xử lý Schema Avro/JSON và tự động phát hiện Schema
Dữ liệu thực tế từ Kafka thường ở định dạng JSON hoặc Avro. Nếu dùng StringDeserializer như trên, Flink chỉ nhận được chuỗi thô. Để xử lý logic business, ta cần parse thành Object (POJO).
Chúng ta sẽ chuyển đổi sang sử dụng JsonDeserializationSchema để tự động ánh xạ JSON string thành đối tượng Java có cấu trúc. Điều này giúp Flink hiểu các trường dữ liệu (fields) mà không cần viết code parse thủ công.
Định nghĩa một POJO (Plain Old Java Object) tại /app/src/main/java/com/example/Event.java:
package com.example;
import org.apache.flink.serialization.SerializationSchema;
import org.apache.flink.util.Cleanupable;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Event {
private final String deviceId;
private final long timestamp;
private final String eventType;
private final double value;
@JsonCreator
public Event(@JsonProperty("device_id") String deviceId,
@JsonProperty("timestamp") long timestamp,
@JsonProperty("event_type") String eventType,
@JsonProperty("value") double value) {
this.deviceId = deviceId;
this.timestamp = timestamp;
this.eventType = eventType;
this.value = value;
}
// Getters
public String getDeviceId() { return deviceId; }
public long getTimestamp() { return timestamp; }
public String getEventType() { return eventType; }
public double getValue() { return value; }
}
Kết quả mong đợi: Class Event được tạo với các trường tương ứng với schema JSON của dữ liệu đầu vào.
Cập nhật file IngestJob.java để sử dụng schema này thay vì StringDeserializer:
package com.example;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class IngestJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-ingest-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Sử dụng StringDeserializer cho key, nhưng Value sẽ được xử lý bởi JsonDeserializationSchema bên dưới
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Định nghĩa Schema tự động từ class Event
JsonDeserializationSchema jsonSchema = new JsonDeserializationSchema(Event.class);
FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer(
"raw-events",
jsonSchema,
props
);
DataStream eventStream = env.addSource(kafkaSource);
// In ra đối tượng Event đã được parse
eventStream.print();
env.execute("Kafka Ingest Pipeline with JSON Schema");
}
}
Kết quả mong đợi: Khi job chạy, log console sẽ hiển thị nội dung object JSON đã được parse thành cấu trúc Event thay vì chuỗi raw string. Nếu có lỗi định dạng JSON, Flink sẽ throw exception tại điểm này.
Áp dụng Watermark và xử lý Event Time trong Stream
Khi làm Real-time Analytics, thứ tự sự kiện trong Kafka (Ingestion Time) có thể khác với thời điểm sự kiện thực sự xảy ra (Event Time). Flink cần biết Event Time để thực hiện Windowing chính xác.
Chúng ta cần trích xuất trường timestamp từ object Event và gán nó làm Event Time, đồng thời tạo Watermark để báo hiệu thời gian trôi qua.
Cập nhật logic trong IngestJob.java để áp dụng assignTimestampsAndWatermarks:
package com.example;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class IngestJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-ingest-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
JsonDeserializationSchema jsonSchema = new JsonDeserializationSchema(Event.class);
FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer(
"raw-events",
jsonSchema,
props
);
DataStream eventStream = env.addSource(kafkaSource);
// Áp dụng Event Time và Watermark
// Cho phép dữ liệu trễ tối đa 5000ms (5 giây) so với event time
DataStream streamWithWatermark = eventStream
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor(5000L) {
@Override
public long extractTimestamp(Event element) {
return element.getTimestamp();
}
}
);
streamWithWatermark.print();
env.execute("Kafka Ingest Pipeline with Watermarks");
}
}
Kết quả mong đợi: Khi chạy job, log sẽ hiển thị thêm thông tin về Watermark (ví dụ: WATERMARK: 1678888888000). Điều này xác nhận Flink đang theo dõi thời gian sự kiện thay vì thời gian hệ thống.
Để verify, hãy kiểm tra log console. Nếu bạn gửi dữ liệu có timestamp cũ hơn hiện tại, job vẫn sẽ xử lý chúng nếu nằm trong ngưỡng 5 giây (bounded out-of-orderness).
Chạy Job và giám sát thông số Throughput và Lag
Sau khi đã cấu hình xong code, bước cuối là triển khai job lên Kubernetes Cluster và quan sát hiệu năng. Chúng ta sẽ sử dụng kubectl để submit job và truy cập Flink Dashboard để xem metrics.
Đầu tiên, submit job vào Kubernetes. Giả sử file JAR đã được build và nằm trong container hoặc volume mounted tại /app/target/flink-ingest-job.jar.
kubectl flink run \
--class com.example.IngestJob \
--jar /app/target/flink-ingest-job.jar \
--name kafka-ingest-pipeline \
--parallelism 4 \
--fromSavepoint latest \
2>&1 | tee flink-submit.log
Kết quả mong đợi: Command trả về ID của Job (ví dụ: job-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) và trạng thái RUNNING. File log flink-submit.log được tạo ra.
Để giám sát, cần truy cập Flink Dashboard (thường expose qua port 8081). Nếu bạn đang dùng Flink Operator, hãy port-forward service của JobManager:
kubectl port-forward svc/flink-kafka-ingest-pipeline-jobmanager 8081:8081
Kết quả mong đợi: Trình duyệt mở địa chỉ http://localhost:8081 và hiển thị giao diện Dashboard của Flink.
Tại Dashboard, vào tab Metrics của TaskManager hoặc Source Node để kiểm tra các thông số quan trọng:
- Throughput (records/sec): Số lượng bản ghi được đọc từ Kafka mỗi giây. Giá trị này phải dương và ổn định nếu có dữ liệu vào.
- Source Latency (ms): Thời gian trễ từ lúc dữ liệu được ghi vào Kafka đến lúc Flink đọc được. Giá trị này phản ánh độ trễ của pipeline ingest.
- Checkpoint Latency (ms): Thời gian để hoàn thành một checkpoint (nếu đã bật). Nếu quá cao (>10s), cần xem xét giảm parallelism hoặc tối ưu logic.
Để verify lag một cách chính xác hơn qua command line, hãy sử dụng kafka-consumer-groups để so sánh offset của Kafka với offset hiện tại của Flink Group:
kafka-consumer-groups.sh \
--bootstrap-server kafka-bootstrap:9092 \
--group flink-ingest-group \
--describe
Kết quả mong đợi: Command trả về bảng hiển thị TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET và LAG. Giá trị LAG phải bằng 0 hoặc rất nhỏ (< 100 records) khi hệ thống chạy ổn định. Nếu LAG tăng liên tục, throughput của Flink không theo kịp tốc độ ghi của Producer.
Điều hướng series:
Mục lục: Series: Series: Xây dựng hệ thống Real-time Analytics và Stream Processing với Apache Kafka, Flink và ClickHouse trên Kubernetes
« Phần 4: Cài đặt Apache Flink trên Kubernetes với Kubernetes Operator
Phần 6: Xử lý dữ liệu thời gian thực: Aggregation và Windowing với Flink »