步骤二:创建 Topic
更新时间:2025-08-15 03:41:38
功能概述
Kafka 通过 Topic 进行生产消息和消费消息,生产者往 Topic 中写消息,消费者从 Topic 中读消息。
用户可以通过 Kafka Manager 创建 Topic,详情可参考使用 Kafka Manager 管理 Topic。本文介绍如何使用 Kafka 命令行工具创建 Topic。
操作步骤
-
为了方便用户通过本地访问 Kafka 集群,用户可以为客户端节点开启公网访问,或通过 VPN 的方式打通网络,以确保本地服务器可以访问 Kafka 集群网络。
-
在本地服务器终端,执行以下命令行,创建 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> # 已开启 SASL ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> --command-config /ssl/kafka.config命令行参数说明
参数 说明 topic_name
Topic 名称。
Kafka 连接地址
可参考查看 Kafka 集群信息内容,获取连接地址。
partition_num
Partition 数。每个 Topic 被拆分成多个 Partition,每个 Partition 由一系列有序的消息组成。
replication_num
副本数量。
-
在本地服务器终端,执行以下命令行,查看刚创建的 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> # 已开启 SASL ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,平衡 Topic 分区 Leader。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> # 已开启 SASL ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,更改 Topic 配置参数。用户也可以在创建 Topic 的时候指定,格式为
--config a=b --config x=y。cd /opt/kafka/current/bin # 未开启 SASL ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> # 已开启 SASL ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,修改 Topic 分区。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> # 已开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> --command-config /ssl/kafka.config -
在本地服务器终端,执行以下命令行,删除 Topic。
cd /opt/kafka/current/bin # 未开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> # 已开启 SASL ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> --command-config /ssl/kafka.config