Skip to content

一、kafka环境搭建

1、安装kafka

kafka几乎支持所有主流操作系统,包括docker镜像。

2、启动kafka

KRaft是Kafka自3.x版本引入的新模式,它移除了对ZooKeeper的依赖,使架构更简洁。需要jdk11+。

  • docker方式启动(推荐):
    1、单节点部署(最简单):单节点采用 Combined模式,即一个节点同时承担Controller和Broker两种角色,适合快速搭建测试环境。
    1)生成集群ID:首先,需要为集群生成一个唯一ID,所有节点将使用此ID。
shell
# 使用任意Kafka镜像的工具生成UUID
docker run --rm apache/kafka:latest /opt/kafka/bin/kafka-storage.sh random-uuid
# 输出示例:4hU1sxPUQG-4qqqgZQxFZA

2)编写docker-compose.yml:将 <你的集群ID> 替换为上一步生成的ID。

yml
version: '3.8'
services:
  kafka-kraft-single:
    image: apache/kafka:latest  # official Apache Kafka image
    container_name: kafka-kraft-single
    hostname: kafka-single
    ports:
      - "9092:9092"  # broker listener, used by clients
      - "9093:9093"  # controller quorum port (exposing it is optional)
    environment:
      # --- core identity & role configuration ---
      # NOTE(review): the official apache/kafka image reads the cluster id from
      # CLUSTER_ID. "KAFKA_KRAFT_CLUSTER_ID" would be translated to the unknown
      # broker property "kraft.cluster.id" and not take effect — confirm against
      # the image documentation for the tag you deploy.
      CLUSTER_ID: "<你的集群ID>"  # required; replace with the generated id, e.g. 4hU1sxPUQG-4qqqgZQxFZA
      KAFKA_PROCESS_ROLES: "broker,controller"  # combined mode: node is both broker and controller
      KAFKA_NODE_ID: 1  # unique node id

      # --- network listener configuration ---
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"  # listener name used by the controller
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"  # bind addresses and protocols
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"  # address advertised to clients
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"  # listener -> security protocol

      # --- controller quorum (a single node votes for itself) ---
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-single:9093"  # format: node_id@host:port

      # --- storage ---
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"  # data log directory

      # --- optional common settings ---
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"  # disabling auto topic creation is recommended
      KAFKA_NUM_PARTITIONS: 1  # default partition count
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # replication factor for __consumer_offsets
    volumes:
      - kafka-single-data:/tmp/kraft-combined-logs  # persist data across restarts
volumes:
  kafka-single-data:
yml
version: '3.8'
services:
  kafka-kraft:
    image: apache/kafka:latest
    container_name: kafka-kraft-single
    ports:
      - "9092:9092"  # client connection port
      - "9093:9093"  # controller quorum port (exposing it is optional)
    environment:
      # required KRaft settings
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-kraft:9093"

      # listener configuration
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"

      # base configuration
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"

      # single-node replica settings
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data:/tmp/kraft-combined-logs  # persist data
    healthcheck:
      # Kafka scripts live in /opt/kafka/bin inside the apache/kafka image and
      # are not guaranteed to be on PATH, so use the absolute path here.
      test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 10s
      timeout: 5s
      retries: 5

# The named volume referenced above must be declared at top level; the
# original file omitted this, which makes `docker-compose up` fail.
volumes:
  kafka-data:

3)启动命令

shell
# 1. Start the container
docker-compose up -d
# 2. Tail the logs and confirm startup succeeded
docker logs -f kafka-kraft-single
# 3. Open a shell inside the container for testing
docker exec -it kafka-kraft-single /bin/sh
# 4. Inside the container: create a topic, then produce and consume messages
# Change into the Kafka scripts directory
cd /opt/kafka/bin
# Create a topic; --partitions 1 --replication-factor 1 are optional
./kafka-topics.sh --create --topic docker-test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# List topics
./kafka-topics.sh --bootstrap-server localhost:9092 --list
# Produce messages
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic docker-test
# Consume messages. NOTE(review): --from-beginning alone normally suffices;
# --partition 0 merely pins a single partition — confirm it is actually
# required in your environment before relying on it.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic docker-test --from-beginning --partition 0

