1. Khởi tạo dự án Maven với Flink Dependencies
Bước đầu tiên là tạo cấu trúc thư mục chuẩn cho dự án Java để tích hợp với Apache Flink.
Tại sao: Maven giúp quản lý các thư viện (dependencies) như Kafka Client, Flink Java API, và các plugin build cần thiết một cách tự động, tránh xung đột phiên bản.
Kết quả mong đợi: Thư mục dự án được tạo với file pom.xml đã cấu hình sẵn.
Thực hiện các lệnh sau để tạo thư mục và file cấu hình:
mkdir -p ~/flink-apps/simple-streaming/src/main/java/com/example
cd ~/flink-apps/simple-streaming
Tạo file pom.xml với nội dung hoàn chỉnh bên dưới. Lưu ý: Phiên bản Flink cần khớp với phiên bản cluster bạn đã cài đặt ở Phần 3 (giả sử là 1.18.0 trong hướng dẫn này).
cat > pom.xml
Verify: Chạy lệnh cat pom.xml để kiểm tra nội dung, đảm bảo file được tạo thành công.
2. Viết mã nguồn Java cho Logic Streaming
Tạo file Java chứa logic chính: đọc từ Kafka, xử lý (map/filter), và ghi ra Console/Kafka.
Tại sao: Đây là trái tim của ứng dụng Flink, nơi định nghĩa Dataflow pipeline. Chúng ta sử dụng DataStream API để xử lý dữ liệu theo thời gian thực.
Kết quả mong đợi: File Java được biên dịch thành công, không có lỗi syntax.
Tạo file SimpleStreamingJob.java tại đường dẫn ~/flink-apps/simple-streaming/src/main/java/com/example/SimpleStreamingJob.java.
cat > src/main/java/com/example/SimpleStreamingJob.java 3
DataStream processedStream = stream
.map(s -> s.toUpperCase())
.filter(s -> s.length() > 3);
// 5. Cấu hình Kafka Sink (hoặc Console Sink)
Properties sinkProps = new Properties();
sinkProps.setProperty("bootstrap.servers", "localhost:9092");
sinkProps.setProperty("transactional.id", "flink-producer-id");
// Ghi kết quả vào topic 'output-topic'
FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
"output-topic",
new SimpleStringSchema(),
sinkProps
);
// Ghi ra Console để debug (bình luận dòng dưới nếu muốn ghi ra Kafka)
// processedStream.print("Processed Output:");
processedStream.sinkTo(kafkaProducer);
// 6. Submit Job
env.execute("Simple Kafka Streaming Job");
}
}
EOF
Verify: Kiểm tra file source code bằng head -n 20 src/main/java/com/example/SimpleStreamingJob.java để đảm bảo cấu trúc class đúng.
3. Build JAR file
Biên dịch dự án Maven thành file JAR duy nhất (Fat JAR) để submit vào cluster.
Tại sao: Flink Cluster cần một file JAR chứa tất cả code và dependencies (trừ những gì đã có sẵn trong classpath của Flink) để chạy.
Kết quả mong đợi: File simple-streaming-job-1.0-SNAPSHOT.jar được tạo trong thư mục target.
Chạy lệnh build. Đảm bảo bạn đang ở thư mục gốc của dự án (~/flink-apps/simple-streaming).
mvn clean package -DskipTests
Chờ quá trình download dependencies và compile hoàn tất. Nếu thành công, bạn sẽ thấy dòng thông báo BUILD SUCCESS.
Verify: Kiểm tra file JAR đã được tạo.
ls -lh target/*.jar
Kết quả mong đợi: Có một file .jar với kích thước khoảng vài MB (ví dụ: simple-streaming-job-1.0-SNAPSHOT.jar).
4. Submit Job vào Flink Cluster
Chạy file JAR vừa build lên Flink Standalone Cluster đã khởi động ở Phần 3.
Tại sao: Đây là bước chuyển từ môi trường phát triển (Dev) sang môi trường chạy (Run). Flink JobManager sẽ nhận JAR và phân phối code tới các TaskManager.
Kết quả mong đợi: Job được chấp nhận, trạng thái chuyển sang RUNNING trên Web UI.
Sử dụng lệnh flink run từ thư mục cài đặt Flink. Giả sử Flink được cài tại /opt/flink.
Trường hợp 1: Submit qua Command Line (CLI)
cd /opt/flink
./bin/flink run ~/flink-apps/simple-streaming/target/simple-streaming-job-1.0-SNAPSHOT.jar
Trường hợp 2: Submit qua Web UI (nếu bạn muốn giao diện trực quan)
Mở trình duyệt, truy cập http://localhost:8081. Click vào tab "Jobs", chọn "Deploy", upload file JAR từ thư mục target.
Verify: Kiểm tra trạng thái Job.
./bin/flink list
Kết quả mong đợi: Xuất hiện tên job Simple Kafka Streaming Job với trạng thái RUNNING và Job ID được hiển thị.
Để xem log thực tế của job đang chạy (chọn Job ID từ bước list):
./bin/flink log -n 50
Verify Sink: Kiểm tra dữ liệu đã được ghi vào Kafka topic 'output-topic'.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning
Kết quả mong đợi: Bạn thấy các dòng text đã được chuyển đổi thành chữ HOA (ví dụ: "HELLO WORLD") xuất hiện trong console.
Điều hướng series:
Mục lục: Series: Triển khai Database Streaming với Apache Flink và Ubuntu 24.04
« Phần 4: Cấu hình source và sink cho Apache Kafka trên Ubuntu
Phần 6: Xử lý dữ liệu có trạng thái (Stateful) và Windowing »