51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

数据传输工具 —— Kafka Connect

1、什么是 kafka connect?

  Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 kafka 的连接器变得简单。

  Kafka Connect 可以获取整个数据库或从应用程序服务器收集指标到 kafka 主题,使数据可用于低延迟的流处理。

  导出作业可以将数据从 kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。


2、功能

  • kafka connector 通用框架,提供统一的集成 API

  • 同时支持分布式模式和单机模式

  • 自动化的 offset 管理,开发人员不必担心错误处理的影响

  • rest 接口,用来查看和管理 kafka connectors


3、概念

Connectors :通过管理任务来处理数据流的高级抽象
Tasks :数据写入 kafka 和从 kafka 读出的实现
Workers :运行 connectors 和 tasks 的进程
Converters :kafka connect 和其他存储系统直接发送和接收数据之间转换数据

  Connector 决定了数据要从哪里复制过来以及数据应该写到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。

  Task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topicstatus.storage.topic )。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当每个进程挂了之后也会执行 task rebalance。

  Connectors 和 Tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone、distributed。生产中 distributed worker 表现很棒,因为它提供了可扩展性以及自动容错的功能,可以用一个 group.id 来启动很多 worker 进程,在有效的 worker 进程中它们会自动地去协调执行 connector 和 task,如果新加或者挂了一个 worker,其他的 worker 会检测到然后再重新分配 connector 和 task。

  Converter 会把 bytes 数据转换为 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用。例如使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,同时 hfds connector 也可以从 kafka 中读出 avro 格式的数据。





4、实战

  启动 confluent

cd /app/confluent/bin
./confluent local start

  使用 standalone 模式启动

# 启动 kafka connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
connector1.properties [connector2.properties]

  在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件

  其中 connect-standalone.properties 是启动 connect 服务组件自身的配置,内容如下:

# kafka 服务
bootstrap.servers=localhost:9092

# 转换器 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter

# 是否启用转换器 key.converter.schemas.enable=true value.converter.schemas.enable=true

# 偏移量存储文件名 offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000

# 插件路径 plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components

# 默认端口为8083,需要修改端口时启动以下配置 # rest.port=8084


(1)标准 connect

启动一个带 FileSource 的 Connect

  connect-file-source.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-source
# 将文件读取到数据流中
connector.class=FileStreamSource
# 工作线程是 1 个
tasks.max=1
# 读取的文件名为 test.txt
file=test.txt
# 复制到的主题为 connect-test
topic=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties

  结果报错 Java 内存不足

  关闭虚拟机,加大内存,重启服务器和 confluent,再次启动 connect,报错 8083 端口已被绑定

  修改 connect-standalone.properties 配置中的端口为 8084 再启动,新的报错:不存在 source 配置文件中的指定的文件,在启动路径下创建文件,日志恢复正常

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

  可以通过 kafka tools 看到新增了主题 connect-test,写入了3条数据

  往文件中写入数据,会报告又成功提交一次偏移量

# 写数据
/app/confluent# echo -e "foo1\nbar1\n" >> test.txt

# 日志 INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515) ...

  然后可以看到主题中多了3条数据


启动带 FileSource 和 FileSink 的 Connect

  connect-file-sink.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-sink
# 从数据流中读取数据到文件中
connector.class=FileStreamSink
# 工作线程是 1 个
tasks.max=1
# 写入的文件是 test.sink.txt
file=test.sink.txt
# 读取数据的主题是 connect-test
topics=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  可以看到自动创建了 test.sink.txt 文件

  同时可以看到 consumer 中多了一个 connect-local-file-sink ,偏移量为6(即已将6条数据都 sink 到了文件中)




(2)REST API

  使用 Rest API 必须启动分布式模式,通过 Rest API 可以管理集群中的 connect 服务,默认端口是 8083。

GET /connectors - 返回所有正在运行的connector名。
POST /connectors - 新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connectors的名字,config是json格式,必须包含connector的配置信息。
GET /connectors/{name} - 获取指定connector的信息。
GET /connectors/{name}/config - 获取指定connector的配置信息。

  在分布式模式下,有两种方式来配置 connector,第一种是类似 standalone 模式一样,写好配置文件,然后在启动时指定

$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  另外一种方式更加灵活,就是直接通过 Rest API 来对 connector 配置进行增删查。

  查看 connectors

  添加 connectors

  查看某个 connector

  这里指定的文件是相对路径,所以要在 $CONFLUENT_HOME/bin 路径下创建一个 test-distributed.txt 文件

