Kafka命令

鸡汤: 每一个你讨厌的现在,都有一个不够努力的曾经!

1. 基础命令

1. 创建topic
kafka-topics.sh \
--zookeeper master:2181 \
--create \
--replication-factor 3 \
--partitions 1 \
--topic first

2. 查看topic
kafka-topics.sh \
--zookeeper master:2181 \
--list

2. 查看详细topic
kafka-topics.sh \
--zookeeper master:2181 \
--describe --topic first

3. 生产者
kafka-console-producer.sh \
--broker-list master:9092 \
--topic first

4. 消费者
kafka-console-consumer.sh \
--bootstrap-server master:9092 \
--topic first \
--from-beginning

5. 删除topic
kafka-topics.sh \
--zookeeper master:2181 \
--delete \
--topic first

1.1 创建topic

kafka-topics.sh \
--zookeeper master:2181 \
--create \
--replication-factor 3 \
--partitions 1 \
--topic first

选项解释

--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数
详细操作
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --create \
> --replication-factor 3 \
> --partitions 1 \
> --topic first
Created topic "first".

1.2 查看所有topic

kafka-topics.sh --zookeeper master:2181 --list
详细操作
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --list
first

# 注意: 副本数一定要小于机器集群个数否则会报错
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --create --replication-factor 4 --partitions 1 --topic second
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2019-08-22 19:25:40,799] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)

1.3 发送消息

kafka-console-producer.sh \
--broker-list master:9092 \
--topic first

kafka的默认端口是9092

详细操作
[root@master ~]# kafka-console-producer.sh \
> --broker-list master:9092 \
> --topic first
>hello caimengzhi

1.4 消费消息

#新版本
kafka-console-consumer.sh \
--bootstrap-server master:9092 \
--topic first \
--from-beginning

# 老版本
kafka-console-consumer.sh \
--zookeeper master:2181 \
--from-beginning \
--topic first

--from-beginning 会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

详细操作
[root@slave2 logs]# kafka-console-consumer.sh \
> --bootstrap-server master:9092 \
> --topic first \
> --from-beginning
hello caimengzhi
低版本报错
[root@slave2 logs]# kafka-console-consumer.sh --zookeeper master:2181 --topic first
zookeeper is not a recognized option
Option                                   Description                            
------                                   -----------                            
--bootstrap-server <String: server to    REQUIRED: The server(s) to connect to. 
  connect to>                                                                   
--consumer-property <String:             A mechanism to pass user-defined       
  consumer_prop>                           properties in the form key=value to  
                                           the consumer.                        
--consumer.config <String: config file>  Consumer config properties file. Note  
                                           that [consumer-property] takes       
                                           precedence over this config.         
--enable-systest-events                  Log lifecycle events of the consumer   
                                           in addition to logging consumed      
                                           messages. (This is specific for      
                                           system tests.)                       
--formatter <String: class>              The name of a class to use for         
                                           formatting kafka messages for        
                                           display. (default: kafka.tools.      
                                           DefaultMessageFormatter)             
--from-beginning                         If the consumer does not already have  
                                           an established offset to consume     
                                           from, start with the earliest        
                                           message present in the log rather    
                                           than the latest message.             
--group <String: consumer group id>      The consumer group id of the consumer. 
--isolation-level <String>               Set to read_committed in order to      
                                           filter out transactional messages    
                                           which are not committed. Set to      
                                           read_uncommittedto read all          
                                           messages. (default: read_uncommitted)
--key-deserializer <String:                                                     
  deserializer for key>                                                         
--max-messages <Integer: num_messages>   The maximum number of messages to      
                                           consume before exiting. If not set,  
                                           consumption is continual.            
--offset <String: consume offset>        The offset id to consume from (a non-  
                                           negative number), or 'earliest'      
                                           which means from beginning, or       
                                           'latest' which means from end        
                                           (default: latest)                    
--partition <Integer: partition>         The partition to consume from.         
                                           Consumption starts from the end of   
                                           the partition unless '--offset' is   
                                           specified.                           
--property <String: prop>                The properties to initialize the       
                                           message formatter. Default           
                                           properties include:                  
                                            print.timestamp=true|false            
                                            print.key=true|false                  
                                            print.value=true|false                
                                            key.separator=<key.separator>         
                                            line.separator=<line.separator>       
                                            key.deserializer=<key.deserializer>   
                                            value.deserializer=<value.            
                                           deserializer>                        
                                         Users can also pass in customized      
                                           properties for their formatter; more 
                                           specifically, users can pass in      
                                           properties keyed with 'key.          
                                           deserializer.' and 'value.           
                                           deserializer.' prefixes to configure 
                                           their deserializers.                 
--skip-message-on-error                  If there is an error when processing a 
                                           message, skip it instead of halt.    
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is    
                                           available for consumption for the    
                                           specified interval.                  
--topic <String: topic>                  The topic id to consume on.            
--value-deserializer <String:                                                   
  deserializer for values>                                                      
--whitelist <String: whitelist>          Whitelist of topics to include for     
                                           consumption.

查阅资料后发现是kafka的版本问题,低版本的kafka可以使用以上的命令,但是在高版本的kafka中需要使用如下命令才行:
kafka-console-consumer.sh \
--bootstrap-server master:9092 \
--topic first \
--from-beginning

1.5 查看某个Topic的详情

kafka-topics.sh \
--zookeeper master:2181 \
--describe --topic first
详细操作
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --describe --topic first
Topic:first PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: first    Partition: 0    Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
         |            |               |           |               |________ 选举用的,谁复制的数据越多,就靠前,主挂了后优先选择它
         |            |               |           |________________________ 三个副本,所在机器位置 broker.id
         |            |               |____________________________________ leader所在机器 broker.id
         |            |____________________________________________________ 分区数所在位置broker.id
         |_________________________________________________________________ topic 名字为first
生产者生产数据到topic,其他的从都去拉取主的消息。

1.6 删除topic

kafka-topics.sh \
--zookeeper master:2181 \
--delete \
--topic first

在删除之前,一定要关闭对应的生产者和消费者进程

详细操作
[root@master ~]# kafka-topics.sh \
> --zookeeper master:2181 \
> --delete \
> --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true. <=== 显示bug
[root@master ~]# kafka-topics.sh --zookeeper master:2181 --list
__consumer_offsets