AsyncTool: JD.com's Multithreaded Orchestration Framework

Why a parallel framework is needed {#并行框架的使用背景}

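A common call pattern: an app (or web front end) calls a backend endpoint, and that endpoint has to call several other microservices for data and then merge everything into one result for the user.

For example, when a user requests "My Orders", the backend has to call the user-detail RPC, the product-detail RPC, the inventory RPC, the coupon service, and many others. Some of these calls can run in parallel, but others depend on another service's return value (the inventory and coupon lookups, for instance, can only be sent after the product-detail response arrives).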

CompletableFuture vs. AsyncTool {#completeablefuture-vs-asynctool}

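Most Java developers have used CompletableFuture. Its methods, such as supplyAsync, the then* family, thenCombine, and allOf, each accept a task and let you merge multiple tasks into one final result.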

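There is one problem, though: once you supply a task, it becomes a black box. If you orchestrate many tasks, you cannot tell how each one is doing, which step it has reached, or what each step produced. You can only wait for everything to finish and then collect the results.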

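A parallel framework should ideally let you monitor every step. Each step's outcome, success or failure, should trigger a callback; only then is the framework complete. With a callback on each task you can track its execution, and when a task fails or times out you can record the exception or supply a task-specific default value.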

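CompletableFuture does have callback methods, such as thenAccept(), whenComplete(), handle(), and exceptionally(), but they fire only if the task actually ran. In some scenarios a task unit may be skipped (SKIP) and never executed, and a task that did not run should still get a callback.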
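To make the comparison concrete, here is a minimal JDK-only sketch of CompletableFuture callbacks. The class name, the task names (`user`, `product`), and the values are made up for illustration; only the CompletableFuture methods themselves come from the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative sketch only; names and values are assumptions, not from AsyncTool.
class CompletableFutureCallbacks {

    // Collects callback messages so the behaviour can be inspected afterwards.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    static int run() {
        LOG.clear();

        // A task that succeeds; whenComplete fires once it has finished.
        CompletableFuture<Integer> user = CompletableFuture
                .supplyAsync(() -> 1)
                .whenComplete((value, error) -> LOG.add("user done: " + value));

        // A task that fails; exceptionally supplies a fallback value.
        CompletableFuture<Integer> product = CompletableFuture
                .<Integer>supplyAsync(() -> {
                    throw new IllegalStateException("simulated RPC failure");
                })
                .exceptionally(error -> {
                    LOG.add("product failed, using fallback");
                    return 0;
                });

        // Merge both results; join() blocks until both stages are complete.
        return user.thenCombine(product, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println("combined = " + run());
        LOG.forEach(System.out::println);
    }
}
```

Both callbacks hang off a stage that was actually submitted. A task that is never submitted has no stage to attach a callback to, which is the gap described above.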

AsyncTool methods {#asynctool方法说明}

A Worker implements the IWorker and ICallback functional interfaces and overrides the four methods below:

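  1. begin(): called back before the Worker starts executing.
  2. action(): where the Worker does its time-consuming work, such as an RPC call.
  3. result(): called back after action() finishes; handle the return value of action() here.
  4. defaultValue(): called back when the Worker throws an exception or times out; the Worker then returns this default value.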

If ICallback is not implemented, the callback methods of DefaultCallback are used. DefaultCallback is an empty callback that contains no logic.
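The order of these four callbacks can be modelled in plain Java. This is a simplified sketch, not AsyncTool's implementation: the `Task` and `Hooks` interfaces are hypothetical stand-ins for the worker and callback roles, and it assumes result() is still invoked with success = false after the default value is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the callback order; NOT AsyncTool's actual code.
class WorkerLifecycleSketch {

    /** Hypothetical stand-in for the worker role: the task and its fallback. */
    interface Task<T, V> {
        V action(T param) throws Exception;
        V defaultValue();
    }

    /** Hypothetical stand-in for the callback role: begin and result hooks. */
    interface Hooks<T, V> {
        void begin();
        void result(boolean success, T param, V value);
    }

    /** Runs one task: begin -> action -> result, or defaultValue on failure. */
    static <T, V> V runOnce(Task<T, V> task, Hooks<T, V> hooks, T param) {
        hooks.begin();
        try {
            V value = task.action(param);
            hooks.result(true, param, value);
            return value;
        } catch (Exception e) {
            // Assumption: the fallback value is also reported through result().
            V fallback = task.defaultValue();
            hooks.result(false, param, fallback);
            return fallback;
        }
    }

    /** Records the callback order for a task that either succeeds or fails. */
    static List<String> trace(final boolean fail) {
        final List<String> events = new ArrayList<>();
        Task<Integer, Integer> task = new Task<Integer, Integer>() {
            @Override
            public Integer action(Integer param) throws Exception {
                if (fail) {
                    throw new Exception("simulated RPC failure");
                }
                return param + 1;
            }

            @Override
            public Integer defaultValue() {
                return 101;
            }
        };
        Hooks<Integer, Integer> hooks = new Hooks<Integer, Integer>() {
            @Override
            public void begin() {
                events.add("begin");
            }

            @Override
            public void result(boolean success, Integer param, Integer value) {
                events.add("result:" + success + ":" + value);
            }
        };
        events.add("returned:" + runOnce(task, hooks, 1));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```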

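Tasks are submitted for execution through the beginWork method of the executor class Async. Its parameters are:

    • timeout: the timeout, in milliseconds, for the whole group of tasks. A Worker that has timed out uses its defaultValue() as its result.

    • ExecutorService executorService: a custom thread pool. If you do not supply one, the default COMMON_POOL is used, which is an unbounded (not fixed-size) thread pool.

    • WorkerWrapper... workerWrapper: the starting tasks; there can be more than one. Do not submit intermediate nodes. Submit only the starting tasks, and the downstream tasks you orchestrated run automatically.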
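The idea of a group timeout plus a custom thread pool can be illustrated with the JDK alone. The sketch below uses `ExecutorService.invokeAll` with a timeout; it is an analogy under simplifying assumptions (one shared default value for the whole group, hypothetical class and method names), not how Async.beginWork is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for "group timeout, then default value"; not AsyncTool code.
class GroupTimeoutSketch {

    static List<Integer> runGroup(long timeoutMs, int defaultValue,
                                  List<Callable<Integer>> tasks) {
        // An unbounded cached pool, created per call to keep the sketch short.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Waits until every task is done or the timeout expires;
            // tasks still running at that point are cancelled.
            List<Future<Integer>> futures =
                    pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                try {
                    results.add(future.get());
                } catch (CancellationException | ExecutionException e) {
                    // Timed out or failed: fall back to the default value.
                    results.add(defaultValue);
                }
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    static List<Integer> demo() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1 + 1); // finishes immediately
        tasks.add(() -> {       // runs far longer than the group timeout
            Thread.sleep(5_000);
            return 2 + 2;
        });
        return runGroup(500, -1, tasks);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```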

Define a few test Workers:

package com.itjing.base.concurrency.jdasync;

import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;

import java.util.Map;

public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    /**
     * Called first, before the Worker starts.
     */
    @Override
    public void begin() {
        System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    /**
     * The Worker's time-consuming operation (RPC/IO) runs here.
     *
     * @param object      the input parameter
     * @param allWrappers all task wrappers, keyed by id
     * @return the input plus 1
     */
    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 1;
        return res;
    }

    /**
     * Callback with the outcome of action().
     *
     * @param success    whether the action succeeded
     * @param param      the input parameter
     * @param workResult the wrapped result
     */
    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("A - param:" + JSONUtil.toJsonStr(param));
        System.out.println("A - result:" + JSONUtil.toJsonStr(workResult));
        System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    /**
     * Callback used when the Worker fails or times out.
     *
     * @return the default value
     */
    @Override
    public Integer defaultValue() {
        System.out.println("A - defaultValue");
        return 101;
    }
}

package com.itjing.base.concurrency.jdasync;

import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;

import java.util.Map;

public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    /**
     * Called first, before the Worker starts.
     */
    @Override
    public void begin() {
        System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    /**
     * The Worker's time-consuming operation (RPC/IO) runs here.
     *
     * @param object      the input parameter
     * @param allWrappers all task wrappers, keyed by id
     * @return
     */
    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 2;
        return res;
    }

    /**
     * Callback with the outcome of action().
     *
     * @param success
     * @param param
     * @param workResult
     */
    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("B - param:" + JSONUtil.toJsonStr(param));
        System.out.println("B - result:" + JSONUtil.toJsonStr(workResult));

        System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    /**
     * Callback used when the Worker fails or times out.
     *
     * @return
     */
    @Override
    public Integer defaultValue() {
        System.out.println("B - defaultValue");
        return 102;
    }
}

package com.itjing.base.concurrency.jdasync;

import cn.hutool.core.date.SystemClock;
import cn.hutool.json.JSONUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;

import java.util.Map;

public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    /**
     * Called first, before the Worker starts.
     */
    @Override
    public void begin() {
        System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    /**
     * The Worker's time-consuming operation (RPC/IO) runs here.
     * @param object      the input parameter
     * @param allWrappers all task wrappers, keyed by id
     * @return
     */
    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 3;
        return res;
    }

    /**
     * Callback with the outcome of action().
     * @param success
     * @param param
     * @param workResult
     */
    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("C - param:" + JSONUtil.toJsonStr(param));
        System.out.println("C - result:" + JSONUtil.toJsonStr(workResult));

        System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    /**
     * Callback used when the Worker fails or times out.
     * @return
     */
    @Override
    public Integer defaultValue() {
        System.out.println("C - defaultValue");
        return 103;
    }
}

Common development scenarios {#开发常见场景}

Serial execution {#串行场景}

Using next:

public static void main(String[] args) {
    // Create the Worker units
    WorkerA workerA = new WorkerA();
    WorkerB workerB = new WorkerB();
    WorkerC workerC = new WorkerC();

    // Wrap the Workers and chain them serially: A -> B -> C
    // C is the last step, so it has no next
    WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerC")
            .worker(workerC)
            .callback(workerC)
            .param(3) // 3 + 3
            .build();
    // B's next is C
    WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerB")
            .worker(workerB)
            .callback(workerB)
            .param(2) // 2 + 2
            .next(wrapperC)
            .build();
    // A's next is B
    WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerA")
            .worker(workerA)
            .callback(workerA)
            .param(1) // 1 + 1
            .next(wrapperB)
            .build();

    try {
        // Submit only the starting task A; B and C follow automatically
        Async.beginWork(1000, wrapperA);
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }

    Async.shutDown();
}

Using depend:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Create the Worker units
    WorkerA workerA = new WorkerA();
    WorkerB workerB = new WorkerB();
    WorkerC workerC = new WorkerC();

    // A has no depend
    WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerA")
            .worker(workerA)
            .callback(workerA)
            .param(1)
            .build();

    // B depends on A
    WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerB")
            .worker(workerB)
            .callback(workerB)
            .param(2)
            .depend(wrapperA)
            .build();

    // C depends on B
    WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerC")
            .worker(workerC)
            .callback(workerC)
            .param(3)
            .depend(wrapperB)
            .build();

    // Submit only the starting task A
    Async.beginWork(1000, wrapperA);
}

Parallel execution {#并行场景}

public static void main(String[] args) {
    // Create the Worker units
    WorkerA workerA = new WorkerA();
    WorkerB workerB = new WorkerB();
    WorkerC workerC = new WorkerC();

    // Wrap the Workers; none has a next or depend, so they run in parallel

    // A
    WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerA")
            .worker(workerA)
            .callback(workerA)
            .param(1) // 1 + 1
            .build();
    // B
    WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerB")
            .worker(workerB)
            .callback(workerB)
            .param(2) // 2 + 2
            .build();
    // C
    WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerC")
            .worker(workerC)
            .callback(workerC)
            .param(3) // 3 + 3
            .build();

    try {
        // Submit all three WorkerWrappers together
        Async.beginWork(1000, wrapperA, wrapperB, wrapperC);
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

Blocking wait: serial first, then parallel {#阻塞等待---先串行后并行}

Modify the action method of WorkerB and WorkerC:

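// WorkerB's version; WorkerC is the same except that it adds 3 instead of 2
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
    WorkerWrapper workerA = allWrappers.get("workerA");
    System.out.println("Result of workerA: " + JSONUtil.toJsonStr(workerA));
    Integer res;
    if (Objects.nonNull(workerA)) {
        // workerA is part of this orchestration: add 2 to its result
        Integer result = (Integer) workerA.getWorkResult().getResult();
        res = result + 2;
    } else {
        // No workerA: fall back to this worker's own parameter
        res = object + 2;
    }
    return res;
}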

Using next:

public static void nextWork() {
    // Create the Worker units
    WorkerA workerA = new WorkerA();
    WorkerB workerB = new WorkerB();
    WorkerC workerC = new WorkerC();

    // C is a last step, so it has no next
    WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerC")
            .worker(workerC)
            .callback(workerC)
            .param(null) // no parameter: A's return value + 3
            .build();
    // B is a last step, so it has no next
    WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerB")
            .worker(workerB)
            .callback(workerB)
            .param(null) // no parameter: A's return value + 2
            .build();
    // A's next is both B and C
    WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerA")
            .worker(workerA)
            .callback(workerA)
            .param(1) // 1 + 1
            .next(wrapperB, wrapperC)
            .build();

    try {
        // Submit only the starting task A
        Async.beginWork(1000, wrapperA);
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

Using depend:

// A has no depend, so it is the starting node
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerA")
        .worker(workerA)
        .callback(workerA)
        .param(1)
        .build();
// C depends on A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerC")
        .worker(workerC)
        .callback(workerC)
        .param(null)
        .depend(wrapperA)
        .build();
// B depends on A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerB")
        .worker(workerB)
        .callback(workerB)
        .param(null)
        .depend(wrapperA)
        .build();
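For comparison, the same fan-out shape (A first, then B and C in parallel, each reading A's result) can be written with CompletableFuture alone. This is a JDK-only sketch with a hypothetical class name; it mirrors the arithmetic of the Workers above (A = 1 + 1, B = A + 2, C = A + 3) but has none of AsyncTool's per-task callbacks.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// JDK-only comparison sketch; not AsyncTool code.
class FanOutSketch {

    /** Returns the results of A, B and C, in that order. */
    static int[] run() {
        // A runs first: 1 + 1
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1 + 1);

        // B and C both hang off A and may run in parallel once A completes.
        CompletableFuture<Integer> b = a.thenApplyAsync(resultA -> resultA + 2);
        CompletableFuture<Integer> c = a.thenApplyAsync(resultA -> resultA + 3);

        // Block until both branches have finished.
        CompletableFuture.allOf(b, c).join();
        return new int[] {a.join(), b.join(), c.join()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```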

Blocking wait: parallel first, then serial {#阻塞等待---先并行后串行}

Modify the action method of WorkerA:

public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
    WorkerWrapper workerB = allWrappers.get("workerB");
    System.out.println("Result of workerB: " + JSONUtil.toJsonStr(workerB));
    WorkerWrapper workerC = allWrappers.get("workerC");
    System.out.println("Result of workerC: " + JSONUtil.toJsonStr(workerC));
    Integer res;
    if (Objects.nonNull(workerB) && Objects.nonNull(workerC)) {
        // Both upstream workers exist: A = B + C
        Integer resultB = (Integer) workerB.getWorkResult().getResult();
        Integer resultC = (Integer) workerC.getWorkResult().getResult();
        res = resultB + resultC;
    } else {
        // Otherwise fall back to this worker's own parameter
        res = object + 1;
    }
    return res;
}

Using next:

public static void nextWork() {
    // Create the Worker units
    WorkerA workerA = new WorkerA();
    WorkerB workerB = new WorkerB();
    WorkerC workerC = new WorkerC();

    // A is the last step, so it has no next
    WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerA")
            .worker(workerA)
            .callback(workerA)
            .param(null) // parameter is null: A = B + C
            .build();

    // C's next is A
    WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerC")
            .worker(workerC)
            .callback(workerC)
            .param(3) // 3 + 3 = 6
            .next(wrapperA)
            .build();
    // B's next is A
    WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
            .id("workerB")
            .worker(workerB)
            .callback(workerB)
            .param(2) // 2 + 2 = 4
            .next(wrapperA)
            .build();

    try {
        // Submit the two starting tasks B and C; A runs after both
        Async.beginWork(4000, wrapperB, wrapperC);
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }
}

Using depend:

// C has no depend, so it is a starting node
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerC")
        .worker(workerC)
        .callback(workerC)
        .param(3) // 3 + 3 = 6
        .build();
// B has no depend, so it is a starting node
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerB")
        .worker(workerB)
        .callback(workerB)
        .param(2) // 2 + 2 = 4
        .build();
// A depends on B and C
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerA")
        .worker(workerA)
        .callback(workerA)
        .param(null) // parameter is null: A = B + C
        .depend(wrapperB, wrapperC)
        .build();
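The fan-in shape (B and C in parallel, then A combining both) also has a short JDK-only counterpart using thenCombine. Again this is only a comparison sketch with a hypothetical class name, using the same arithmetic as the Workers above: B = 2 + 2, C = 3 + 3, A = B + C.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only comparison sketch; not AsyncTool code.
class FanInSketch {

    /** Runs B and C in parallel and returns A = B + C. */
    static int run() {
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2 + 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3 + 3);

        // A starts only after both B and C have completed.
        CompletableFuture<Integer> a = b.thenCombine(c, Integer::sum);
        return a.join();
    }

    public static void main(String[] args) {
        System.out.println("A = " + run());
    }
}
```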
