功能概述

Kafka 通过 Topic 进行生产消息和消费消息,生产者往 Topic 中写消息,消费者从 Topic 中读消息。

用户可以通过 Kafka Manager 创建 Topic,详情可参考使用 Kafka Manager 管理 Topic。本文介绍如何使用 Kafka 命令行工具创建 Topic。

操作步骤

  1. 为了方便用户通过本地访问 Kafka 集群,用户可以为客户端节点开启公网访问,或通过 VPN 的方式打通网络,以确保本地服务器可以访问 Kafka 集群网络。

    详情可分别参考 绑定公网 IPVPN

  2. 在本地服务器终端,执行以下命令行,创建 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num>
    
    # 已开启 SASL
    ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> --command-config /ssl/kafka.config

    命令行参数说明

    参数 说明

    topic_name

    Topic 名称。

    Kafka 连接地址

    可参考查看 Kafka 集群信息内容,获取连接地址。

    partition_num

    Partition 数。每个 Topic 被拆分成多个 Partition,每个 Partition 由一系列有序的消息组成。

    replication_num

    副本数量。

  3. 在本地服务器终端,执行以下命令行,查看刚创建的 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址>
    
    # 已开启 SASL
    ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config
  4. 在本地服务器终端,执行以下命令行,平衡 Topic 分区 Leader。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址>
    
    # 已开启 SASL
    ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config
  5. 在本地服务器终端,执行以下命令行,更改 Topic 配置参数。用户也可以在创建 Topic 的时候指定,格式为 --config a=b --config x=y

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value>
    
    # 已开启 SASL
    ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> --command-config /ssl/kafka.config
  6. 在本地服务器终端,执行以下命令行,修改 Topic 分区。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num>
    
    # 已开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> --command-config /ssl/kafka.config
  7. 在本地服务器终端,执行以下命令行,删除 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name>
    
    # 已开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> --command-config /ssl/kafka.config