Kafka SSL Communication Encryption and ACL Authorization

Kafka clients are commonly authenticated via SASL (for example Kerberos with SASL/GSSAPI), which only requires administrator-side authorization; after authentication, however, the traffic on the wire is still plaintext. To protect the data itself and prevent intermediate devices from reconstructing it from captured traffic, we need to enable SSL on the Kafka service to encrypt communication. In practice this is all TLS nowadays, but the configuration keys still say SSL.

Environment

Kafka version: 2.6.3
Zookeeper: the version bundled with Kafka, 3.5.9
This test runs three nodes on a single machine. For easier reading, the directory layout is roughly as follows:

├── bin
├── cert
│   ├── server.keystore.jks
│   ├── server.truststore.jks
│   ├── client.keystore.jks
│   └── client.truststore.jks
├── config
│   ├── client.properties            # client config
│   ├── kafka_op.properties          # used by kafka-topics.sh
│   ├── consumer.properties
│   ├── producer.properties
│   ├── server01.properties
│   ├── server02.properties
│   ├── server03.properties
│   ├── zookeeper01.properties
│   ├── zookeeper02.properties
│   ├── zookeeper03.properties
│   └── zookeeper_client.properties  # Zookeeper SSL config used by kafka-acls.sh
├── data
│   ├── kafka
│   │   ├── 01
│   │   ├── 02
│   │   └── 03
│   └── zookeeper
│       ├── 01
│       ├── 02
│       └── 03
├── libs
├── LICENSE
├── licenses
├── logs
├── NOTICE
└── site-docs

Basic Zookeeper Configuration

First, assign each node a distinct ID and write it to that node's myid file:

echo "1" > /home/top/KZ/kafka_2.13-2.6.3/data/zookeeper/01/myid

Edit the Zookeeper configuration file:

clientPort=2181
# Cap the number of client connections to mitigate DoS attacks
maxClientCnxns=60
# Disable the admin console
admin.enableServer=false
# Snapshot directory and transaction-log directory
dataDir=/home/top/KZ/kafka_2.13-2.6.3/data/zookeeper/01/
dataLogDir=/home/top/KZ/kafka_2.13-2.6.3/logs/zookeeper/01/

tickTime=2000
initLimit=10
syncLimit=5

server.1=192.168.0.114:2887:3887
server.2=192.168.0.114:2888:3888
server.3=192.168.0.114:2889:3889

# Purge policy: retain 10 snapshots, run cleanup every 24 hours
autopurge.snapRetainCount=10
autopurge.purgeInterval=24

Start each of the three Zookeeper nodes (adjusting the config file name per node):

$ bin/zookeeper-server-start.sh -daemon config/zookeeper01.properties
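
To confirm that all three nodes are up, you can probe them with Zookeeper's four-letter-word commands. Below is a minimal Python sketch assuming the ports configured above; srvr is whitelisted by default in 3.5.x. Note that once secureClientPort replaces clientPort later in this post, this plaintext probe will stop working:

import socket

def zk_srvr(host="192.168.0.114", port=2181):
    # Send the 'srvr' four-letter-word command and return the raw reply.
    with socket.create_connection((host, port), timeout=5) as s:
        s.sendall(b"srvr")
        return s.recv(4096).decode()

for port in (2181, 2182, 2183):
    # The first line of the reply looks like "Zookeeper version: 3.5.9-...".
    print(port, zk_srvr(port=port).splitlines()[0])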

Generating the JKS Files

Kafka supports security hardening via SSL, and before configuring it we need to sort out certificates. Here, self-signed certificates were generated with OpenSSL. Setting a certificate password is recommended, though of course not a weak one; the value below is only an example:

A challenge password []:1q2w3e4r5t6y

Because Kafka only gained PEM certificate support in version 2.7.0 (see KIP-651), earlier versions must use JKS. The certificates generated above can be converted to JKS as follows:

$ openssl pkcs12 -export -in demoCA/server.crt -inkey server.key -name myserver.internal.net > server.p12
Enter Export Password:
Verifying - Enter Export Password:

$ keytool -importkeystore -srckeystore server.p12 -destkeystore server.keystore.jks -srcstoretype pkcs12 -alias myserver.internal.net
Importing keystore server.p12 to server.keystore.jks...
Enter destination keystore password:
Re-enter new password:
Enter source keystore password:

$ keytool -keystore server.truststore.jks -alias CARoot -import -file ca.crt
Enter keystore password:
Re-enter new password:

This produces server.keystore.jks and server.truststore.jks; client.keystore.jks and client.truststore.jks are created the same way. The difference between a keystore and a truststore is apparent from the conversion steps above (the keystore holds the node's own key and certificate, the truststore holds the CA certificates it trusts); see also the references at the end of this post.

Note: watch out for JDK version mismatches. A JKS file generated with JDK 11 cannot be used under JDK 8.

Kafka Configuration

Edit the server.properties file. Since this is a single-machine test, remember to adjust the broker id, ports, and directories in each node's configuration file:

broker.id=1

security.inter.broker.protocol=SSL
security.protocol=SSL

ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.endpoint.identification.algorithm=

ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1
ssl.secure.random.implementation=SHA1PRNG

listeners=SSL://192.168.0.114:9093
log.dirs=/home/top/KZ/kafka_2.13-2.6.3/data/kafka/01/

zookeeper.connect=192.168.0.114:2181,192.168.0.114:2182,192.168.0.114:2183

Start each of the three Kafka nodes:

$ bin/kafka-server-start.sh -daemon config/server01.properties
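
Before moving on, a short Python snippet can verify that the brokers now complete a TLS handshake on port 9093. This is only a sketch: it assumes the PEM client certificate pair (ca.crt/ca.key) that the Python client examples below use, because ssl.client.auth=required makes the broker reject connections without a client certificate:

import socket
import ssl

ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False      # self-signed certificate, skip hostname check
ctx.verify_mode = ssl.CERT_NONE
# The broker requires client authentication, so present a client certificate.
ctx.load_cert_chain(certfile="ca.crt", keyfile="ca.key", password="1q2w3e4r5t6y")

with socket.create_connection(("192.168.0.114", 9093), timeout=5) as sock:
    with ctx.wrap_socket(sock) as tls:
        # Prints something like: TLSv1.2 ('ECDHE-RSA-AES128-SHA256', 'TLSv1.2', 128)
        print(tls.version(), tls.cipher())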

Create a topic for producer/consumer verification:

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HelloWorld

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

Write the SSL settings below into both producer.properties and consumer.properties, and adjust bootstrap.servers, group.id, and similar fields for testing:

security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.endpoint.identification.algorithm=

Run the following in two separate terminals. You can also capture packets with tcpdump to observe what the traffic looks like on the wire (it should now be TLS records rather than plaintext):

$ bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/producer.properties

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.114:9093 --topic HelloWorld --consumer.config config/consumer.properties --from-beginning

Testing from Code

For Java code, the JKS files can be used directly. For Python, we need the certificate files produced earlier: ca.key, ca.crt, and client.crt (server.crt also works). As before, we distinguish between kafka-python and confluent-kafka-python; for the differences between the two, see the previous post kafka-python VS confluent-kafka-python.

kafka-python

Starting with kafka-python: most material found online suggests a configuration like this:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["192.168.0.114:9093"],
    ssl_cafile="client.crt",
    ssl_certfile="ca.crt",
    ssl_keyfile="ca.key",
    ssl_check_hostname=False,
    ssl_password="1q2w3e4r5t6y",
    security_protocol="SSL"
)

In actual testing this fails because of the self-signed certificate, and the error differs between Python 2 and Python 3 (on Python 2 the handshake failure ends up surfaced only as NoBrokersAvailable):

# python 3
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056)

# python 2
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

The working approach is to build an ssl.SSLContext with the ssl library and combine it with the ssl_* parameters:

from kafka import KafkaProducer
import ssl

cert = "ca.crt"
key = "ca.key"
context = ssl.create_default_context()
context.load_cert_chain(certfile=cert, keyfile=key, password="1q2w3e4r5t6y")
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# context.verify_mode = ssl.CERT_REQUIRED  # fails with SSL: CERTIFICATE_VERIFY_FAILED

producer = KafkaProducer(
    bootstrap_servers=["192.168.0.114:9093",
                       "192.168.0.114:9095",
                       "192.168.0.114:9097"],
    ssl_context=context,
    ssl_cafile="client.crt",
    security_protocol="SSL",
)
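
With the producer constructed, a quick end-to-end check is to send one record to the topic created earlier and wait for the acknowledgement (a minimal usage sketch):

# Send one record and block until the broker acknowledges it.
future = producer.send("HelloWorld", b"hello over TLS")
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)
producer.flush()
producer.close()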

confluent-kafka-python

For the same requirement, confluent-kafka-python solves the self-signed certificate problem simply by setting enable.ssl.certificate.verification to False:

from confluent_kafka import Consumer
from confluent_kafka import TopicPartition
from confluent_kafka import KafkaException


class ConfluentConsumer:
    def __init__(self):
        self.servers = ["192.168.0.114:9093",
                        "192.168.0.114:9095",
                        "192.168.0.114:9097"]
        self.bootstrap_servers = ",".join(self.servers)

        self.config = Consumer(
            {
                'bootstrap.servers': self.bootstrap_servers,
                'group.id': 'hello',
                'auto.offset.reset': 'earliest',
                'enable.auto.commit': False,
                'enable.ssl.certificate.verification': False,
                'security.protocol': 'SSL',
                'ssl.ca.location': '/home/top/cert/key/demoCA/client.crt',
                'ssl.certificate.location': '/home/top/cert/key/ca.crt',
                'ssl.key.location': '/home/top/cert/key/ca.key',
                'ssl.key.password': '1q2w3e4r5t6y'
            }
        )
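
The class above only builds the consumer; a poll loop along the following lines could complete it. This is a sketch, committing offsets manually because enable.auto.commit is False:

    def run(self, topic="HelloWorld"):
        self.config.subscribe([topic])
        try:
            while True:
                msg = self.config.poll(1.0)
                if msg is None:
                    continue                      # nothing arrived within the timeout
                if msg.error():
                    raise KafkaException(msg.error())
                print(msg.topic(), msg.partition(), msg.offset(), msg.value())
                self.config.commit(msg)           # manual commit, auto-commit is off
        finally:
            self.config.close()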

Adding SSL Configuration to Zookeeper

With the configuration above, Kafka's own communication is now over SSL, but Zookeeper traffic is still plaintext. Next, add SSL configuration to Zookeeper as well:

# Comment out clientPort and use secureClientPort instead
# clientPort=2181
secureClientPort=2181

serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider

ssl.keyStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keyStore.password=1q2w3e4r5t6y
ssl.protocol=TLSv1.2
ssl.trustStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.trustStore.password=1q2w3e4r5t6y

sslQuorum=true
ssl.quorum.keyStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.quorum.keyStore.password=1q2w3e4r5t6y
ssl.quorum.trustStore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.quorum.trustStore.password=1q2w3e4r5t6y

ssl.quorum.hostnameVerification=false
ssl.hostnameVerification=false
ssl.endpoint.identification.algorithm=

After modifying the Zookeeper configuration and restarting the service, log entries like the following confirm the nodes are now talking TLS:

INFO Received connection request from /192.168.0.114:59150 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
INFO Accepted TLS connection from /192.168.0.114:59150 - TLSv1.2 - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 (org.apache.zookeeper.server.quorum.UnifiedServerSocket)
INFO Notification: 2 (message format version), 3 (n.leader), 0x600000123 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x6 (n.peerEPoch), FOLLOWING (my state)0 (n.config version) (org.apache.zookeeper.server.quorum.FastLeaderElection)

Kafka's configuration files must be updated accordingly and the brokers restarted; if you are interested in the background, see KIP-515:

zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
zookeeper.ssl.keystore.password=1q2w3e4r5t6y
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
zookeeper.ssl.truststore.password=1q2w3e4r5t6y

zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.endpoint.identification.algorithm=

We also need a kafka_op.properties file for administrative operations, since with SSL enabled the old command can no longer fetch anything:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING

kafka_op.properties looks like this:

security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y

ssl.enabled.protocols=TLSv1.2
enable.ssl.certificate.verification=false
ssl.endpoint.identification.algorithm=

You may wonder why kafka_op.properties has a few more lines than the earlier producer.properties and consumer.properties. Without them, the self-signed certificate fails validation:

Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager) org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed

Topics can now be managed with the following commands:

# List current topics
$ bin/kafka-topics.sh --list --bootstrap-server 192.168.0.114:9093 --command-config config/kafka_op.properties

# Create a topic
$ bin/kafka-topics.sh --create --bootstrap-server 192.168.0.114:9093 --replication-factor 1 --partitions 1 --topic HelloWorld --command-config config/kafka_op.properties

# Delete a topic
$ bin/kafka-topics.sh --delete --topic HelloWorld --bootstrap-server 192.168.0.114:9093 --command-config config/kafka_op.properties

If you see warnings like the ones below while running these commands, they can be ignored; they are caused by a Kafka bug that does not affect functionality and was fixed in 2.8.0 (see issue KAFKA-10090):

WARN The configuration 'ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
WARN The configuration 'ssl.keystore.password' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
WARN The configuration 'ssl.enabled.protocols' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
....
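
The same administrative operations can also be performed from Python. Below is a minimal sketch using kafka-python's KafkaAdminClient, reusing the PEM files from the producer example (the admin API details here are an assumption against kafka-python 2.x):

import ssl

from kafka.admin import KafkaAdminClient

context = ssl.create_default_context()
context.load_cert_chain(certfile="ca.crt", keyfile="ca.key", password="1q2w3e4r5t6y")
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE       # self-signed certificate

admin = KafkaAdminClient(
    bootstrap_servers=["192.168.0.114:9093"],
    security_protocol="SSL",
    ssl_context=context,
)
print(admin.list_topics())                # same result as kafka-topics.sh --list
admin.close()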

ACL Authorization

With the deployment above, both Zookeeper and Kafka communication are encrypted with SSL, so next comes ACL authorization. We use an ssl.principal.mapping.rules rule to take the CN field as the User name, and configure a super user that can perform any operation on any resource in the Kafka cluster. Add the following to server.properties and restart the brokers:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:www.applenice.net
ssl.principal.mapping.rules=RULE:^CN=([^,]*?),.*$/$1/
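
To make the mapping rule concrete, here is the same regex applied in Python: the first capture group (everything after CN= up to the first comma) becomes the Kafka principal. The DN below is a made-up example, only the CN matters:

import re

# Same pattern as in ssl.principal.mapping.rules; $1 corresponds to group(1).
rule = r"^CN=([^,]*?),.*$"
dn = "CN=client.applenice.net,OU=dev,O=applenice,L=Beijing,ST=Beijing,C=CN"
print(re.match(rule, dn).group(1))  # -> client.applenice.net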

Next, create a client configuration client.properties:

security.protocol=SSL
ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/client.keystore.jks
ssl.keystore.password=1q2w3e4r5t6y
ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/client.truststore.jks
ssl.truststore.password=1q2w3e4r5t6y
ssl.key.password=1q2w3e4r5t6y
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=

Producing or consuming with this configuration fails, because no ACL has been granted for the topic:

$ bin/kafka-console-producer.sh --broker-list 192.168.0.114:9093 --topic HelloWorld --producer.config config/client.properties 
>eee
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [HelloWorld]

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.114:9093 --topic HelloWorld --consumer.config config/client.properties --from-beginning
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [HelloWorld]
Processed a total of 0 messages

Meanwhile, logs/kafka-authorizer.log shows entries like:

INFO Principal = User:client.applenice.net is Denied Operation = Describe from host = 192.168.0.114 on resource = Topic:LITERAL:HelloWorld for request = Metadata with resourceRefCount = 1 (kafka.authorizer.logger)

Kafka's kafka-acls.sh script grants SSL users permissions on the cluster. Since Zookeeper SSL is now in place, we need to create a zookeeper_client.properties file with the following content:

zookeeper.connect=192.168.0.114:2181,192.168.0.114:2182,192.168.0.114:2183
zookeeper.connection.timeout.ms=18000

zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.keystore.jks
zookeeper.ssl.keystore.password=1q2w3e4r5t6y
zookeeper.ssl.protocol=TLSv1.2
zookeeper.ssl.truststore.location=/home/top/KZ/kafka_2.13-2.6.3/cert/server.truststore.jks
zookeeper.ssl.truststore.password=1q2w3e4r5t6y

zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.endpoint.identification.algorithm=

The Distinguished Name fields were specified when the SSL certificates were generated, so per the RULE configured above, we simply grant ACLs to the CN value of the client certificate:

# Grant producer rights
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:client.applenice.net --producer --topic 'HelloWorld' --zk-tls-config-file config/zookeeper_client.properties

# Grant consumer rights
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:client.applenice.net --consumer --topic 'HelloWorld' --group '*' --zk-tls-config-file config/zookeeper_client.properties

# List current ACLs
$ bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties

# Remove producer rights (if needed)
$ bin/kafka-acls.sh --remove --producer --topic HelloWorld --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties

# Remove consumer rights (if needed)
$ bin/kafka-acls.sh --remove --consumer --topic HelloWorld --group '*' --authorizer-properties zookeeper.connect=localhost:2181 --zk-tls-config-file config/zookeeper_client.properties

Re-running the kafka-console-producer.sh and kafka-console-consumer.sh commands now shows normal production and consumption, as expected. One caveat: before the ACLs were set, the Python programs configured above could already produce and consume, and I have not fully pinned down why. Looking at the traffic, the command-line tools send client.applenice.net on the wire, while the Python programs send only www.applenice.net, which matches super.users; that match is most likely the explanation.

That completes Kafka SSL communication encryption and ACL authorization, which satisfies the requirements 😊😆.

References

1. https://archive.apache.org/dist/kafka/2.7.0/RELEASE_NOTES.html
2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
3. https://smallstep.com/hello-mtls/doc/server/kafka
4. https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication
5. https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html
6. https://www.orchome.com/171
7. https://github.com/dpkp/kafka-python/issues/2005
8. https://github.com/edenhill/librdkafka/issues/2758
9. https://awesome-it.de/2022/05/21/kafka-security-mtls-acl-authorization/
10. https://www.ibm.com/docs/en/z-logdata-analytics/5.1.0?topic=zos-configuring-transport-layer-security-apache-kafka
11. https://stackoverflow.com/questions/51959495/how-to-create-kafka-python-producer-with-ssl-configuration
12. https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/sa-ui-installation-config-guide-10.1.0/GUID-DF659094-60D3-4E1B-8D63-3DE3ED8B0EDF.html
13. https://knowledge.digicert.com/generalinformation/INFO1745.html
14. https://cwiki.apache.org/confluence/display/KAFKA/KIP-371%3A+Add+a+configuration+to+build+custom+SSL+principal+name
15. https://docs.cloudera.com/runtime/7.2.7/kafka-securing/topics/kafka-secure-principal-name-mapping.html
16. https://docs.confluent.io/platform/current/kafka/authorization.html
17. https://blog.csdn.net/u013887254/article/details/103217425
18. Kafka 核心技术与实战 (Kafka Core Technology and Practice), "How should authorization be done in cloud environments?"
