kafka安装与配置 {#kafka安装与配置}
下载kafka地址:https://kafka.apache.org/downloads
注意:
如果安装kafka时,提示Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=2.12.8'
发现从官网下的是源码。。。
不要下载带src的文件
正确文件名形如:kafka_2.13-2.7.0.tgz
ps:我这里下载的是Scala 2.12的
安装kafka {#安装kafka}
解压到指定路径
|-----------|-----------------------------------------------------------|
| 1
| tar -zxvf kafka_2.12-2.7.0.tgz -C /export/servers
|
给目录赋予权限
|-----------|-----------------------------------|
| 1
| sudo chmod -R 777 /export
|
发送安装包至其他节点
|-------------|-------------------------------------------------------------------------------------------------------|
| 1 2
| scp -r kafka_2.12-2.7.0 slave1:/export/servers scp -r kafka_2.12-2.7.0 slave2:/export/servers
|
启动kafka {#启动kafka}
-- 启动hadoop
|-----------|----------------------|
| 1
| start-all.sh
|
-- 启动Zookeeper
|-----------|---------------------------|
| 1
| zkServer.sh start
|
进入kafka_2.12-2.7.0目录执行以下命令
|-----------|------------------------------------------------------------------------|
| 1
| ./bin/kafka-server-start.sh -daemon ./config/server.properties
|
测试kafka {#测试kafka}
以下执行的命令都要进入kafka_2.12-2.7.0目录下执行
创建名为 test
的 Topic
。
|-----------|-------------------------------------------------------------------------------------------------------------------|
| 1
| bin/kafka-topics.sh -zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test
|
--partitions:分区数
--replication-factor:副本数
查询 Topic
列表
|-----------|---------------------------------------------------------------|
| 1
| bin/kafka-topics.sh --list --zookeeper localhost:2181
|
配置 delete.topic.enable
为 true
,这样才能删除 topic
查询 Topic
的信息
|-----------|--------------------------------------------------------------------------------|
| 1
| bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
|
如果未指定 topic
则输出所有 topic
的信息
消息相关 {#消息相关}
生产者发送消息
|-----------|---------------------------------------------------------------------------------|
| 1
| bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
消费者查询消息
|-----------|------------------------------------------------------------------------------------------------------------------|
| 1
| bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1
|
--from-beginning:表示从头开始接收数据
--group:指定消费者组
查询名为 test 的 Topic 的消息。
|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3
| bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
|
--time-1 表示要获取指定 topic 所有分区当前的最大位移(历史总消息数),--time-2 表示获取当前最早位移(被消费的消息数),两个命令的输出结果相减便可得到所有分区当前的消息总数。
输出示例:test:0:3
第一个数字0表示分区,第二个数字3表示偏移量。
关闭kafka
|-----------|-----------------------------------------------------------|
| 1
| bin/kafka-server-stop.sh config/server.properties
|
关闭Zookeeper
|-----------|--------------------------|
| 1
| zkServer.sh stop
|
关闭hadoop
|-----------|---------------------|
| 1
| stop-all.sh
|