Kafka Commands
Motto: every present you dislike comes from a past in which you did not work hard enough!
1. Basic Commands¶
1. Create a topic

    ```bash
    kafka-topics.sh \
      --zookeeper master:2181 \
      --create \
      --replication-factor 3 \
      --partitions 1 \
      --topic first
    ```

2. List topics

    ```bash
    kafka-topics.sh \
      --zookeeper master:2181 \
      --list
    ```

3. Describe a topic

    ```bash
    kafka-topics.sh \
      --zookeeper master:2181 \
      --describe --topic first
    ```

4. Producer

    ```bash
    kafka-console-producer.sh \
      --broker-list master:9092 \
      --topic first
    ```

5. Consumer

    ```bash
    kafka-console-consumer.sh \
      --bootstrap-server master:9092 \
      --topic first \
      --from-beginning
    ```

6. Delete a topic

    ```bash
    kafka-topics.sh \
      --zookeeper master:2181 \
      --delete \
      --topic first
    ```
1.1 Create a topic¶
```bash
kafka-topics.sh \
  --zookeeper master:2181 \
  --create \
  --replication-factor 3 \
  --partitions 1 \
  --topic first
```
Options

- `--topic`: the topic name
- `--replication-factor`: the number of replicas
- `--partitions`: the number of partitions
Detailed operation
```bash
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --create \
> --replication-factor 3 \
> --partitions 1 \
> --topic first
Created topic "first".
```
1.2 List all topics¶
```bash
kafka-topics.sh --zookeeper master:2181 --list
```
Detailed operation
```bash
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --list
first

# Note: the replication factor must not exceed the number of brokers in the
# cluster, otherwise topic creation fails:
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --create --replication-factor 4 --partitions 1 --topic second
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2019-08-22 19:25:40,799] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)
```
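The constraint behind this error can be sketched in a few lines of Python (an illustrative model, not Kafka's actual source code; the function name is made up):

```python
# Illustrative model of the broker-side check (not Kafka source code):
# a topic's replication factor cannot exceed the number of live brokers.
def validate_replication_factor(replication_factor, live_brokers):
    """Raise, mimicking InvalidReplicationFactorException, if too large."""
    if replication_factor > len(live_brokers):
        raise ValueError(
            f"Replication factor: {replication_factor} larger than "
            f"available brokers: {len(live_brokers)}."
        )

validate_replication_factor(3, [0, 1, 2])  # fine: 3 replicas on 3 brokers
try:
    validate_replication_factor(4, [0, 1, 2])
except ValueError as err:
    print(err)  # Replication factor: 4 larger than available brokers: 3.
```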
1.3 Produce messages¶
```bash
kafka-console-producer.sh \
  --broker-list master:9092 \
  --topic first
```
Kafka's default port is 9092.
Detailed operation
```bash
[root@master ~]# kafka-console-producer.sh \
> --broker-list master:9092 \
> --topic first
>hello caimengzhi
```
1.4 Consume messages¶
```bash
# Newer versions
kafka-console-consumer.sh \
  --bootstrap-server master:9092 \
  --topic first \
  --from-beginning

# Older versions
kafka-console-consumer.sh \
  --zookeeper master:2181 \
  --from-beginning \
  --topic first
```
--from-beginning
Reads out all of the existing data in the `first` topic from the start. Decide whether to add this flag based on your use case.
Detailed operation
```bash
[root@slave2 logs]# kafka-console-consumer.sh \
> --bootstrap-server master:9092 \
> --topic first \
> --from-beginning
hello caimengzhi
```
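The offset-selection rule behind `--from-beginning` can be modeled in a few lines (a simplified sketch, not Kafka's implementation; the function name is hypothetical). The flag only matters when the consumer group has no committed offset; otherwise consumption resumes from the committed position.

```python
# Simplified model (not Kafka source; function name is hypothetical):
# choose the offset a consumer starts from.
def starting_offset(committed, log_start, log_end, from_beginning):
    if committed is not None:
        return committed                  # resume where the group left off
    return log_start if from_beginning else log_end

assert starting_offset(None, 0, 100, from_beginning=True) == 0    # read history
assert starting_offset(None, 0, 100, from_beginning=False) == 100 # new messages only
assert starting_offset(42, 0, 100, from_beginning=True) == 42     # committed offset wins
```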
Error with the old-style option
```bash
[root@slave2 logs]# kafka-console-consumer.sh --zookeeper master:2181 --topic first
zookeeper is not a recognized option
Option                                     Description
------                                     -----------
--bootstrap-server <String: server to connect to>
    REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>
    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>
    Consumer config properties file. Note that [consumer-property] takes
    precedence over this config.
--enable-systest-events
    Log lifecycle events of the consumer in addition to logging consumed
    messages. (This is specific for system tests.)
--formatter <String: class>
    The name of a class to use for formatting kafka messages for display.
    (default: kafka.tools.DefaultMessageFormatter)
--from-beginning
    If the consumer does not already have an established offset to consume
    from, start with the earliest message present in the log rather than
    the latest message.
--group <String: consumer group id>
    The consumer group id of the consumer.
--isolation-level <String>
    Set to read_committed in order to filter out transactional messages
    which are not committed. Set to read_uncommitted to read all messages.
    (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>
    The maximum number of messages to consume before exiting. If not set,
    consumption is continual.
--offset <String: consume offset>
    The offset id to consume from (a non-negative number), or 'earliest'
    which means from beginning, or 'latest' which means from end
    (default: latest)
--partition <Integer: partition>
    The partition to consume from. Consumption starts from the end of the
    partition unless '--offset' is specified.
--property <String: prop>
    The properties to initialize the message formatter. Default properties
    include:
      print.timestamp=true|false
      print.key=true|false
      print.value=true|false
      key.separator=<key.separator>
      line.separator=<line.separator>
      key.deserializer=<key.deserializer>
      value.deserializer=<value.deserializer>
    Users can also pass in customized properties for their formatter; more
    specifically, users can pass in properties keyed with
    'key.deserializer.' and 'value.deserializer.' prefixes to configure
    their deserializers.
--skip-message-on-error
    If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>
    If specified, exit if no message is available for consumption for the
    specified interval.
--topic <String: topic>
    The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>
    Whitelist of topics to include for consumption.
```

After some research, this turned out to be a Kafka version issue: older Kafka accepts the command above, but newer versions require the following instead:

```bash
kafka-console-consumer.sh \
  --bootstrap-server master:9092 \
  --topic first \
  --from-beginning
```
1.5 Describe a topic¶
```bash
kafka-topics.sh \
  --zookeeper master:2181 \
  --describe --topic first
```
Detailed operation
```bash
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --describe --topic first
Topic:first	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: first	Partition: 0	Leader: 1	Replicas: 1,0,2	Isr: 2,0,1
```

Field by field:

- `Topic: first`: the topic name.
- `Partition: 0`: the partition number.
- `Leader: 1`: the `broker.id` of the broker hosting this partition's leader.
- `Replicas: 1,0,2`: the `broker.id`s of the brokers holding the three replicas.
- `Isr: 2,0,1`: the in-sync replicas, used for leader election; the more data a replica has copied, the earlier it appears, and it is preferred when the leader fails.

Producers write data to the topic; the follower replicas all fetch the leader's messages.
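For scripting, the `--describe` output is easy to pick apart. Below is a hypothetical helper (not part of Kafka) that parses one partition line of that output into a dict:

```python
# Hypothetical helper (not shipped with Kafka): parse one partition line
# of `kafka-topics.sh --describe` output.
def parse_partition_line(line):
    """Split tab-separated 'Key: value' fields into a dict."""
    out = {}
    for field in line.strip().split("\t"):
        key, _, value = field.partition(": ")
        out[key.strip()] = value.strip()
    # Convert the numeric fields.
    out["Partition"] = int(out["Partition"])
    out["Leader"] = int(out["Leader"])
    out["Replicas"] = [int(b) for b in out["Replicas"].split(",")]
    out["Isr"] = [int(b) for b in out["Isr"].split(",")]
    return out

line = "Topic: first\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 2,0,1"
info = parse_partition_line(line)
assert info["Leader"] == 1
assert info["Replicas"] == [1, 0, 2]
assert info["Isr"] == [2, 0, 1]
```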
1.6 Delete a topic¶
```bash
kafka-topics.sh \
  --zookeeper master:2181 \
  --delete \
  --topic first
```
Before deleting, be sure to stop the corresponding producer and consumer processes first.
Detailed operation
```bash
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --delete \
> --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.  # <=== display bug: the topic was in fact deleted
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --list
__consumer_offsets
```
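The note in the output refers to the broker setting `delete.topic.enable`. On Kafka versions where it defaults to false, deletion only marks the topic; to make deletion take effect, set it in every broker's `server.properties` (file location varies by installation) and restart the brokers:

```properties
# config/server.properties on every broker
delete.topic.enable=true
```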