51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Kafka基于SASL框架PLAIN认证机制配置用户密码认证

一、kafka简介

Kafka 被称为下一代分布式-订阅消息系统,是非营利性组织ASF(Apache Software Foundation,简称为ASF)基金会中的一个开源项目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat等开源软件都属于Apache基金会的开源软件,类似的消息系统还有RbbitMQ、ActiveMQ、ZeroMQ,最主要的优势是其具备分布式功能、并且结合zookeeper可以实现动态扩容。

Apache Kafka 与传统消息系统相比,有以下不同:
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

1)它被设计为一个分布式系统,易于向外扩展;
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

2)它同时为发布和订阅提供高吞吐量;
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

3)它支持多订阅者,当失败时能自动平衡消费者;
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

4)它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

文章源自小柒网-https://www.yangxingzhen.cn/9396.html

二、安装JDK

JDK下载地址:https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

1、安装JDK-1.8
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

[root@localhost ~]# yum -y install jdk-8u351-linux-x64.rpm
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

2、查看是否安装成功
文章源自小柒网-https://www.yangxingzhen.cn/9396.html

[root@localhost ~]# java -version

java version "1.8.0_351"

Java(TM) SE Runtime Environment (build 1.8.0_351-b10)

Java HotSpot(TM) 64-Bit Server VM (build 25.351-b10, mixed mode)

三、安装Zookeeper

1、下载zookeeper安装包

[root@localhost ~]# wget -c https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

2、解压并重命名

{#8013-1534041091339}[root@localhost ~]# tar xf apache-zookeeper-3.7.1-bin.tar.gz

{#4388-1534041439255}[root@localhost ~]# mv apache-zookeeper-3.7.1-bin /usr/local/zookeeper

{#7710-1534041192410}3、创建快照日志存放目录和事务日志存放

[root@localhost ~]# mkdir -p /usr/local/zookeeper/{data,logs}

{#6381-1534041200994}注:如果不配置dataLogDir,那么事务日志也会写在data目录中。这样会严重影响zookeeper的性能。因为在zookeeper吞吐量很高的时候,产生的事务日志和快照日志太多。

{#0082-1534041765302}[root@localhost ~]# cd /usr/local/zookeeper/conf

[root@localhost conf]# cp zoo_sample.cfg zoo.cfg

{#6379-1534041774689}[root@localhost conf]# vim zoo.cfg

{#1837-1534043115319}# 配置内容

# 服务器之间或客户端与服务器之间的单次心跳检测时间间隔,单位为毫秒
tickTime=2000
# 集群中leader服务器与follower服务器第一次连接最多次数
initLimit=10
# 集群中leader服务器与follower服务器第一次连接最多次数
syncLimit=5
# 客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
clientPort=2181
# 存放数据文件
dataDir=/usr/local/zookeeper/data
# 存放日志文件
dataLogDir=/usr/local/zookeeper/logs

4、配置系统服务

[root@localhost conf]# vim /etc/systemd/system/zookeeper.service

[Unit]
Description=Zookeeper Server
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target

\[Service\]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
User=root
Group=root


\[Install\]
WantedBy=multi-user.target

5、启动Zookeeper

[root@localhost conf]# systemctl daemon-reload

[root@localhost conf]# systemctl start zookeeper

[root@localhost conf]# systemctl enable zookeeper

6、查看Zookeeper端口和进程

[root@localhost conf]# netstat -lntup |grep 2181

[root@localhost conf]# ps -ef |grep zookeeper

7、查看Zookeeper状态

[root@localhost conf]# /usr/local/zookeeper/bin/zkServer.sh status

/bin/java

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Client port found: 2181. Client address: localhost. Client SSL: false.

Mode: standalone

四、安装Kafka

1、下载安装包

[root@localhost conf]# cd ~ && wget -c https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz

2、解压并重命名

[root@localhost ~]# tar xf kafka_2.13-3.2.1.tgz

[root@localhost ~]# mv kafka_2.13-3.2.1 /usr/local/kafka

3、配置server.properties

[root@localhost ~]# vim /usr/local/kafka/config/server.properties

broker.id=0
listeners=PLAINTEXT://192.168.8.38:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
auto.create.topics.enable=true

4、配置系统服务

[root@localhost ~]# vim /etc/systemd/system/kafka.service

[Unit]
Description=Kafka Server
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target

\[Service\]
Type=forking
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
User=root
Group=root


\[Install\]
WantedBy=multi-user.target

5、启动Kafka

[root@localhost ~]# systemctl daemon-reload

[root@localhost ~]# systemctl start kafka

[root@localhost ~]# systemctl enable kafka

6、创建topic

创建名为test,partitions(分区)为10,replication(副本)为1的topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.8.38:9092 --partitions 10 --replication-factor 1 --topic test

Created topic test.

7、获取topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.8.38:9092 --topic test

8、删除topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.8.38:9092 --topic test

9、获取所有topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.8.38:9092

10、kafka命令测试消息发送

1)创建topic

[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.8.38:9092 --partitions 10 --replication-factor 1 --topic test

Created topic test.

2)发送消息

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.8.38:9092 --topic test

>hello

>test

3)获取数据

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.38:9092 --topic test --from-beginning

hello

test

五、配置PLAIN认证

1、修改server.properties

[root@localhost ~]# vim /usr/local/kafka/config/server.properties

broker.id=0
listeners=SASL_PLAINTEXT://192.168.8.38:9092
advertised.listeners=SASL_PLAINTEXT://192.168.8.38:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
auto.create.topics.enable=true

2、添加SASL配置文件

[root@localhost ~]# vim /usr/local/kafka/kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="Aa123456"
    user_admin="Aa123456"
    user_producer="Aa123456"
    user_consumer="Aa123456";
};

说明:该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule指定采用PLAIN机制,定义了用户。

usemame和password指定该代理与集群其他代理初始化连接的用户名和密码

"user_"为前缀后接用户名方式创建连接代理的用户名和密码,例如,user_producer="Aa123456"是指用户名为producer,密码为Aa123456

username为admin的用户,和user为admin的用户,密码要保持一致,否则会认证失败

上述配置中,创建了三个用户,分别为admin、producer和consumer(创建多少个用户,可根据业务需要配置,用户名和密码可自定义设置)

3、修改启动脚本

[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh

找到KAFKA_HEAP_OPTS,添加jvm参数为kafka_server_jaas.conf文件

-Djava.security.auth.login.config=/usr/local/kafka/kafka_server_jaas.conf

4、重启Kafka

[root@localhost ~]# systemctl restart kafka

[root@localhost ~]# systemctl status kafka

5、创建客户端认证配置文件

[root@localhost ~]# vim /usr/local/kafka/kafka_client_jaas.conf

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="Aa123456";
};

注:这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了producer这个用户。

6、添加kafka-console-producer.sh认证文件

[root@localhost ~]# vim /usr/local/kafka/bin/kafka-console-producer.sh

找到"KAFKA_HEAP_OPTS",添加以下参数:

-Djava.security.auth.login.config=/usr/local/kafka/kafka_client_jaas.conf

7、添加kafka-console-consumer.sh认证文件

找到"KAFKA_HEAP_OPTS",添加以下参数:

-Djava.security.auth.login.config=/usr/local/kafka/kafka_client_jaas.conf

8、客户端验证

生产者

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.8.38:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

>hello

>abc

>efg

消费者

[root@localhost ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.38:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

hello

abc

efg

结果:生产者可正常生产数据,消费者能消费到数据。
继续阅读

历史上的今天

7 月
22

赞(0)
未经允许不得转载:工具盒子 » Kafka基于SASL框架PLAIN认证机制配置用户密码认证