cd $CONFLUENT_HOME/bin
echo -e "foo\nbar\n" > test-distributed.txt

  可以看到出现了 connect-distributed 主题

  添加 sink

  从服务器可以看到产生了 sink 文件

  删除 connector

  再次往 test-distributed.txt 文件中追加数据,可以看到 connect-distributed 主题中的数据增加了,source connector 依然在工作,但是 sink connector 已经停止了,所以 test-distributed.sink.txt 文件中数据不再从主题中复制。

【注意】

  如果要在脚本中处理,发起HTTP请求,可以使用 curl 工具,将请求的配置在 json 文件中,如:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

创建带有 Convert 的 connector

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

  添加 connector(由于跟上述实验 name 一致,所以需要先删除或者换个 name)

  创建 test-transformation.txt 文件,可以看到自动创建了 connect-transformation 主题

  添加 sink

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-transformation.sink.txt",
        "topics": "connect-transformation"
    }
}

  可以看到 sink 自动生成了 test-transformation.sink.txt 文件,并且内容不是 source 过来的原始数据,而是经过 convertor 处理后的带格式的数据


(3)MySQL Source、ESSink

  演示将数据从 MySQL 复制到 kafka 中,再通过 kafka 将数据下沉到 ElasticSearch。这里 MySQL 是数据源,所以需要支持 MySQL 的 source connector,ES 是目标数据系统,所以需要支持 ES 的 sink connectors,可以从 https://www.confluent.io/hub/ 下载。

MySQL

  MySQL 下载插件搜索关键字 "JDBC" ,可以看到提供了在线安装的脚本和离线安装的包下载。

  MySQL 环境准备

# 安装 MySQL
sudo apt-get install mysql-server

# 安装 Confluent 插件 confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1

# 将 MySQL 驱动上传到 confluent 目录 # mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

【注意】下载下来的 jdbc connector 插件,在处理 mysql 时需要相应的驱动,而插件不带驱动,实际采集数据时会报错,这时需要将驱动 jar 包拷贝到插件库目录中。

  数据准备,创建用户并授权,用该用户创建数据库、表和插入数据

grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);

  创建 source 配置文件(connect-mysql-source.properties),内容如下:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表会被采集到的 topic 名前缀,比如表名叫 students,对应的 topic 就为 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-

  启动 mysql source connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties

  可以看到启动之后,开启了 JDBC source task,然后执行了查询的 SQL,最后提交和刷新的偏移量

  与此同时,可以看到 kafka 中新增了一个 topic test-mysql-jdbc-students

  里面有一条数据,如果此时往表中再插入两条数据,可以看到数据变成了3条




ElasticSearch

  ES 下载插件搜索关键字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector、ElasticSearch Source Connector,注意有些插件是支持 source、sink,有些是分开两个插件。

  ES 环境准备

tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch

# 配置环境变量 export ES_HOME=/app/elasticsearch export PATH=${ES_HOME}/bin:$PATH

# 安装 Confluent 插件 confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0

  启动 ES

cd /app/elasticsearch
.bin/elasticsearch

  报错不能以root用户启动

  创建用户用户组es,并修改 es 安装目录所属用户和组

chown -R es:es elasticsearch/

  再次启动看到以下日志即正常

  配置 sink 配置文件(connect-es-sink.properties),内容如下:

name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students 
key.ignore=true
type.name=kafka-connect

  启动 ES sink connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-es-sink.properties

  访问 es 9092 端口查询数据,可以查到有三条数据

# 查询命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search

# 查到的结果 {     "took": 121,     "timed_out": false,     "_shards": {         "total": 1,         "successful": 1,         "skipped": 0,         "failed": 0     },     "hits": {         "total": {             "value": 3,             "relation": "eq"         },         "max_score": 1,         "hits": [             {                 "_index": "test-mysql-jdbc-students",                 "_type": "_doc",                 "_id": "test-mysql-jdbc-students+0+0",                 "_score": 1,                 "_source": {                     "rollno": 1,                     "name": "James",                     "marks": "35"                 }             },             {                 "_index": "test-mysql-jdbc-students",                 "_type": "_doc",                 "_id": "test-mysql-jdbc-students+0+1",                 "_score": 1,                 "_source": {                     "rollno": 2,                     "name": "James2",                     "marks": "36"                 }             },             {                 "_index": "test-mysql-jdbc-students",                 "_type": "_doc",                 "_id": "test-mysql-jdbc-students+0+2",                 "_score": 1,                 "_source": {                     "rollno": 3,                     "name": "James3",                     "marks": "37"                 }             }         ]     } }

  往数据库插入一条新的数据

insert into students (name, marks) values ('James4', 38);

  可以看到 es 侧接收到了这条数据

赞(17)
未经允许不得转载:工具盒子 » 数据传输工具 —— Kafka Connect