通过Kafka消费数据,此前是使用kafka-python库,但是观察到性能上不去,想用Confluent Kafka Python替换,看是否有提升的可能性。
基本环境准备
操作系统: CentOS 7
Python版本: Python 2
依赖准备:
1 | yum -y install cyrus-sasl-gssapi cyrus-sasl-devel zlib-devel lz4 lz4-devel libzstd-devel libcurl-devel openssl openssl-devel python-devel |
配置librdkafka
librdkafka是Apache Kafka协议的C库实现,提供Producer、Consumer 和Admin客户端。它的设计考虑了消息传递的可靠性和高性能。
1 | git clone https://github.com/edenhill/librdkafka.git |
install之后,可以找/usr/local/lib
目录找到librdkafka的静态库、动态库:
1 | ls -lrt /usr/local/lib |
通过ldconfig
添加新的链接库:
1 | echo "/usr/local/lib">> /etc/ld.so.conf |
Confluent Kafka Python
安装confluent kafka python客户端,其对librdkafka进行了封装:
1 | pip install --no-binary :all: confluent-kafka==1.7.0 -i https://mirrors.aliyun.com/pypi/simple |
注意: 此处如果需要使用SASL Kerberos/GSSAPI,那么需要使用上面的模式。
另外因测试环境默认使用系统自带Python2,目前能使用的confluent-kafka
版本为1.7.0
,最新的1.8.2
版本会报错如下:
1 | Collecting confluent-kafka |
程序验证
接下来,我们写个简单的生产、消费程序进行验证。
生产端:
1 | from confluent_kafka import Producer |
消费端:
1 | from confluent_kafka import Consumer |
通过describe可以看到可以正常生产、消费:
1 | bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.110:9092 --describe --group hello |
同时我们通过tcpdump抓包验证可以看到客户端版本等信息:
1 | 11:49:12.105528 IP 192.168.0.106.45262 > top.XmlIpcRegSvc: Flags [P.], seq 1:78, ack 1, win 229, options [nop,nop,TS val 24601420 ecr 2579566433], length 77 |
参考
1、https://github.com/confluentinc/confluent-kafka-python/tree/v1.7.0
2、https://github.com/edenhill/librdkafka
3、https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#