51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

如何开发一个自己的datax插件

# (一)概述 {#一-概述}

DataX采用FrameWork+plugin的方式,插件只需关心数据的读取或者写入本身。而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。在写插件前官方建议先看一遍开发文档。

Datax开发文档:https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md

本文将带大家一起开发一个读 Http 接口的 Reader。

# (二)开发一个HttpReader {#二-开发一个httpreader}

1、从git上将项目clone下来 https://github.com/alibaba/DataX

2、新建一个module,选最基本的maven项目,项目名定义为xxxreader或者xxxwriter

3、从任意模块里将pom.xml中的以及中的内容复制,删除无用的依赖,引入新模块需要的依赖

4、将任何一个相同模块(reader或者writer)里的resources中的plugin.json和plugin_job_template.json复制到resources目录下

5、修改plugin.json,主要修改name和class

6、修改plugin_job_template.json,这是datax脚本的结构

7、将任何一个相同模块(reader或者writer)里的assembly复制过来,修改下述内容

8、开始编码

编码之前先看一遍DataX开发文档,再结合已有的插件开发就没什么大问题。下面这段HttpReader实现读取接口中的数据,并作为后续Writer的输入,核心代码如下:

public class HttpReader extends Reader {
    public static class Job extends Reader.Job{
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);

        Configuration pluginJobConf=null;
        @Override
        public void init() {
            this.pluginJobConf = super.getPluginJobConf();
            LOG.info("pluginJobConfig",pluginJobConf);
        }
        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.info("adviceNumber:{}",adviceNumber);
            List<Configuration> configurations = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                configurations.add(this.pluginJobConf.clone());
            }
            return configurations;
        }
        @Override
        public void destroy() {
        }
    }

    public static class Task extends Reader.Task{
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private Configuration configuration;
        private String httpUrl=null;
        private String httpType=null;
        private String httpParamJson=null;
        private List<String> dataColumn=null;

        @Override
        public void init() {
            this.configuration = super.getPluginJobConf();
            this.httpUrl = this.configuration.getString("httpUrl");
            this.httpType = this.configuration.getString("httpType");
            this.httpParamJson = this.configuration.getString("httpParamJson");
            this.dataColumn = this.configuration.getList("dataColumn",String.class);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            LOG.info("------------------- http reader log -----------------------");
            LOG.info(this.httpUrl);
            LOG.info(this.httpType);
            LOG.info(this.httpParamJson);
            //调用获取
            String result=null;
            try {
                if (Key.GET.equalsIgnoreCase(this.httpType)){
                    //get请求调用
                    result = HttpUtil.get(this.httpUrl);
                }else if (Key.POST.equalsIgnoreCase(this.httpType)){
                    //post请求调用
                    result = HttpUtil.post(this.httpUrl,this.httpParamJson,60);
                }else{
                    throw DataXException.asDataXException(HttpReaderErrorCode.NOT_SUPPORTED_ERROR,"该方法暂时不支持");
                }
            }catch (Exception e){
               throw DataXException.asDataXException(HttpReaderErrorCode.REQUEST_CALL_FAILED,"请求调用失败");
            }
            Record record=null;
            //将结果塞到record中
            JSONObject jsonResult = JSON.parseObject(result);
            JSONArray array = jsonResult.getJSONArray("data");
            for (int i = 0; i < array.size(); i++) {
                JSONObject jsonObject = array.getJSONObject(i);
                record=recordSender.createRecord();
                for (String data:this.dataColumn){
                    record.addColumn(new StringColumn(jsonObject.getString(data)));
                }
                recordSender.sendToWriter(record);
            }

            recordSender.flush();
        }
        @Override
        public void destroy() {

        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

9、编码完成后,通过maven打包

根目录的pom.xml的module去掉其他的reader和writer,留下common、core、transformer和刚刚写的httpReader

执行命令:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

1

10、本地启动

对core文件夹下的Engine类配置启动参数即可直接在本地调用DataX插件

-Ddatax.home=/Users/ly/IdeaProjects/DataX-master/target/datax/datax

-job /Users/ly/IdeaProjects/DataX-master/http2stream.json -jobid 1

/Users/ly/IdeaProjects/DataX-master

1
2
3
4
5

11、配置脚本

配置一个从httpReader读,控制台输出的脚本,读取接口中key为a1、a2、a3的数据

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "httpUrl": "http://127.0.0.1:8081/test",
            "httpType": "POST",
            "httpParamJson":"{\"aa\":\"aa\"}",
            "dataColumn": [
              "a1","a2","a3"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

在本地起一个简单的测试接口:

@PostMapping("test")
public String test() {
    String jsonResult = "{\n" +
            "    \"data\":[\n" +
            "        {\n" +
            "            \"a1\":\"a11\"\n" +
            "        },\n" +
            "        {\n" +
            "            \"a2\":\"a22\"\n" +
            "        },\n" +
            "        {\n" +
            "            \"a3\":\"a333\"\n" +
            "        }\n" +
            "    ]\n" +
            "}";
    return jsonResult;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

调用成功获取到接口中的数据:

# (三)总结 {#三-总结}

至此,一个简单的 Datax 插件就开发完成了。可以看出 DataX 的扩展性还是很强的,官网上的这些插件自己基本上都能写出来,值得大家去学习一下。

赞(4)
未经允许不得转载:工具盒子 » 如何开发一个自己的datax插件