2、多节点集群(生产环境,三个节点为例): 生产环境建议采用角色分离架构,即Controller节点和Broker节点分开部署,以提高稳定性和可扩展性。最佳实践是部署至少3个Controller节点和2个以上Broker节点。

yml
version: '3.8'
services:
  # *************** Controller quorum (3 nodes, manages cluster metadata) ***************
  controller-1:
    image: apache/kafka:latest
    container_name: controller-1
    hostname: controller-1  # important: used for inter-node communication
    environment:
      # NOTE(review): the official apache/kafka image reads the cluster id from
      # CLUSTER_ID; KAFKA_KRAFT_CLUSTER_ID is not a recognized name for it.
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_PROCESS_ROLES: "controller"  # controller-only role
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      # Map the CONTROLLER listener name to a security protocol; CONTROLLER is
      # not part of Kafka's default listener.security.protocol.map.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"  # 3-node quorum
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-1-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.11  # fixed IP keeps the quorum membership stable

  controller-2:
    image: apache/kafka:latest
    container_name: controller-2
    hostname: controller-2
    environment:
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_PROCESS_ROLES: "controller"
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-2-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.12

  controller-3:
    image: apache/kafka:latest
    container_name: controller-3
    hostname: controller-3
    environment:
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_PROCESS_ROLES: "controller"
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "CONTROLLER://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
    volumes:
      - controller-3-data:/tmp/kraft-controller-logs
    networks:
      kafka-net:
        ipv4_address: 172.28.1.13

  # *************** Broker nodes (3 nodes, store and serve data) ***************
  broker-1:
    image: apache/kafka:latest
    container_name: broker-1
    hostname: broker-1
    ports:
      - "19092:9092"  # map a distinct host port to each broker
    environment:
      CLUSTER_ID: "<你的集群ID>"  # must match the controllers' cluster id
      KAFKA_PROCESS_ROLES: "broker"  # broker-only role
      KAFKA_NODE_ID: 4
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      # Advertised address: what clients actually connect to.
      # NOTE(review): "broker-1:9092" only resolves inside the compose network,
      # so host-side clients cannot use the 19092 port mapping with this value.
      # For host access add a second (EXTERNAL) listener advertised as
      # localhost:19092, or set this to the host's IP/domain in production.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-1:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"  # points at the controller quorum
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      # Production-relevant broker settings
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3  # matches the number of brokers
      KAFKA_NUM_PARTITIONS: 3  # default partitions for new topics
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3  # keep internal topics highly available
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2  # minimum in-sync replicas for durability
    volumes:
      - broker-1-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.21

  broker-2:
    image: apache/kafka:latest
    container_name: broker-2
    hostname: broker-2
    ports:
      - "19093:9092"
    environment:
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_PROCESS_ROLES: "broker"
      KAFKA_NODE_ID: 5
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-2:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker-2-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.22

  broker-3:
    image: apache/kafka:latest
    container_name: broker-3
    hostname: broker-3
    ports:
      - "19094:9092"
    environment:
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_PROCESS_ROLES: "broker"
      KAFKA_NODE_ID: 6
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker-3:9092"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@controller-1:9093,2@controller-2:9093,3@controller-3:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-broker-logs"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    volumes:
      - broker-3-data:/tmp/kraft-broker-logs
    depends_on:
      - controller-1
      - controller-2
      - controller-3
    networks:
      kafka-net:
        ipv4_address: 172.28.1.23

networks:
  kafka-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.1.0/24

volumes:
  controller-1-data:
  controller-2-data:
  controller-3-data:
  broker-1-data:
  broker-2-data:
  broker-3-data:
