Kafka client authentication is usually handled with SASL (for example Kerberos SASL/GSSAPI), which only requires authorization by an administrator; after authentication, however, the traffic on the wire is still plaintext. To protect the data and prevent intermediate devices from recovering it by reassembling captured traffic, we enable SSL on the Kafka service to encrypt communication. Strictly speaking this is TLS nowadays, but the configuration keys still say SSL.
## Environment

- Kafka: 2.6.3
- ZooKeeper: 3.5.9 (the version bundled with Kafka)

This is a three-node test on a single machine. For ease of reference, the directory layout is roughly as follows:
```
├── bin
├── cert
│   ├── server.keystore.jks
│   ├── server.truststore.jks
│   ├── client.keystore.jks
│   └── client.truststore.jks
├── config
│   ├── client.properties            # client configuration
│   ├── kafka_op.properties          # used by kafka-topics.sh
│   ├── consumer.properties
│   ├── producer.properties
│   ├── server01.properties
│   ├── server02.properties
│   ├── server03.properties
│   ├── zookeeper01.properties
│   ├── zookeeper02.properties
│   ├── zookeeper03.properties
│   └── zookeeper_client.properties  # ZooKeeper SSL settings for kafka-acls.sh
├── data
│   ├── kafka
│   │   ├── 01
│   │   ├── 02
│   │   └── 03
│   └── zookeeper
│       ├── 01
│       ├── 02
│       └── 03
├── libs
├── LICENSE
├── licenses
├── logs
├── NOTICE
└── site-docs
```
## Basic ZooKeeper configuration

First assign each node a unique ID and write it to that node's myid file (repeat with IDs 2 and 3 for the other two nodes):
```bash
echo "1" > /home/top/KZ/kafka_2.13-2.6.3/data/zookeeper/01/myid
```
Then edit each node's ZooKeeper configuration file (zookeeper01.properties is shown; adjust clientPort and the data directories for the other two nodes):
```properties
clientPort=2181
# Limit the maximum number of client connections to mitigate DoS attacks
maxClientCnxns=60
# Disable the admin console
admin.enableServer=false
# Snapshot and transaction-log directories
dataDir=/home/top/KZ/kafka_2.13-2.6.3/data/zookeeper/01/
dataLogDir=/home/top/KZ/kafka_2.13-2.6.3/logs/zookeeper/01/
tickTime=2000
initLimit=10
syncLimit=5
server.1=192.168.0.114:2887:3887
server.2=192.168.0.114:2888:3888
server.3=192.168.0.114:2889:3889
# Log-cleanup policy: keep 10 snapshots, purge every 24 hours
autopurge.snapRetainCount=10
autopurge.purgeInterval=24
```
Start the three ZooKeeper nodes one by one:
```bash
$ bin/zookeeper-server-start.sh -daemon config/zookeeper01.properties
```
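A quick sanity check that the ensemble is up is to list the root znodes with the shell bundled in the Kafka distribution (the client port is still plaintext at this point):

```bash
# Should print the root znodes, e.g. [zookeeper]
bin/zookeeper-shell.sh localhost:2181 ls /
```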
## Generating the JKS files

Kafka supports hardening communication with SSL, but before configuring it we have to sort out certificates. We first produced a self-signed certificate with OpenSSL. Setting a certificate password is recommended at this step; obviously avoid weak passwords in practice, the value below is only an example:
```
A challenge password []:1q2w3e4r5t6y
```
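For reference, here is a simplified, non-interactive sketch of the OpenSSL steps that yield the files used below (ca.key/ca.crt, server.key/server.crt, client.crt). The original workflow was interactive (hence the demoCA/ directory and the challenge-password prompt above), and the subject names and validity period here are assumptions reconstructed from this setup:

```bash
# Self-signed CA (its CN later matches the super.users entry)
openssl req -new -x509 -keyout ca.key -out ca.crt -days 365 \
    -subj "/CN=www.applenice.net" -passout pass:1q2w3e4r5t6y

# Server key + CSR, then sign the CSR with the CA
openssl req -new -newkey rsa:2048 -keyout server.key -out server.csr \
    -subj "/CN=myserver.internal.net" -passout pass:1q2w3e4r5t6y
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
    -out server.crt -days 365 -passin pass:1q2w3e4r5t6y

# Client key + CSR + certificate (this CN becomes the ACL principal later)
openssl req -new -newkey rsa:2048 -keyout client.key -out client.csr \
    -subj "/CN=client.applenice.net" -passout pass:1q2w3e4r5t6y
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \
    -out client.crt -days 365 -passin pass:1q2w3e4r5t6y
```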
Kafka only supports PEM certificates from version 2.7.0 onward (see KIP-651); before that, JKS is required. The certificates above can be converted to JKS as follows:
```
$ openssl pkcs12 -export -in demoCA/server.crt -inkey server.key -name myserver.internal.net > server.p12
Enter Export Password:
Verifying - Enter Export Password:

$ keytool -importkeystore -srckeystore server.p12 -destkeystore server.keystore.jks -srcstoretype pkcs12 -alias myserver.internal.net
Importing keystore server.p12 to server.keystore.jks...
Enter destination keystore password:
Re-enter new password:
Enter source keystore password:

$ keytool -keystore server.truststore.jks -alias CARoot -import -file ca.crt
Enter keystore password:
Re-enter new password:
```
This yields the server.keystore.jks and server.truststore.jks files; client.keystore.jks and client.truststore.jks are built the same way from the client certificate. The difference between a keystore and a truststore is apparent from the conversion steps above: the keystore holds the node's own private key and certificate, while the truststore holds the CA certificates it trusts.
Note: watch out for JDK version mismatches. A JKS generated under JDK 11 cannot be used under JDK 8.
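If in doubt, keytool can reveal the actual store type (recent JDKs default to PKCS12 even when the file is named .jks), and -deststoretype can force the classic JKS format when converting; a sketch, assuming keytool is on the PATH:

```bash
# The "Keystore type:" line shows what format the file really is
keytool -list -v -keystore server.keystore.jks | head

# Force the classic JKS store type for compatibility with older JDKs
keytool -importkeystore -srckeystore server.p12 -srcstoretype pkcs12 \
    -destkeystore server.keystore.jks -deststoretype JKS
```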
## Kafka configuration

Edit server.properties. Since this is a single-machine test, remember to change broker.id, ports, directories, and so on in each node's configuration file (server01.properties is shown):
```properties
broker.id=1
security.inter.broker.protocol=SSL
security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1
ssl.secure.random.implementation=SHA1PRNG
listeners=SSL://192.168.0.114:9093
log.dirs=/home/top/KZ/kafka_2.13-2.6.3/data/kafka/01/
zookeeper.connect=192.168.0.114:2181,192.168.0.114:2182,192.168.0.114:2183
```
Start the three Kafka brokers:
```bash
$ bin/kafka-server-start.sh -daemon config/server01.properties
```
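To confirm the listener really is speaking TLS, one option is a raw handshake check with openssl (just a sanity test against this setup's address):

```bash
# Prints the certificate chain and negotiated protocol; a plaintext port would fail here
openssl s_client -connect 192.168.0.114:9093 </dev/null | head -n 20
```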
Create a topic for verifying production and consumption:
```bash
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloWorld
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
```
Write the SSL settings below into producer.properties and consumer.properties, and adjust bootstrap.servers, group.id, and so on for the test:
```properties
security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.endpoint.identification.algorithm=
```
Run the following in two separate terminals; you can also capture packets with tcpdump to observe what the traffic looks like on the wire (see the sketch after the commands):
```bash
$ bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/producer.properties
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.114:9093 --topic HelloWorld --consumer.config config/consumer.properties --from-beginning
```
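For example, a minimal capture on the broker port (the interface choice is an assumption for a typical Linux host). Before enabling SSL, topic names and message bodies are readable in the dump; afterwards, only opaque TLS records remain:

```bash
# Print payloads as ASCII; with SSL enabled the application data is unreadable
tcpdump -i any -A 'tcp port 9093'
```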
## Testing from code

Java code can use the JKS files directly; Python needs the certificate files we produced earlier: ca.key, ca.crt, and client.crt (server.crt works as well). As before, we look at both kafka-python and confluent-kafka-python; for the differences between the two, see the previous post, kafka-python VS confluent-kafka-python.
### kafka-python

Starting with kafka-python: most examples found online show a configuration like this:
```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["192.168.0.114:9093"],
    ssl_cafile="client.crt",
    ssl_certfile="ca.crt",
    ssl_keyfile="ca.key",
    ssl_check_hostname=False,
    ssl_password="1q2w3e4r5t6y",
    security_protocol="SSL"
)
```
In practice this fails because of the self-signed certificate, and the error even differs between Python 2 and Python 3:
```
# Python 3
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056)

# Python 2
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
```
The working approach is to build an ssl.SSLContext yourself and pass it via ssl_context together with the other ssl_* parameters:
```python
import ssl

from kafka import KafkaProducer

cert = "ca.crt"
key = "ca.key"

# Build a context that presents our certificate but skips verification
# of the broker's self-signed certificate
context = ssl.create_default_context()
context.load_cert_chain(certfile=cert, keyfile=key, password="1q2w3e4r5t6y")
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

producer = KafkaProducer(
    bootstrap_servers=["192.168.0.114:9093", "192.168.0.114:9095", "192.168.0.114:9097"],
    ssl_context=context,
    ssl_cafile="client.crt",
    security_protocol="SSL",
)

# Send a test message so the TLS connection is exercised end to end
producer.send("HelloWorld", b"hello over TLS")
producer.flush()
```
### confluent-kafka-python

For the same requirement, confluent-kafka-python only needs enable.ssl.certificate.verification set to False to get past the self-signed certificate:
```python
from confluent_kafka import Consumer
from confluent_kafka import KafkaException


class ConfluentConsumer:
    def __init__(self):
        self.servers = ["192.168.0.114:9093", "192.168.0.114:9095", "192.168.0.114:9097"]
        self.bootstrap_servers = ",".join(self.servers)
        self.consumer = Consumer(
            {
                'bootstrap.servers': self.bootstrap_servers,
                'group.id': 'hello',
                'auto.offset.reset': 'earliest',
                'enable.auto.commit': False,
                # Accept the self-signed certificate
                'enable.ssl.certificate.verification': False,
                'security.protocol': 'SSL',
                'ssl.ca.location': '/home/top/cert/key/demoCA/client.crt',
                'ssl.certificate.location': '/home/top/cert/key/ca.crt',
                'ssl.key.location': '/home/top/cert/key/ca.key',
                'ssl.key.password': '1q2w3e4r5t6y'
            }
        )

    def consume(self, topic):
        # Minimal poll loop to verify consumption over TLS
        self.consumer.subscribe([topic])
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    raise KafkaException(msg.error())
                print(msg.value())
        finally:
            self.consumer.close()


if __name__ == "__main__":
    ConfluentConsumer().consume("HelloWorld")
```
## Adding SSL to ZooKeeper

With the configuration above, Kafka itself now communicates over SSL, but ZooKeeper traffic is still plaintext. Next, add SSL to ZooKeeper as well:
```properties
# Comment out clientPort and use secureClientPort instead
# clientPort=2181
secureClientPort=2181
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keyStore.password=1q2w3e4r5t6y
ssl.protocol=TLSv1.2
ssl.trustStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.trustStore.password=1q2w3e4r5t6y
sslQuorum=true
ssl.quorum.keyStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.quorum.keyStore.password=1q2w3e4r5t6y
ssl.quorum.trustStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.quorum.trustStore.password=1q2w3e4r5t6y
ssl.quorum.hostnameVerification=false
ssl.hostnameVerification=false
ssl.endpoint.identification.algorithm=
```
After updating the ZooKeeper configuration and restarting the service, log entries like the following confirm it started correctly and that the quorum is talking TLS:
```
INFO Received connection request from /192.168.0.114:59150 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
INFO Accepted TLS connection from /192.168.0.114:59150 - TLSv1.2 - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 (org.apache.zookeeper.server.quorum.UnifiedServerSocket)
INFO Notification: 2 (message format version), 3 (n.leader), 0x600000123 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x6 (n.peerEPoch), FOLLOWING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)
```
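Besides the logs, you can also confirm that the client port itself now requires TLS, using the same openssl sanity check as for the Kafka listener:

```bash
# The handshake should now succeed and show the server certificate on port 2181
openssl s_client -connect 192.168.0.114:2181 </dev/null | head -n 20
```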
Kafka's configuration files need matching changes, after which the brokers must be restarted; see KIP-515 for the background:
```properties
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
zookeeper.ssl.keystore.password=1q2w3e4r5t6y
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
zookeeper.ssl.truststore.password=1q2w3e4r5t6y
zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.endpoint.identification.algorithm=
```
We also need to create a kafka_op.properties file for admin operations, because with SSL enabled the old command can no longer connect:
```
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
```
kafka_op.properties contains:
```properties
security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.enabled.protocols=TLSv1.2
enable.ssl.certificate.verification=false
ssl.endpoint.identification.algorithm=
```
You may wonder why kafka_op.properties carries a few more lines than the producer.properties and consumer.properties shown earlier. Without them, self-signed certificate validation fails with:
```
Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
```
Now the admin commands work again:
```bash
# List current topics
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.0.114:9093 --command-config config/kafka_op.properties
# Create a topic
$ bin/kafka-topics.sh --create --bootstrap-server 192.168.0.114:9093 --replication-factor 1 --partitions 1 --topic HelloWorld --command-config config/kafka_op.properties
# Delete a topic
$ bin/kafka-topics.sh --delete --topic HelloWorld --bootstrap-server 192.168.0.114:9093 --command-config config/kafka_op.properties
```
If you see warnings like the following while running these commands, they can simply be ignored: they are caused by a Kafka bug (see issue KAFKA-10090, fixed in 2.8.0) and do not affect functionality:
```
WARN The configuration 'ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
WARN The configuration 'ssl.keystore.password' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
WARN The configuration 'ssl.enabled.protocols' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
...
```
## ACL authorization

With the deployment above, both ZooKeeper and Kafka communicate over SSL; the next step is ACL authorization. We use an ssl.principal.mapping.rules rule to take the certificate's CN field as the user name, and define a super user that may perform any operation on any resource in the cluster. Add the following to server.properties and restart the brokers:
```properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:www.applenice.net
ssl.principal.mapping.rules=RULE:^CN=([^,]*?),.*$/$1/
```
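To see what the rule will extract, inspect a certificate's subject: the regex captures everything between "CN=" and the first comma of the distinguished name, so the client certificate from this setup maps to the principal User:client.applenice.net (the non-CN fields in the output below are placeholders for whatever was entered when the CSR was created):

```bash
$ openssl x509 -in client.crt -noout -subject
subject=CN = client.applenice.net, OU = ..., O = ...
```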
Next, create a client configuration, client.properties:
```properties
security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/client.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/client.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=
```
Using this configuration directly, producing and consuming fail because no ACL grants access to the topic yet:
```
$ bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/client.properties
> eee
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [HelloWorld]

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.114:9093 --topic HelloWorld --consumer.config config/client.properties --from-beginning
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [HelloWorld]
Processed a total of 0 messages
```
The logs/kafka-authorizer.log log shows the corresponding denial:
```
INFO Principal = User:client.applenice.net is Denied Operation = Describe from host = 192.168.0.114 on resource = Topic:LITERAL:HelloWorld for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)
```
Kafka grants permissions to SSL users with the kafka-acls.sh script. Since ZooKeeper now requires SSL, create a zookeeper_client.properties file with the following content:
```properties
zookeeper.connect=192.168.0.114:2181,192.168.0.114:2182,192.168.0.114:2183
zookeeper.connection.timeout.ms=18000
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
zookeeper.ssl.keystore.password=1q2w3e4r5t6y
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
zookeeper.ssl.truststore.password=1q2w3e4r5t6y
zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.endpoint.identification.algorithm=
```
The Distinguished Name fields were specified when the SSL certificates were generated, so following the RULE we configured, we simply grant ACLs to the CN value of the client certificate:
```bash
# Grant producer rights
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:client.applenice.net --producer --topic 'HelloWorld' --zk-tls-config-file config/zookeeper_client.properties
# Grant consumer rights
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:client.applenice.net --consumer --topic 'HelloWorld' --group '*' --zk-tls-config-file config/zookeeper_client.properties
# List current ACLs
$ bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties
# Remove the producer ACL (if needed)
$ bin/kafka-acls.sh --remove --producer --topic HelloWorld --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties
# Remove the consumer ACL (if needed)
$ bin/kafka-acls.sh --remove --consumer --topic HelloWorld --group '*' --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties
```
Re-running the kafka-console-producer.sh and kafka-console-consumer.sh commands now produces and consumes normally, as expected. One thing worth noting: before any ACLs were set, the Python programs configured above could already produce and consume directly, and I have not fully worked out the difference. Judging from the traffic, the command-line clients present a certificate carrying client.applenice.net, while the Python clients present only www.applenice.net (their configuration loads ca.crt/ca.key as the client certificate), and matching the super.users entry is most likely the reason.
That completes SSL-encrypted communication and ACL authorization for Kafka, which fully satisfies the requirement 😊😆.
## References

1. https://archive.apache.org/dist/kafka/2.7.0/RELEASE_NOTES.html
2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
3. https://smallstep.com/hello-mtls/doc/server/kafka
4. https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication
5. https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html
6. https://www.orchome.com/171
7. https://github.com/dpkp/kafka-python/issues/2005
8. https://github.com/edenhill/librdkafka/issues/2758
9. https://awesome-it.de/2022/05/21/kafka-security-mtls-acl-authorization/
10. https://www.ibm.com/docs/en/z-logdata-analytics/5.1.0?topic=zos-configuring-transport-layer-security-apache-kafka
11. https://stackoverflow.com/questions/51959495/how-to-create-kafka-python-producer-with-ssl-configuration
12. https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/sa-ui-installation-config-guide-10.1.0/GUID-DF659094-60D3-4E1B-8D63-3DE3ED8B0EDF.html
13. https://knowledge.digicert.com/generalinformation/INFO1745.html
14. https://cwiki.apache.org/confluence/display/KAFKA/KIP-371%3A+Add+a+configuration+to+build+custom+SSL+principal+name
15. https://docs.cloudera.com/runtime/7.2.7/kafka-securing/topics/kafka-secure-principal-name-mapping.html
16. https://docs.confluent.io/platform/current/kafka/authorization.html
17. https://blog.csdn.net/u013887254/article/details/103217425
18. Kafka 核心技术与实战: 云环境下的授权该怎么做?