Collecting confluent-kafka
  Downloading https://mirrors.aliyun.com/pypi/packages/fb/16/d04dded73439266a3dbcd585f1128483dcf509e039bacd93642ac5de97d4/confluent-kafka-1.8.2.tar.gz (104kB)
    100% |████████████████████████████████| 112kB 2.5MB/s
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-cY7cdF/confluent-kafka/setup.py", line 12, in <module>
        with open(os.path.join(work_dir, 'README.md'), encoding='utf-8') as f:
    TypeError: 'encoding' is an invalid keyword argument for this function
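This TypeError is the one Python 2 raises when `encoding=` is passed to the built-in `open()`, which accepts that keyword only on Python 3. Below is a minimal sketch of the difference. It assumes nothing about the package beyond the failing line in the traceback. The helper `read_text` and the temporary file are hypothetical and only illustrate `io.open`, which accepts `encoding=` on both interpreters:

```python
import io
import os
import tempfile


def read_text(path):
    # io.open accepts encoding= on Python 2 and Python 3 alike;
    # Python 2's built-in open() rejects the keyword with a TypeError.
    with io.open(path, encoding="utf-8") as f:
        return f.read()


# A throwaway file standing in for the README.md that setup.py reads.
tmp_dir = tempfile.mkdtemp()
readme = os.path.join(tmp_dir, "README.md")
with io.open(readme, "w", encoding="utf-8") as f:
    f.write(u"confluent-kafka\n")

print(read_text(readme))
```

This only shows why the line fails. It is not a patch for the package's `setup.py`.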
Program verification
Next, we write a simple producer and a simple consumer to verify the installation.
Producer:
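from confluent_kafka import Producer

# Connect to the Kafka broker
p = Producer({"bootstrap.servers": "192.168.0.110:9092"})

# Queue 1000 messages for the HelloWorld topic
for i in range(0, 1000):
    # Payload: the loop index as a string
    p.produce("HelloWorld", str(i))

# Block until every queued message has been delivered
p.flush()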
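The producer above sends messages without checking whether each one arrived. One way to check is a delivery callback, passed through the `callback` argument of `produce()` and served by `poll()` or `flush()`. The following is a sketch under assumptions: `make_delivery_report`, `produce_all` and `main` are hypothetical helper names that do not come from the original program, and the broker address and topic are reused from it.

```python
def make_delivery_report(failed):
    """Return a delivery callback that records failures in the list `failed`."""
    def delivery_report(err, msg):
        # Called once per message from poll() or flush(); err is None on success.
        if err is not None:
            failed.append((msg.topic(), str(err)))
    return delivery_report


def produce_all(producer, topic, values, on_delivery):
    """Produce every value to `topic`, then wait for outstanding deliveries."""
    for value in values:
        producer.produce(topic, value, callback=on_delivery)
        producer.poll(0)  # serve delivery callbacks that are already queued
    producer.flush()      # block until all remaining messages are delivered or fail


def main():
    # Imported here so the helpers above can be used without a broker.
    from confluent_kafka import Producer

    failed = []
    p = Producer({"bootstrap.servers": "192.168.0.110:9092"})
    values = [str(i) for i in range(1000)]
    produce_all(p, "HelloWorld", values, make_delivery_report(failed))
    print("failed deliveries: %d" % len(failed))
```

Calling `main()` against a reachable broker should print the number of failed deliveries, which is 0 when every message was acknowledged.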
Consumer:
from confluent_kafka import Consumer
from confluent_kafka import TopicPartition
from confluent_kafka import KafkaException

c = Consumer({
    "bootstrap.servers": "192.168.0.110:9092",
    "group.id": "hello",
    "auto.offset.reset": "earliest"
})

# Read partition 0 of the HelloWorld topic directly
partition = TopicPartition("HelloWorld", 0)
c.assign([partition])

# Fetch up to 50 messages, waiting at most 3 seconds
records = c.consume(timeout=3, num_messages=50)
if records:
    # Synchronously commit the offset of the last message in the batch
    c.commit(message=records[-1], asynchronous=False)
    for i in records:
        print(i.value())
c.close()
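The consumer above prints every record's value, but a record returned by `consume()` may carry an error instead of a payload, which `Message.error()` reports (it returns `None` for a normal message). A sketch of separating the two cases follows. `split_records` and `main` are hypothetical helper names that do not come from the original program, and the connection settings are copied from it.

```python
def split_records(records):
    """Separate payloads from per-message errors in a consumed batch."""
    values, errors = [], []
    for msg in records:
        if msg.error() is not None:
            errors.append(msg.error())  # event or failure reported by the client
        else:
            values.append(msg.value())  # normal message payload
    return values, errors


def main():
    # Imported here so split_records can be used without a broker.
    from confluent_kafka import Consumer, TopicPartition

    c = Consumer({
        "bootstrap.servers": "192.168.0.110:9092",
        "group.id": "hello",
        "auto.offset.reset": "earliest",
    })
    c.assign([TopicPartition("HelloWorld", 0)])
    try:
        records = c.consume(num_messages=50, timeout=3)
        values, errors = split_records(records)
        for value in values:
            print(value)
        for error in errors:
            print("error: %s" % error)
    finally:
        c.close()  # always release the consumer, even if consuming fails
```

Closing the consumer in a `finally` block is a design choice in this sketch. The original closes it unconditionally at the end of the script.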