Configuring the Confluent Kafka Python Client

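I had been consuming data from Kafka with the kafka-python library, but its throughput plateaued. I wanted to try replacing it with Confluent Kafka Python to see whether performance could be improved.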

Basic Environment Setup

Operating system: CentOS 7
Python version: Python 2
Dependencies:

$ yum -y install cyrus-sasl-gssapi cyrus-sasl-devel  zlib-devel lz4 lz4-devel libzstd-devel libcurl-devel openssl openssl-devel python-devel
$ yum -y install git gcc gcc-c++ make
$ yum -y install epel-release
$ yum -y install python-pip
$ yum -y install python-virtualenv

Setting Up librdkafka

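librdkafka is a C library implementation of the Apache Kafka protocol that provides Producer, Consumer, and Admin clients. It is designed with reliable message delivery and high performance in mind.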

$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure
$ make
$ make install

After make install, the librdkafka static and shared libraries can be found under /usr/local/lib:

$ ls -lrt /usr/local/lib
total 54864
-rwxr-xr-x. 1 root root 27578060 Feb 3 05:22 librdkafka.a
-rwxr-xr-x. 1 root root 4346000 Feb 3 05:22 librdkafka-static.a
-rwxr-xr-x. 1 root root 17988344 Feb 3 05:22 librdkafka.so.1
lrwxrwxrwx. 1 root root 15 Feb 3 05:22 librdkafka.so -> librdkafka.so.1
-rwxr-xr-x. 1 root root 4597704 Feb 3 05:22 librdkafka++.a
-rwxr-xr-x. 1 root root 1660344 Feb 3 05:22 librdkafka++.so.1
drwxr-xr-x. 2 root root 96 Feb 3 05:22 pkgconfig
lrwxrwxrwx. 1 root root 17 Feb 3 05:22 librdkafka++.so -> librdkafka++.so.1

Register the new library directory with ldconfig:

$ echo "/usr/local/lib">> /etc/ld.so.conf
$ ldconfig
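
One way to confirm that the dynamic linker can now find the library is to load it from Python with ctypes and ask for its version. This is a minimal sketch and not part of the original setup: librdkafka_version is a hypothetical helper name, and it relies on rd_kafka_version_str, the version function exported by the librdkafka C API.

```python
import ctypes
import ctypes.util


def librdkafka_version():
    """Return the librdkafka version string, or None if the library cannot be loaded."""
    # On Linux, find_library consults the linker cache that ldconfig maintains.
    name = ctypes.util.find_library("rdkafka")
    if name is None:
        return None
    try:
        lib = ctypes.CDLL(name)
    except OSError:
        return None
    # rd_kafka_version_str() returns a const char * owned by the library.
    lib.rd_kafka_version_str.restype = ctypes.c_char_p
    return lib.rd_kafka_version_str().decode("ascii")


print(librdkafka_version())
```

If this prints None, the library is probably not visible to the linker yet, and re-running ldconfig is the first thing to check.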

Confluent Kafka Python

Install the Confluent Kafka Python client, which wraps librdkafka:

$ pip install --no-binary :all: confluent-kafka==1.7.0 -i https://mirrors.aliyun.com/pypi/simple

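Note: if you need SASL Kerberos/GSSAPI, install from source as shown above (--no-binary :all:) so that the client is built against the locally installed librdkafka. The prebuilt wheels do not include GSSAPI support.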
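
After installation, it can help to confirm which client and librdkafka versions Python actually picked up. The sketch below assumes nothing beyond the package itself; client_versions is a hypothetical helper name, and it returns None when the package is not importable.

```python
def client_versions():
    """Return (client_version, librdkafka_version) strings, or None if not installed."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # version() and libversion() each return a (version string, version int) tuple.
    return confluent_kafka.version()[0], confluent_kafka.libversion()[0]


print(client_versions())
```

With a source build, the second value should match the librdkafka that was compiled and installed earlier.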

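Also, because the test environment uses the system default Python 2, the newest confluent-kafka version that works here is 1.7.0. The then-latest 1.8.2 fails at install time because its setup.py passes encoding= to open(), which Python 2 does not accept: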

Collecting confluent-kafka
Downloading https://mirrors.aliyun.com/pypi/packages/fb/16/d04dded73439266a3dbcd585f1128483dcf509e039bacd93642ac5de97d4/confluent-kafka-1.8.2.tar.gz (104kB)
100% |████████████████████████████████| 112kB 2.5MB/s
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-cY7cdF/confluent-kafka/setup.py", line 12, in <module>
        with open(os.path.join(work_dir, 'README.md'), encoding='utf-8') as f:
    TypeError: 'encoding' is an invalid keyword argument for this function
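
The traceback comes from the built-in open() in Python 2, which has no encoding parameter. As a small illustration (not a patch for confluent-kafka's setup.py), io.open accepts encoding on both Python 2 and Python 3. The temporary file below exists only for this demo.

```python
import io
import os
import tempfile

# Create a small UTF-8 file to read back; the path is only for this demo.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "README.md")
with io.open(path, "w", encoding="utf-8") as f:
    f.write(u"confluent-kafka \u2713")

# io.open takes encoding= on Python 2 and 3, unlike Python 2's built-in open().
with io.open(path, encoding="utf-8") as f:
    text = f.read()

# Clean up the demo file and directory.
os.remove(path)
os.rmdir(tmp_dir)

print(len(text))
```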

Verification

Next, write a simple producer and consumer to verify the setup.
Producer:

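from confluent_kafka import Producer


# Producer pointed at the test broker.
p = Producer({
    "bootstrap.servers": "192.168.0.110:9092"
})

# Sample JSON payload, sent unchanged for every message.
aim = '{"_id":"61fbf5b54dfce68a0eacc54e","index":0,"guid":"76db89fa-c818-4fcb-acb2-c58e9dad7fab","isActive":true,"balance":"$1,521.30","favoriteFruit":"banana"}'

# Queue 1000 messages on the HelloWorld topic.
for i in range(0, 1000):
    p.produce("HelloWorld", aim)

# Block until all queued messages have been delivered.
p.flush()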
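
The producer above does not check whether each message was actually delivered. A minimal sketch of adding a delivery callback follows. make_delivery_tracker and produce_with_reports are hypothetical helper names, and the confluent_kafka import sits inside the function so that the callback logic can be exercised without a broker.

```python
def make_delivery_tracker():
    """Return (callback, stats); the callback counts delivered and failed messages."""
    stats = {"delivered": 0, "failed": 0}

    def on_delivery(err, msg):
        # The client invokes this from poll()/flush(); err is None on success.
        if err is None:
            stats["delivered"] += 1
        else:
            stats["failed"] += 1

    return on_delivery, stats


def produce_with_reports(bootstrap_servers, topic, payload, count):
    """Send `count` copies of `payload` and return the delivery statistics."""
    from confluent_kafka import Producer  # imported lazily; needs the client installed

    on_delivery, stats = make_delivery_tracker()
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    for _ in range(count):
        producer.produce(topic, payload, on_delivery=on_delivery)
        producer.poll(0)  # serve pending delivery callbacks without blocking
    producer.flush()  # wait for the remaining messages and their callbacks
    return stats
```

Calling produce_with_reports("192.168.0.110:9092", "HelloWorld", aim, 1000) against the broker used in this article would be expected to report 1000 delivered messages when everything is healthy.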

Consumer:

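from confluent_kafka import Consumer
from confluent_kafka import TopicPartition


# Consumer in group "hello"; start from the earliest offset if none is committed.
c = Consumer({
    "bootstrap.servers": "192.168.0.110:9092",
    "group.id": "hello",
    "auto.offset.reset": "earliest"
})

# Assign partition 0 of HelloWorld explicitly instead of subscribing.
partition = TopicPartition("HelloWorld", 0)
c.assign([partition])

# Fetch up to 50 messages, waiting at most 3 seconds.
records = c.consume(num_messages=50, timeout=3)
if records:
    # Synchronously commit the offset of the last message in the batch.
    c.commit(message=records[-1], asynchronous=False)
for msg in records:
    print(msg.value())
c.close()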
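
The consumer above treats every returned record as a normal message. Records returned by consume() can also carry an error, so a slightly more defensive variant checks msg.error() before printing or committing. This is a sketch under that assumption; split_records and consume_batch are hypothetical helper names.

```python
def split_records(records):
    """Separate consumed records into (good_messages, errors)."""
    good, errors = [], []
    for msg in records:
        # Message.error() returns None for a normal message, a KafkaError otherwise.
        if msg.error() is None:
            good.append(msg)
        else:
            errors.append(msg.error())
    return good, errors


def consume_batch(bootstrap_servers, group_id, topic, partition=0, batch=50):
    """Consume one batch, commit the last good message, and return (values, errors)."""
    from confluent_kafka import Consumer, TopicPartition  # needs the client installed

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.assign([TopicPartition(topic, partition)])
        records = consumer.consume(num_messages=batch, timeout=3)
        good, errors = split_records(records)
        if good:
            consumer.commit(message=good[-1], asynchronous=False)
        return [m.value() for m in good], errors
    finally:
        consumer.close()  # always leave the group cleanly
```

The split is kept in its own function so the error handling can be checked without a running broker.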

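The describe output shows that producing and consuming both work: the partition holds 1000 messages, the group has committed offset 200, and the lag is 800.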

$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.110:9092 --describe --group hello

Consumer group 'hello' has no active members.

GROUP  TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
hello  HelloWorld  0          200             1000            800  -            -     -
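
The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. The sketch below reproduces that arithmetic and, as an assumption about how one might check it from the client side, reads the same two numbers through the consumer's get_watermark_offsets and committed methods. consumer_lag and live_lag are hypothetical helper names.

```python
def consumer_lag(log_end_offset, committed_offset):
    """Lag for one partition: log end offset minus committed offset."""
    return log_end_offset - committed_offset


def live_lag(consumer, topic, partition=0, timeout=5):
    """Compute lag for one partition using an existing confluent_kafka.Consumer."""
    from confluent_kafka import TopicPartition  # needs the client installed

    tp = TopicPartition(topic, partition)
    offsets = consumer.get_watermark_offsets(tp, timeout=timeout)
    if offsets is None:  # the broker did not answer within the timeout
        return None
    low, high = offsets
    committed = consumer.committed([tp], timeout=timeout)[0].offset
    if committed < 0:  # no committed offset yet; count from the low watermark
        committed = low
    return consumer_lag(high, committed)


# Values taken from the describe output above.
print(consumer_lag(1000, 200))
```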

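We can also capture packets with tcpdump and see the client version information. The ASCII column shows confluent-kafka-python 1.7.0 running on rdkafka 1.8.2-49-g4992b3: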

11:49:12.105528 IP 192.168.0.106.45262 > top.XmlIpcRegSvc: Flags [P.], seq 1:78, ack 1, win 229, options [nop,nop,TS val 24601420 ecr 2579566433], length 77
0x0000: 4500 0081 3056 4000 4006 87f8 c0a8 006a E...0V@.@......j
0x0010: c0a8 006e b0ce 2384 7d21 5320 ae52 0dae ...n..#.}!S..R..
0x0020: 8018 00e5 829c 0000 0101 080a 0177 634c .............wcL
0x0030: 99c1 0f61 0000 0049 0012 0003 0000 0001 ...a...I........
0x0040: 0007 7264 6b61 666b 6100 1763 6f6e 666c ..rdkafka..confl
0x0050: 7565 6e74 2d6b 6166 6b61 2d70 7974 686f uent-kafka-pytho
0x0060: 6e1f 312e 372e 302d 7264 6b61 666b 612d n.1.7.0-rdkafka-
0x0070: 312e 382e 322d 3439 2d67 3439 3932 6233 1.8.2-49-g4992b3
0x0080: 00 .
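
The captured packet can be decoded by hand. The 77-byte TCP payload starts at offset 0x34 (after the 20-byte IP header and the 32-byte TCP header) and appears to be an ApiVersions request (API key 18, version 3), which carries the client software name and version as compact strings. The sketch below parses exactly those bytes. The function names are hypothetical, and the parser handles only the single-byte length prefixes seen in this capture.

```python
import binascii
import struct

# TCP payload copied from the hex dump above, starting at offset 0x34.
PAYLOAD_HEX = (
    "00000049" "0012" "0003" "00000001"
    "0007" "72646b61666b61" "00"
    "17" "636f6e666c75656e742d6b61666b612d707974686f6e"
    "1f" "312e372e302d72646b61666b612d312e382e322d34392d67343939326233"
    "00"
)


def read_compact_string(buf, pos):
    """Read a Kafka compact string whose length prefix fits in one byte."""
    length = buf[pos] - 1  # the wire value is the string length plus one
    start = pos + 1
    return buf[start:start + length].decode("ascii"), start + length


def parse_api_versions_request(payload):
    """Parse the request header and the two client software fields."""
    buf = bytearray(payload)
    # int32 size, int16 api_key, int16 api_version, int32 correlation_id.
    size, api_key, api_version, correlation_id = struct.unpack_from(">ihhi", payload, 0)
    pos = 12
    (client_id_len,) = struct.unpack_from(">h", payload, pos)
    pos += 2
    client_id = buf[pos:pos + client_id_len].decode("ascii")
    pos += client_id_len
    pos += 1  # skip the empty tagged-field section that ends the header
    software_name, pos = read_compact_string(buf, pos)
    software_version, pos = read_compact_string(buf, pos)
    return {
        "size": size,
        "api_key": api_key,
        "api_version": api_version,
        "correlation_id": correlation_id,
        "client_id": client_id,
        "software_name": software_name,
        "software_version": software_version,
    }


request = parse_api_versions_request(binascii.unhexlify(PAYLOAD_HEX))
print(request)
```

The size field (0x49, 73) plus its own 4 bytes matches the length 77 that tcpdump reports for the segment.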

References

1. https://github.com/confluentinc/confluent-kafka-python/tree/v1.7.0
2. https://github.com/edenhill/librdkafka
3. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