yml
version: '3.8'
services:
  kafka1:
    image: apache/kafka:latest
    container_name: kafka-node-1
    hostname: kafka1
    ports:
      - "19092:9092"  # node 1 client port
    environment:
      # NOTE(review): a shared cluster id is required so that all three nodes
      # format their storage identically; without it each container generates
      # its own id and the quorum cannot form. Replace the placeholder with one
      # generated id and use the same value on every node.
      CLUSTER_ID: "<你的集群ID>"
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      # Replica defaults — keep these identical on every node
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-1:/var/lib/kafka/data
    networks:
      - kafka-network

  kafka2:
    image: apache/kafka:latest
    container_name: kafka-node-2
    hostname: kafka2
    ports:
      - "29092:9092"  # node 2 client port
    environment:
      CLUSTER_ID: "<你的集群ID>"  # must match kafka1
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka2:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      # Same replica defaults as kafka1 (the original omitted them here)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-2:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - kafka1

  kafka3:
    image: apache/kafka:latest
    container_name: kafka-node-3
    hostname: kafka3
    ports:
      - "39092:9092"  # node 3 client port
    environment:
      CLUSTER_ID: "<你的集群ID>"  # must match kafka1
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka3:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/var/lib/kafka/data"
      # Same replica defaults as kafka1 (the original omitted them here)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3
    volumes:
      - kafka-data-3:/var/lib/kafka/data
    networks:
      - kafka-network
    depends_on:
      - kafka1
      - kafka2

networks:
  kafka-network:
    driver: bridge

volumes:
  kafka-data-1:
  kafka-data-2:
  kafka-data-3:

启动命令:

shell
# 1. Start the cluster (run in the directory containing the compose file)
docker-compose -f docker-compose.cluster.yml up -d
# 2. Check cluster status (against any node).
# Scripts live in /opt/kafka/bin inside the apache/kafka image, so use the
# absolute path rather than relying on PATH.
docker exec kafka-node-1 /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server kafka1:9092
# __cluster_metadata is the internal KRaft metadata log, not a regular topic,
# so kafka-topics.sh cannot describe it; inspect the quorum with the
# metadata-quorum tool instead.
docker exec kafka-node-1 /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server kafka1:9092 describe --status
# 3. Create a highly available topic
docker exec kafka-node-1 /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic production-topic \
  --bootstrap-server kafka1:9092 \
  --partitions 6 \
  --replication-factor 3

# 4. List the cluster's brokers
docker exec kafka-node-1 /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server kafka1:9092
  • 2.常规操作系统启动:
    1、生成一个集群唯一id(cluster uuid),参考命令如下:
shell
# NOTE(review): the variable name "KAFLA_CLUSTER_ID" is misspelled (KAFKA);
# it is kept as-is because the format step below references the same name.
KAFLA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# Or generate the uuid directly
bin/kafka-storage.sh random-uuid
# Example:
# 1. Generate the unique cluster id (only needed the first time)
./bin/kafka-storage.sh random-uuid
# Output example: xp9ST7pYRr-4VGB6XJKtYA

2、格式化存储:仅首次启动时,在每个节点上执行格式化,使用上一步生成的id,参考命令如下:

shell
# The storage subcommand is "format" — the original "formate" is a typo and
# the command fails with an unknown-command error.
bin/kafka-storage.sh format -t $KAFLA_CLUSTER_ID -c config/kraft/server.properties
# Or pass the generated uuid directly
bin/kafka-storage.sh format -t 生成的uuid -c config/kraft/server.properties
# Example
# 2. Format the storage directory with the id from step 1 (⚠️ wipes existing data)
./bin/kafka-storage.sh format -t xp9ST7pYRr-4VGB6XJKtYA -c ./config/kraft/server.properties
# Seeing "Formatting /tmp/kraft-combined-logs" indicates success

3、启动kafka服务,参考命令如下:

shell
# Start in the background
bin/kafka-server-start.sh  config/kraft/server.properties &
# Example
# 3. Start the Kafka service (foreground, convenient for watching logs)
./bin/kafka-server-start.sh ./config/kraft/server.properties

