Having covered Kafka's terminology, it's time to try operating Kafka from Python. Below is a simple demo. Why not Java? Because I haven't learned it yet... I'll dig into that some day 👀
Clients

A quick search shows there are basically three commonly used Python clients for Kafka: kafka-python, pykafka, and confluent-kafka-python. Someone has also benchmarked the three: Python Kafka Client Benchmarking. There seem to be more articles about kafka-python, so I'll start with that one, get familiar with the basic operations, and decide later whether to switch to a different client.
Install kafka-python:

```shell
pip install kafka-python
```
Create a Topic

Still learning on a single-node setup, so first create a topic with two partitions:

```shell
$ bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 2 --topic Hello
$ bin/kafka-topics.sh --list --bootstrap-server 0.0.0.0:9092
Hello
```
Producer

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError


def main():
    producer = KafkaProducer(bootstrap_servers=["top:9092"])
    for _ in range(100):
        future = producer.send("Hello", b'msg')
        try:
            # block until the broker acknowledges the message (or times out)
            record_metadata = future.get(timeout=10)
            print(record_metadata)
        except KafkaError as e:
            print(e)
    producer.flush()


if __name__ == '__main__':
    main()
```
Consumer

Following the book, I commit offsets manually here, combining asynchronous and synchronous commits to avoid problems at commit time.

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time


class Consumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            group_id="my",
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            bootstrap_servers=["top:9092"]
        )

    def consumer_data(self, topic, partition):
        my_partition = TopicPartition(topic=topic, partition=partition)
        self.consumer.assign([my_partition])
        print(f"consumer start position: {self.consumer.position(my_partition)}")
        try:
            while True:
                # poll() returns a dict of {TopicPartition: [messages]}
                poll_num = self.consumer.poll(timeout_ms=1000, max_records=5)
                if poll_num == {}:
                    print("consumer poll is empty, will exit")
                    break
                for key, record in poll_num.items():
                    for message in record:
                        print(
                            f"{message.topic}:{message.partition}:{message.offset}: "
                            f"key={message.key} value={message.value}"
                        )
                try:
                    # fast, non-blocking commit on the happy path
                    self.consumer.commit_async()
                    time.sleep(5)
                except Exception as e:
                    print(e)
        except Exception as e:
            print(e)
        finally:
            try:
                # final synchronous commit retries until it succeeds or fails hard
                self.consumer.commit()
            finally:
                self.consumer.close()


def main():
    topic = "Hello"
    partition = 0
    my_consumer = Consumer()
    my_consumer.consumer_data(topic, partition)


if __name__ == '__main__':
    main()
```
Result

In the consumer code above, `group_id` is what lets multiple consumers consume different partitions of the same topic. Taking partition 0 of the Hello topic as an example: run the producer code to push data into Hello, then start the consumer code and you can see:
```shell
> python kz_consumer.py
consumer start position: 0
Hello:0:0: key=None value=b'msg'
Hello:0:1: key=None value=b'msg'
Hello:0:2: key=None value=b'msg'
Hello:0:3: key=None value=b'msg'
Hello:0:4: key=None value=b'msg'
Hello:0:5: key=None value=b'msg'
...
```
Meanwhile, the consumer group's LAG can be checked from the terminal:
```shell
$ bin/kafka-consumer-groups.sh --bootstrap-server 0.0.0.0:9092 --group my --describe

Consumer group 'my' has no active members.

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my     Hello  0          10              53              43   -            -     -

$ bin/kafka-consumer-groups.sh --bootstrap-server 0.0.0.0:9092 --group my --describe

Consumer group 'my' has no active members.

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my     Hello  0          53              53              0    -            -     -
```
That wraps up this simple demo. Next I should go through the official documentation to see how the specific APIs are used...
References

1. https://kafka-python.readthedocs.io/en/master/usage.html
2. Kafka: The Definitive Guide