I. Setting up a Kafka environment
1. Installing Kafka
Kafka runs on virtually every mainstream operating system, and an official Docker image is available as well.
- 1. Docker image: deploy with a docker-compose.yml file.
- 2. Regular operating system: download Kafka from the official site and unpack the archive. Download page: https://kafka.apache.org/downloads
2. Starting Kafka
KRaft, the mode introduced with the Kafka 3.x series, removes the dependency on ZooKeeper and simplifies the architecture. It requires JDK 11+.
- Starting with Docker (recommended):
1. Single-node deployment (simplest): the single node runs in Combined mode, acting as both Controller and Broker, which makes it well suited to quick test environments.
1) Generate a cluster ID: first, generate a unique ID for the cluster; all nodes will use this ID.
```shell
# Generate a UUID with the tooling bundled in any Kafka image
docker run --rm apache/kafka:latest /opt/kafka/bin/kafka-storage.sh random-uuid
# Example output: 4hU1sxPUQG-4qqqgZQxFZA
```
2) Write docker-compose.yml: replace <your-cluster-id> with the ID generated in the previous step.
```yml
version: '3.8'
services:
  kafka-kraft-single:
    image: apache/kafka:latest # official latest image
    container_name: kafka-kraft-single
    hostname: kafka-single
    ports:
      - "9092:9092" # broker port, for client connections
      - "9093:9093" # controller internal communication port (optional)
    environment:
      # 🆔 Core identity and role configuration
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>" # required; replace with your cluster ID, e.g. 4hU1sxPUQG-4qqqgZQxFZA
      KAFKA_PROCESS_ROLES: "broker,controller" # node roles: both broker and controller
      KAFKA_NODE_ID: 1 # unique node ID
      # 🌐 Network listener configuration
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" # controller listener name
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" # listen addresses and protocols
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092" # advertised address (what clients connect to)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" # listener-to-security-protocol map
      # 🗳️ Controller quorum configuration (a single node votes for itself)
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-single:9093" # format: node_id@host:port
      # 💾 Storage and log configuration
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" # data log directory
      # ⚙️ Other common settings (optional)
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" # disabling automatic topic creation is recommended
      KAFKA_NUM_PARTITIONS: 1 # default partition count
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # replication factor of the __consumer_offsets topic
    volumes:
      - kafka-single-data:/tmp/kraft-combined-logs # mount a data volume for persistence
volumes:
  kafka-single-data:
```
An alternative single-node compose file, which adds a healthcheck and explicit single-node replica settings:
```yml
version: '3.8'
services:
  kafka-kraft:
    image: apache/kafka:latest
    container_name: kafka-kraft-single
    ports:
      - "9092:9092" # client connection port
      - "9093:9093" # controller internal port (optionally exposed)
    environment:
      # required KRaft variables
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-kraft:9093"
      # listener configuration
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      # base configuration
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      # single-node replica settings
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data:/tmp/kraft-combined-logs # persist data
    healthcheck:
      test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 10s
      timeout: 5s
      retries: 5
volumes:
  kafka-data:
```
3) Startup commands
```shell
# 1. Start the container
docker-compose up -d
# 2. Watch the logs to confirm a successful start
docker logs -f kafka-kraft-single
# 3. Open a shell inside the container for testing
docker exec -it kafka-kraft-single /bin/sh
# 4. Inside the container, create a topic and produce/consume messages
# change to the Kafka command directory
cd /opt/kafka/bin
# create a topic; --partitions 1 --replication-factor 1 are optional
./kafka-topics.sh --create --topic docker-test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# list topics
./kafka-topics.sh --bootstrap-server localhost:9092 --list
# produce messages
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic docker-test
# consume messages; --partition 0 optionally pins the consumer to a single partition (without it, the console consumer reads all partitions)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic docker-test --from-beginning --partition 0
```
2. Multi-node cluster (production, three nodes as an example): for production, a role-separated architecture is recommended, with Controller nodes and Broker nodes deployed separately for better stability and scalability. Best practice is at least 3 Controller nodes plus 2 or more Broker nodes.
```yml
version: '3.8'
services:
  # *************** Controller nodes (3 nodes, manage cluster metadata) ***************
  controller-1:
    image: apache/kafka:latest
    container_name: controller-1
    hostname: controller-1 # important: used for inter-node communication
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "controller" # controller role only
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT" # CONTROLLER is a listener name, not a protocol, so map it explicitly
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093" # three-node quorum list
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-1-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.11 # fixed IP for a stable quorum list
  controller-2:
    image: apache/kafka:latest
    container_name: controller-2
    hostname: controller-2
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "controller"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-2-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.12
  controller-3:
    image: apache/kafka:latest
    container_name: controller-3
    hostname: controller-3
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "controller"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-3-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.13
  # *************** Broker nodes (3 nodes, store and serve data) ***************
  broker-1:
    image: apache/kafka:latest
    container_name: broker-1
    hostname: broker-1
    ports:
      - "19092:9092" # map a distinct host port to each broker
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "broker" # broker role only
      KAFKA_NODE_ID: 4
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      # advertised listener: the address clients actually connect to; in production use the host IP or a domain name
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-1:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093" # points at the controller cluster
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      # 🛡️ Important broker settings for production
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3 # default replication factor, matching the broker count
      KAFKA_NUM_PARTITIONS: 3 # default partition count for new topics
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 # keep internal topics highly available
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2 # minimum in-sync replicas, for data durability
    volumes:
      - broker-1-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.21
  broker-2:
    image: apache/kafka:latest
    container_name: broker-2
    hostname: broker-2
    ports:
      - "19093:9092"
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "broker"
      KAFKA_NODE_ID: 5
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-2:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker-2-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.22
  broker-3:
    image: apache/kafka:latest
    container_name: broker-3
    hostname: broker-3
    ports:
      - "19094:9092"
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_PROCESS_ROLES: "broker"
      KAFKA_NODE_ID: 6
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-3:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker-3-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.23
networks:
  kafka-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.1.0/24
volumes:
  controller-1-data:
  controller-2-data:
  controller-3-data:
  broker-1-data:
  broker-2-data:
  broker-3-data:
```
Alternatively, a simpler three-node cluster can run every node in Combined mode (each node acts as both Broker and Controller):
```yml
version: '3.8'
services:
  kafka1:
    image: apache/kafka:latest
    container_name: kafka-node-1
    hostname: kafka1
    ports:
      - "19092:9092" # node 1 client port
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>" # use the same ID on all three nodes
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      # replica defaults; keep these identical on every node
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-1:/var/lib/kafka/data
    networks:
      - kafka-network
  kafka2:
    image: apache/kafka:latest
    container_name: kafka-node-2
    hostname: kafka2
    ports:
      - "29092:9092" # node 2 client port
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-2:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - kafka1
  kafka3:
    image: apache/kafka:latest
    container_name: kafka-node-3
    hostname: kafka3
    ports:
      - "39092:9092" # node 3 client port
    environment:
      KAFKA_KRAFT_CLUSTER_ID: "<your-cluster-id>"
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-3:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - kafka1
      - kafka2
networks:
  kafka-network:
    driver: bridge
volumes:
  kafka-data-1:
  kafka-data-2:
  kafka-data-3:
```
Startup commands:
```shell
# 1. Start the cluster (from the directory containing the compose file)
docker-compose -f docker-compose.cluster.yml up -d
# 2. Check cluster state (run against any node); __cluster_metadata is internal,
#    so inspect the KRaft quorum instead of describing it as a regular topic
docker exec kafka-node-1 /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server kafka1:9092
docker exec kafka-node-1 /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server kafka1:9092 describe --status
# 3. Create a highly available topic
docker exec kafka-node-1 /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic production-topic \
  --bootstrap-server kafka1:9092 \
  --partitions 6 \
  --replication-factor 3
# 4. List the cluster's broker nodes
docker exec kafka-node-1 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kafka1:9092
```
- 2. Starting on a regular operating system:
1) Generate a unique cluster ID (cluster UUID). Reference commands:
```shell
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# or generate it directly
bin/kafka-storage.sh random-uuid
# Example:
# 1. Generate the unique cluster ID (only needed the first time)
./bin/kafka-storage.sh random-uuid
# Example output: xp9ST7pYRr-4VGB6XJKtYA
```
2) Format the storage. On first startup only, run the format command on every node with the ID generated above. Reference commands:
```shell
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# or pass the generated UUID directly
bin/kafka-storage.sh format -t <generated-uuid> -c config/kraft/server.properties
# Example
# 2. Format the storage directory with the ID from the previous step (⚠️ wipes existing data)
./bin/kafka-storage.sh format -t xp9ST7pYRr-4VGB6XJKtYA -c ./config/kraft/server.properties
# "Formatting /tmp/kraft-combined-logs" in the output indicates success
```
3) Start the Kafka server. Reference commands:
```shell
bin/kafka-server-start.sh config/kraft/server.properties &
# Example
# 3. Start the Kafka server (in the foreground, so logs are easy to watch)
./bin/kafka-server-start.sh ./config/kraft/server.properties
```
3. Producing/consuming Kafka messages
Use the Kafka command-line tools to send and consume a few simple messages.
1) Create a topic. Reference commands:
```shell
# create a topic (the replication factor must not exceed the number of brokers)
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
# list all topics
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# describe a topic
./bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# stop the Kafka server (if it runs in the foreground, press Ctrl+C in its terminal)
./bin/kafka-server-stop.sh
```
2) Start a console producer and send messages. Reference commands:
```shell
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
>message text
>message text
>Ctrl+C to exit
```
Press Enter after each line; every line is sent as a separate message.
3) In a new terminal window, start a console consumer. Reference commands:
```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
message text 1
message text 2
Ctrl+C to exit
```
II. Integrating Kafka with Spring Boot 3
1. Add the dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
2. Configure the connection
```yml
spring:
  kafka:
    # single-node Kafka
    # bootstrap-servers: localhost:9092
    # multi-node Kafka cluster; note the brokers above advertise kafka1:9092 etc.,
    # so those hostnames must be resolvable from the client host (e.g. via /etc/hosts)
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092 # your Kafka cluster address
    # or use the container-network addresses (if the Spring Boot app also runs in Docker)
    # bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all # high reliability for production
      retries: 3
    consumer:
      group-id: my-springboot-app-group # consumer group ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false # manual commits are recommended for precise control
```
3. Creating topics
Besides the Kafka command line, topics can also be created in code, for example:
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    public static final String TOPIC = "my-topic";

    @Bean
    public NewTopic testTopic() {
        // a topic with 4 partitions and 1 replica
        return new NewTopic(TOPIC, 4, (short) 1);
    }
}
```
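Where a topic needs more settings than the NewTopic constructor exposes, spring-kafka also provides a fluent TopicBuilder. A minimal sketch; the topic name, partition/replica counts, and retention value here are illustrative, not part of the original example:
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class AuditTopicConfig {

    @Bean
    public NewTopic auditTopic() {
        // 3 partitions, 3 replicas, 7-day retention (illustrative values)
        return TopicBuilder.name("audit-topic")
                .partitions(3)
                .replicas(3)
                .config("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000))
                .build();
    }
}
```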
A consumer can consume messages from several partitions, but within a consumer group each partition is consumed by exactly one consumer. If a topic has only one partition, only a single consumer in the group will actually receive messages. The sketch below illustrates this assignment behavior.
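A minimal sketch of that behavior, reusing the topic and group from the examples above: on a multi-partition topic, concurrency = "3" starts three listener threads in the same consumer group, and Kafka assigns each thread its own partition(s); a fourth consumer in the same group on a 3-partition topic would sit idle.
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Illustrative sketch: assumes "my-topic" has at least 3 partitions.
@Service
public class PartitionAwareConsumer {

    // Three listener threads in one group; each is assigned a share of the partitions.
    @KafkaListener(topics = "my-topic", groupId = "my-springboot-app-group", concurrency = "3")
    public void consume(String message) {
        // The thread name reveals which listener thread received the record.
        System.out.println(Thread.currentThread().getName() + " received: " + message);
    }
}
```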
4. Sending/consuming messages
Producer: send messages through KafkaTemplate.
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
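kafkaTemplate.send() is asynchronous. A hedged sketch of confirming delivery, assuming spring-kafka 3.x (where send() returns a CompletableFuture); the class and method names are illustrative:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class CallbackProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendWithCallback(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .whenComplete((SendResult<String, String> result, Throwable ex) -> {
                    if (ex != null) {
                        // failed after the configured retries; log or route to a fallback
                        System.err.println("Send failed: " + ex.getMessage());
                    } else {
                        System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                                + " at offset " + result.getRecordMetadata().offset());
                    }
                });
    }
}
```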
Consumer: receive messages with the @KafkaListener annotation.
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumerService {

    @KafkaListener(topics = "your-topic-name", groupId = "my-springboot-app-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        // handle your business logic here
    }
}
```
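The configuration above sets enable-auto-commit: false and recommends manual commits. A minimal sketch of a manually acknowledging listener; it assumes the additional property spring.kafka.listener.ack-mode: manual, which is not part of the original configuration:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

// Requires spring.kafka.listener.ack-mode: manual (an assumption, see above).
@Service
public class ManualAckConsumerService {

    @KafkaListener(topics = "your-topic-name", groupId = "my-springboot-app-group")
    public void consume(String message, Acknowledgment ack) {
        try {
            System.out.println("Received message: " + message);
            ack.acknowledge(); // commit the offset only after successful processing
        } catch (Exception e) {
            // Without acknowledge() the offset stays uncommitted, so the record
            // may be redelivered after a rebalance or an application restart.
            System.err.println("Processing failed: " + e.getMessage());
        }
    }
}
```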
III. Production best practices
- Kafka cluster configuration:
  - High availability: deploy at least 3 nodes; set the replication factor of critical internal topics (such as __consumer_offsets) to 3 and min.insync.replicas to 2
  - Data security: enable SSL/TLS for transport encryption and SASL for authentication
  - Resources and monitoring: size the JVM heap explicitly (e.g. -Xmx4G -Xms4G) and monitor cluster metrics with Prometheus + Grafana
- Spring Boot application tuning:
  - Producer: adjust batch.size and linger.ms for your workload to balance throughput against latency, and enable compression (e.g. compression-type: lz4) to save bandwidth; see the sketch after this list.
  - Consumer: for high-throughput scenarios, set the concurrency attribute on @KafkaListener to run multiple consumer threads. Always handle exceptions in the consumption logic and consider a retry mechanism (e.g. Spring Retry).
  - Connectivity and fault tolerance: configure sensible retry and timeout settings so the application recovers from transient network failures.
- Deployment and operations:
  - Client version: make sure the spring-kafka client version is compatible with the Kafka broker version
  - Configuration management: avoid hard-coding connection details in code; manage sensitive settings through a configuration center or environment variables
  - Graceful shutdown: on application shutdown, allow consumers to finish in-flight messages and commit their offsets.
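A hedged sketch of the producer tuning mentioned above, expressed as a programmatic ProducerFactory. The bean names and the exact values are illustrative; in a real application these settings would usually live in application.yml, and this bean would replace or complement the auto-configured template:
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class TunedProducerConfig {

    @Bean
    public ProducerFactory<String, String> tunedProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); // adjust to your cluster
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");             // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);              // matches the yml configuration above
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);   // larger batches raise throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // brief wait so batches can fill
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress to save bandwidth
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> tunedKafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }
}
```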