3、发送/消费kafka消息

使用kafka命令行工具进行简单的发送/消费消息。

1.创建一个主题

参考命令如下:

shell
# Create a topic.
# NOTE(review): --replication-factor 3 requires at least 3 live brokers; use
# --replication-factor 1 when targeting the single-node setup from section 2.
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
# List all topics
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Show topic details (the original referenced "test-topic", which is never
# created in this guide; describe the topic created above instead)
./bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# Stop the Kafka service (if running in the foreground, press Ctrl+C in that terminal)
./bin/kafka-server-stop.sh

2.启动一个生产者客户端发送消息

参考命令如下:

shell
bin/kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic my-topic
>消息内容
>消息内容
>Ctrl+C结束退出

每行输入结束后,按回车键,表示发送多条独立的消息。

3.新启动一个命令行窗口,启动一个消费者客户端消费消息

参考命令如下:

shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning 
收到消息1
收到消息2
>Ctrl+C结束退出

二、springboot3集成kafka

1.添加依赖

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.配置连接

yml
 spring:
  kafka:
    # 单节点kafka服务
    # bootstrap-servers: localhost:9092 
    # 多节点kafka服务
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092 # 你的Kafka集群地址
    # 或者使用容器网络内部地址(如果Spring Boot也在Docker中)
    # bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all # 生产环境确保高可靠性
      retries: 3
    consumer:
      group-id: my-springboot-app-group # 消费者组ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false # 建议改为手动提交以精确控制

3.创建主题

除了通过kafka命令行创建主题,还可以通过代码方式创建,例:

java
@Configuration
public class KafkaConfig{
    public static final String TOPIC="my-topic";
    @BEAN
    public NewTopic testTopic(){
        //分区为4,副本为1的主题
        return new NewTopic(TOPIC,4,(short)1);
    }
}

一个消费者可以消费多个分区的消息,但一个分区只能被一个消费者组中的一个消费者消费。如果一个topic只有一个分区,那么就只能有一个消费者进行消费。

4.发送/消费消息

生产者发送消息:通过kafkaTemplate发送。

java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

消费者消费消息:使用@KafkaListener注解接收消息。

java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Consumes messages from Kafka using a {@link KafkaListener}-annotated method.
 */
@Service
public class MessageConsumerService {

    /**
     * Invoked by Spring for each record received on "your-topic-name" as a
     * member of the "my-springboot-app-group" consumer group.
     *
     * @param message the deserialized record value
     */
    @KafkaListener(topics = "your-topic-name", groupId = "my-springboot-app-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        // Handle your business logic here
    }
}

三、生产环境最佳实践

  • Kafka集群配置:
  1. 高可用:部署至少3个节点,关键内部主题(如__consumer_offsets)副本因子设为3,min.insync.replicas设为2
  2. 数据安全:启用SSL/TLS加密通信和SASL认证
  3. 资源与监控:设置JVM堆内存(如-Xmx4G -Xms4G),使用Prometheus+Grafana监控集群指标
  • Spring Boot应用优化:
  1. 生产者:根据业务需求调整batch.size和linger.ms以平衡吞吐量和延迟。启用压缩(如compression-type: lz4)以节省带宽。
  2. 消费者:对于高吞吐场景,在@KafkaListener中设置concurrency参数以启动多个消费者线程。务必处理消费逻辑中的异常,并考虑实现重试机制(如Spring Retry)。
  3. 连接与容错:配置适当的连接重试和超时参数,确保应用能从网络抖动中恢复。
  • 部署与运维
  1. 客户端版本:确保spring-kafka客户端版本与Kafka服务端版本兼容
  2. 配置管理:避免在代码中硬编码连接信息,使用配置中心或环境变量管理敏感配置
  3. 优雅启停:在应用关闭时,确保消费者能完成正在处理的消息并提交偏移量。

Released under the MIT License.