一般使用kafka时会通过SASL做客户端认证,比如Kerberos SASL GSSAPI,只需要管理员授权即可,认证后流量侧的表现仍是明文的,考虑到数据安全问题,想避免中间设备通过流量还原的方式获取数据,需要对kafka服务开启SSL实现通信加密,实际现在都是TLS了,但是配置文件中仍然使用SSL。
环境介绍
kafka版本: 2.6.3
zookeeper: 使用kafka自带zookeeper版本,3.5.9
采用单机三节点测试,为了便于理解,目录结构大致如下:
1 | ├── bin |
Zookeeper基本配置
首先为每个节点分配不同的ID并写入myid文件
1 | echo "1" > /home/top/KZ/kafka_2.13-2.6.3/data/zookeeper/01/myid |
编辑zookeeper的配置文件
1 | clientPort=2181 |
分别启动三个zookeeper节点:
1 | $ bin/zookeeper-server-start.sh -daemon config/zookeeper01.properties |
生成JKS文件
kafka支持通过SSL的方式进行安全加固,配置SSL之前,我们要先解决证书的问题。首先通过OpenSSL生成自签名证书的方式做了证书,在这里建议配置证书密码,当然不建议用弱口令,这里只是作为示例:
1 | A challenge password []:1q2w3e4r5t6y |
因为kafka在2.7.0版本才支持PEM证书(见KIP-651),在此之前都是使用JKS。那么可以通过如下方式将之前做的证书转为JKS。
1 | openssl pkcs12 -export -in demoCA/server.crt -inkey server.key -name myserver.internal.net > server.p12 |
现在生成了server.keystore.jks
和server.truststore.jks
文件,client.keystore.jks
和client.truststore.jks
同理,关于keystore和truststore的区别从上面的转换步骤可以看出来,也可以阅读以下文章:
- https://www.ibm.com/docs/en/zosconn/zosconnect/3.0?topic=connect-keystores-truststores
- https://www.geeksforgeeks.org/difference-between-truststore-and-keystore-in-java/
注意: 使用时要注意JDK版本问题,如果是JDK 11生成的JKS,在JDK 8下无法使用。
kafka配置
编辑server.properties
文件,注意因为是单机测试,在不同节点的配置文件中需要自行修改id、端口、目录等配置
1 | broker.id=1 |
分别启动三个节点kafka:
1 | bin/kafka-server-start.sh -daemon config/server01.properties |
创建topic用于生产、消费验证:
1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloWorld |
将部分SSL配置分别写入producer.properties
、consumer.properties
,并修改bootstrap.servers
、group.id
等信息用于测试:
1 | security.protocol=SSL |
两个终端分别执行,另外可以开启tcpdump抓包,观察流量侧的表现:
1 | bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/producer.properties |
程序测试
如果是JAVA代码,那么JKS文件是可以直接使用的,对于Python来说需要使用我们上面制作的证书文件:ca.key
、ca.crt
、client.crt
(使用server.crt
也是可以的)。在这里我们仍区分kafka-python与confluent-kafka-python,关于二者的区别,可以查看上一篇文章kafka-python VS confluent-kafka-python。
kafka-python
先看kafka-python,网上搜索大部分资料,都是类似这样的配置:
1 | from kafka import KafkaProducer |
实际测试中因为自签名证书会报错,同时发现在Python2和Python3下的报错还不一样:
1 | python 3 |
正确方式是通过ssl库与参数ssl_*组合使用:
1 | from kafka import KafkaProducer |
confluent-kafka-python
同样的需求,confluent-kafka-python中设置enable.ssl.certificate.verification
为False即可解决自签名证书问题。
1 | from confluent_kafka import Consumer |
Zookeeper添加SSL配置
通过上面的配置,我们已经完成kafka的SSL通信,但Zookeeper中仍然是明文信息,接下来为Zookeeper也添加SSL配置:
1 | # 注释掉clientPort,改用secureClientPort |
修改完Zookeeper配置,重启服务后,可以在日志中看到类似如下内容,证明服务启动正常:
1 | INFO Received connection request from /192.168.0.114:59150 (org.apache.zookeeper.server.quorum.QuorumCnxManager) |
同时kafka的配置文件也要做相应的修改并重启服务,感兴趣的小伙伴可以查看KIP-515了解背景:
1 | zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty |
另外,我们需要创建一个kafka_op.properties
配置用于操作,开启SSL后此前的命令已经无法获取信息:
1 | bin/kafka-topics.sh --list --zookeeper localhost:2181 |
kafka_op.properties
配置如下:
1 | security.protocol=SSL |
可能你会问,为什么kafka_op.properties
配置比上面的producer.properties
、consumer.properties
配置多了三行,如果不加会因为自签名证书校验问题报错
1 | Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager) org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed |
现在可以通过下面的命令进行测试:
1 | # 列出当前topic列表 |
在执行命令时如果遇到类似警告,直接忽略即可,由kafka自身的问题造成,并不影响使用,详见issues KAFKA-10090,在2.8.0版本已经修复:
1 | WARN The configuration 'ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig) |
ACL授权
通过上面的部署,我们对Zookeeper、kafka的通信都开启了SSL,那么接下来进行ACL授权。采用ssl.principal.mapping.rules
规则将CN字段作为User名称,并配置超级用户可以对Kafka集群中的任何资源执行任何操作,在server.properties
中添加如下内容后重启Broker:
1 | authorizer.class.name=kafka.security.authorizer.AclAuthorizer |
接下来,我们新建一个客户端配置client.properties
:
1 | security.protocol=SSL |
直接使用该配置测试生产、消费时会因为ACL未授权无法访问topic。
1 | bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/client.properties |
同时在logs/kafka-authorizer.log
日志中也可以看到如下信息:
INFO Principal = User:client.applenice.net is Denied Operation = Describe from host = 192.168.0.114 on resource = Topic:LITERAL:HelloWorld for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
kafka中使用kafka-acls.sh
脚本为SSL用户授予集群的权限,因为已经配置了Zookeeper SSL,需要建立一个创建zookeeper_client.properties
文件,并写入如下内容:
1 | zookeeper.connect=192.168.0.114:2181,192.168.0.114:2182,192.168.0.114:2183 |
生成SSL证书时,已经指定了Distinguished Name Field,根据设置的RULE,对client证书的CN值设置ACL即可。
1 | 生产者授权 |
重新执行kafka-console-producer.sh
、kafka-console-consumer.sh
命令,可以看到能够正常生产、消费数据,符合预期效果。这里要说明一下,未设置ACL时,按照上面的Python程序配置,可以直接进行生产、消费,具体差别我还没研究清楚。从流量侧的表现来看,命令行程序产生的流量中是带有client.applenice.net
的,Python程序产生的流量中并没有client.applenice.net
,仅包含了www.applenice.net
,能匹配上super.users
应该就是出现问题的原因了。
到这里Kafka SSL通信加密与ACL授权就全部完成了,能够满足需求了😊😆。
参考
1、https://archive.apache.org/dist/kafka/2.7.0/RELEASE_NOTES.html
2、https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
3、https://smallstep.com/hello-mtls/doc/server/kafka
4、https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication
5、https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html
6、https://www.orchome.com/171
7、https://github.com/dpkp/kafka-python/issues/2005
8、https://github.com/edenhill/librdkafka/issues/2758
9、https://awesome-it.de/2022/05/21/kafka-security-mtls-acl-authorization/
10、https://www.ibm.com/docs/en/z-logdata-analytics/5.1.0?topic=zos-configuring-transport-layer-security-apache-kafka
11、https://stackoverflow.com/questions/51959495/how-to-create-kafka-python-producer-with-ssl-configuration
12、https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/sa-ui-installation-config-guide-10.1.0/GUID-DF659094-60D3-4E1B-8D63-3DE3ED8B0EDF.html
13、https://knowledge.digicert.com/generalinformation/INFO1745.html
14、https://cwiki.apache.org/confluence/display/KAFKA/KIP-371%3A+Add+a+configuration+to+build+custom+SSL+principal+name
15、https://docs.cloudera.com/runtime/7.2.7/kafka-securing/topics/kafka-secure-principal-name-mapping.html
16、https://cwiki.apache.org/confluence/display/KAFKA/KIP-371%3A+Add+a+configuration+to+build+custom+SSL+principal+name
17、https://docs.confluent.io/platform/current/kafka/authorization.html
18、https://blog.csdn.net/u013887254/article/details/103217425
19、Kafka 核心技术与实战-云环境下的授权该怎么做?