Cài đặt và cấu hình Apache Kafka trên Ubuntu 24.04
Tải và cài đặt Apache Kafka
Tải bản phân phối Kafka từ trang chủ Apache và giải nén vào thư mục /opt để chuẩn bị cho việc cấu hình server.
sudo apt update && sudo apt install -y wget
wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz
sudo mkdir -p /opt/kafka
sudo tar -xzf kafka_2.13-3.8.0.tgz -C /opt/kafka --strip-components=1
sudo rm kafka_2.13-3.8.0.tgz
Kết quả: Thư mục /opt/kafka được tạo với cấu trúc bin, configs, libs bên trong.
Cấu hình file server.properties
Sửa file cấu hình chính để định nghĩa ID broker, đường dẫn log, và cổng kết nối, đồng thời đảm bảo Kafka lắng nghe trên địa chỉ IP của máy chủ để Flink có thể kết nối.
Đường dẫn file: /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://YOUR_SERVER_IP:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.log.replication.factor=1
transaction.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
Chú ý: Thay thế YOUR_SERVER_IP bằng địa chỉ IP thực tế của máy Ubuntu (dùng lệnh ip addr).
Kết quả: File cấu hình sẵn sàng để khởi động Kafka, lắng nghe kết nối từ Flink.
Tạo thư mục log và khởi động Kafka
Tạo thư mục lưu trữ log Kafka và chạy service Kafka bằng chế độ background để chuẩn bị nhận dữ liệu.
sudo mkdir -p /var/kafka-logs
sudo chown -R $USER:$USER /var/kafka-logs
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
Kết quả: Kafka bắt đầu chạy, bạn có thể thấy log khởi động nếu kiểm tra file /opt/kafka/logs/server.log.
Verify kết quả cài đặt Kafka
Chờ 10-15 giây sau khi khởi động, sau đó kiểm tra trạng thái broker và topic để đảm bảo Kafka hoạt động ổn định.
cd /opt/kafka
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | head -n 5
Kết quả: Xuất ra danh sách các API version mà broker hỗ trợ, xác nhận Kafka đang lắng nghe cổng 9092.
Tạo Topic Kafka cho Input và Output
Tạo Topic nguồn (Source)
Tạo topic "streaming-input" để nhận dữ liệu thô từ bên ngoài, sử dụng 3 partition để tăng thông lượng song song.
cd /opt/kafka
bin/kafka-topics.sh --create --topic streaming-input --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Kết quả: In ra thông báo "Topic 'streaming-input' created" và xác nhận số lượng partition là 3.
Tạo Topic đích (Sink)
Tạo topic "streaming-output" để lưu trữ dữ liệu đã được Flink xử lý, cấu hình tương tự như input.
bin/kafka-topics.sh --create --topic streaming-output --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Kết quả: In ra thông báo "Topic 'streaming-output' created".
Verify danh sách Topic
Liệt kê tất cả các topic hiện có trên cluster để xác nhận cả hai topic đã được tạo thành công.
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Kết quả: Danh sách hiển thị "streaming-input" và "streaming-output" cùng các topic hệ thống mặc định.
Cấu hình Flink Kafka Connector
Xác định phiên bản Connector tương thích
Để tránh xung đột library, cần chọn phiên bản flink-connector-kafka tương thích với phiên bản Kafka đã cài (3.8.0) và phiên bản Flink đang chạy (giả định là 1.18.x hoặc 1.19.x).
Với Kafka 3.8.0, connector cần sử dụng phiên bản 3.8.0 của flink-connector-kafka.
Thêm JAR Connector vào thư mục Flink
Tải file JAR của connector và đặt vào thư mục lib của Flink để cluster có thể load class connector khi submit job.
cd /opt/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.8.0/flink-connector-kafka-3.8.0.jar
Kết quả: File flink-connector-kafka-3.8.0.jar nằm trong thư mục /opt/flink/lib.
Verify JAR đã được nhận diện
Khởi động lại Flink Cluster (nếu cần) và kiểm tra danh sách plugin trong Web UI hoặc console để đảm bảo connector đã sẵn sàng.
ls -lh /opt/flink/lib | grep kafka
Kết quả: Hiển thị file JAR vừa tải, kích thước khoảng 2MB.
Cấu hình Security và Authentication (SASL/PLAIN)
Chuẩn bị file JAAS Config
Tạo file cấu hình xác thực JAAS để Flink có thể kết nối với Kafka thông qua cơ chế SASL/PLAIN, thay vì kết nối không bảo mật.
Đường dẫn file: /opt/kafka/config/kafka_client_jaas.conf
kafka_client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="flink_user"
password="flink_secret_password";
};
Kết quả: File JAAS được tạo, chứa thông tin xác thực cho user Flink.
Cấu hình Kafka cho SASL
Sửa lại file server.properties của Kafka để bật listener SASL_PLAINTEXT và cấu hình mechanism xác thực.
Đường dẫn file: /opt/kafka/config/server.properties (thay thế phần listeners)
listeners=SASL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=SASL_PLAINTEXT://YOUR_SERVER_IP:9093
listener.name.sasl_plaintext.sasl.enabled.mechanisms=PLAIN
listener.name.sasl_plaintext.plain.sasl.server=org.apache.kafka.common.security.plain.PlainLoginModule
listener.name.sasl_plaintext.plain.sasl.client.callback.handler.class=org.apache.kafka.common.security.authenticator.DefaultCallbackHandler
Chú ý: Cần tạo user "flink_user" trong file users.properties hoặc sử dụng mechanism đơn giản cho môi trường dev.
Kết quả: Kafka lắng nghe thêm cổng 9093 cho kết nối có xác thực.
Khởi động lại Kafka với cấu hình mới
Dừng và khởi động lại Kafka để áp dụng các thay đổi bảo mật mới.
cd /opt/kafka
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
Kết quả: Kafka chạy lại với listener SASL_PLAINTEXT trên cổng 9093.
Verify kết nối có xác thực
Thử gửi một message vào topic với cấu hình JAAS để xác nhận xác thực hoạt động.
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --topic streaming-input --bootstrap-server localhost:9093 --producer.config config/producer.properties
Cần tạo file config/producer.properties trước với nội dung: sasl.mechanism=PLAIN, security.protocol=SASL_PLAINTEXT.
Kết quả: Console không bị lỗi authentication và message được gửi thành công.
Kiểm tra kết nối giữa Flink và Kafka Cluster
Chạy lệnh kiểm tra connectivity từ Flink JobManager
Sử dụng lệnh console của Flink để thử kết nối trực tiếp đến Kafka cluster và liệt kê topic, đảm bảo Flink có thể đọc cấu hình broker.
cd /opt/flink
bin/flink run -c org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerExample \
--bootstrap.servers localhost:9092 \
--topic streaming-input \
flink-connector-kafka-3.8.0.jar
Lưu ý: Đây là ví dụ về class example có sẵn trong JAR connector (nếu có) hoặc bạn có thể dùng một script Java đơn giản.
Verify bằng Console Consumer
Chạy console consumer của Kafka để xác nhận dữ liệu từ Flink (hoặc từ producer test) đang được nhận vào topic output.
cd /opt/kafka
bin/kafka-console-consumer.sh --topic streaming-output --bootstrap-server localhost:9092 --from-beginning
Kết quả: Console hiển thị các dòng dữ liệu được ghi vào topic streaming-output, xác nhận luồng dữ liệu Source -> Flink -> Sink đã thông suốt.
Verify trong Flink Web UI
Mở trình duyệt truy cập Flink Dashboard (thường là http://localhost:8081) và kiểm tra tab "Jobs".
Trạng thái của job streaming phải là "RUNNING" và không có lỗi (Exception) trong phần logs.
Kết quả: Dashboard hiển thị job đang chạy với thông số throughput (records/s) tăng dần, xác nhận kết nối thành công.
Điều hướng series:
Mục lục: Series: Triển khai Database Streaming với Apache Flink và Ubuntu 24.04
« Phần 3: Triển khai Apache Flink Standalone Cluster trên Ubuntu
Phần 5: Phát triển và submit ứng dụng Streaming đơn giản »