A Simple Example of Driving Kafka from Python

Having learned Kafka's terminology, it's time to try driving Kafka from Python. Below is a simple demo. Why not Java? Because I haven't learned it yet... I'll dig into it some day 👀

Clients

A quick search shows that three Python clients for Kafka are in common use: kafka-python, pykafka, and confluent-kafka-python. Someone has also benchmarked all three: Python Kafka Client Benchmarking. Since kafka-python seems to have the most articles written about it, I'll start with that one; once I'm familiar with the basics I'll decide whether to switch to another client.

Install kafka-python:

pip install kafka-python

Creating a Topic

Still learning on a single-node setup, so first create a topic with two partitions:

$ bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 2 --topic Hello
$ bin/kafka-topics.sh --list --bootstrap-server 0.0.0.0:9092
Hello

Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError


def main():
    producer = KafkaProducer(bootstrap_servers=["top:9092"])

    for _ in range(100):
        future = producer.send("Hello", b'msg')
        try:
            record_metadata = future.get(timeout=10)
            print(record_metadata)
        except KafkaError as e:
            print(e)
    producer.flush()


if __name__ == '__main__':
    main()

Consumer

Following the book's advice, I commit offsets manually here with a combination of asynchronous and synchronous commits, to avoid problems when a commit fails.

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time


class Consumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            group_id="my",
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            bootstrap_servers=["top:9092"]
        )

    def consumer_data(self, topic, partition):
        my_partition = TopicPartition(topic=topic, partition=partition)
        self.consumer.assign([my_partition])

        print(f"consumer start position: {self.consumer.position(my_partition)}")

        try:
            while True:
                records = self.consumer.poll(timeout_ms=1000, max_records=5)
                if records == {}:
                    print("consumer poll is empty, will exit")
                    break
                for _, messages in records.items():
                    for message in messages:
                        print(
                            f"{message.topic}:{message.partition}:{message.offset}: "
                            f"key={message.key} value={message.value}")

                try:
                    # fast, non-blocking commit on the happy path
                    self.consumer.commit_async()
                    time.sleep(5)
                except Exception as e:
                    print(e)
        except Exception as e:
            print(e)
        finally:
            try:
                # final synchronous commit: blocks and retries until it
                # succeeds or hits an unrecoverable error
                self.consumer.commit()
            finally:
                self.consumer.close()


def main():
    topic = "Hello"
    partition = 0
    my_consumer = Consumer()
    my_consumer.consumer_data(topic, partition)


if __name__ == '__main__':
    main()

Results

In the consumer code above, the group_id setting is what lets multiple consumers share a topic by consuming different partitions. Taking partition 0 of the Hello topic as an example: run the producer code to push data into Hello, then start the consumer code and you will see:

> python kz_consumer.py
consumer start position: 0
Hello:0:0: key=None value=b'msg'
Hello:0:1: key=None value=b'msg'
Hello:0:2: key=None value=b'msg'
Hello:0:3: key=None value=b'msg'
Hello:0:4: key=None value=b'msg'
Hello:0:5: key=None value=b'msg'
...

Meanwhile, the consumer group's LAG can be inspected from a terminal:

$ bin/kafka-consumer-groups.sh --bootstrap-server 0.0.0.0:9092 --group my --describe
Consumer group 'my' has no active members.

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my     Hello  0          10              53              43   -            -     -

$ bin/kafka-consumer-groups.sh --bootstrap-server 0.0.0.0:9092 --group my --describe
Consumer group 'my' has no active members.

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my     Hello  0          53              53              0    -            -     -

That wraps up this simple demo. Next up is the official documentation, to see how the specific APIs are actually used...

References

1. https://kafka-python.readthedocs.io/en/master/usage.html
2. Kafka: The Definitive Guide