1 Star 0 Fork 2.6K

jiayuanchun / asyncTool

forked from 京东零售 / asyncTool 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

并发框架说明

有问题可以给作者发邮件说明,或者有特定的场景需求,感谢您的意见。wuweifeng10@jd.com

有对区块链感兴趣的,可以参考作者另一个GVP项目,java区块链底层入门

并发场景可能存在的需求之——任意编排

1 多个执行单元的串行请求

输入图片说明

2 多个执行单元的并行请求

输入图片说明

3 阻塞等待,串行的后面跟多个并行

输入图片说明

4 阻塞等待,多个并行的执行完毕后才执行某个

输入图片说明

5 串并行相互依赖

输入图片说明

6 复杂场景

输入图片说明

并发场景可能存在的需求之——每个执行结果的回调

传统的Future、CompleteableFuture一定程度上可以完成任务编排,并可以把结果传递到下一个任务。如CompletableFuture有then方法,但是却无法做到对每一个执行单元的回调。譬如A执行完毕成功了,后面是B,我希望A在执行完后就有个回调结果,方便我监控当前的执行状况,或者打个日志什么的。失败了,我也可以记录个异常信息什么的。

此时,传统的就无能为力了。

我的框架提供了这样的回调功能。并且,如果执行失败、超时,可以在定义这个执行单元时就设定默认值。

并发场景可能存在的需求之——执行顺序的强依赖和弱依赖

如上图的3,A和B并发执行,最后是C。

有些场景下,我们希望A和B都执行完毕后,才能执行C,CompletableFuture里有个allOf(futures...).then()方法可以做到。

有些场景下,我们希望A或者B任何一个执行完毕,就执行C,CompletableFuture里有个anyOf(futures...).then()方法可以做到。

我的框架同样提供了类似的功能,通过设定wrapper里的addDepend依赖时,可以指定依赖的任务是否must执行完毕。如果依赖的是must要执行的,那么就一定会等待所有的must依赖项全执行完毕,才执行自己。

如果依赖的都不是must,那么就可以任意一个依赖项执行完毕,就可以执行自己了。

并发场景可能存在的需求之——依赖上游的执行结果作为入参

譬如A-B-C三个执行单元,A的入参是String,出参是int,B呢它需要用A的结果作为自己的入参。也就是说A、B并不是独立的,而是有结果依赖关系的。

在A执行完毕之前,B是取不到结果的,只是知道A的结果类型。

那么,我的框架也支持这样的场景。可以在编排时,就取A的结果包装类,作为B的入参。虽然此时尚未执行,必然是空,但可以保证A执行完毕后,B的入参会被赋值。

并发场景可能存在的需求之——全组任务的超时

一组任务,虽然内部的各个执行单元的时间不可控,但是我可以控制全组的执行时间不超过某个值。通过设置timeOut,来控制全组的执行阈值。

并发场景可能存在的需求之——高性能、低线程数

该框架全程无锁,没有一个加锁的地方。

创建线程量少。输入图片说明 如这样的,A会运行在B、C执行更慢的那个单元的线程上,而不会额外创建线程。

async-Tool介绍

解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,还带全链路回调和超时控制。

其中的A、B、C分别是一个最小执行单元(worker),可以是一段耗时代码、一次Rpc调用等,不局限于你做什么。

该框架,可以将这些worker,按照你想要的各种执行顺序,加以组合编排。最终得到结果。

并且,该框架 为每一个worker都提供了执行结果的回调和执行失败后自定义默认值 。譬如A执行完毕后,A的监听器会收到回调,带着A的执行结果(成功、超时、异常)。

根据你的需求,将各个执行单元组合完毕后,开始在主线程执行并阻塞,直到最后一个执行完毕。并且 可以设置全组的超时时间

该框架支持后面的执行单元以前面的执行单元的结果为自己的入参 。譬如你的执行单元B的入参是ResultA,ResultA就是A的执行结果,那也可以支持。在编排时,就可以预先设定B或C的入参为A的result,即便此时A尚未开始执行。当A执行完毕后,自然会把结果传递到B的入参去。

该框架全程无锁。

基本组件

worker: 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。

T,V两个泛型,分别是入参和出参类型。

譬如该耗时操作,入参是String,执行完毕的结果是Integer,那么就可以用泛型来定义。

多个不同的worker之间,没有关联,分别可以有不同的入参、出参类型。

/**
 * 每个最小执行单元需要实现该接口
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface IWorker<T, V> {
    /**
     * 在这里做耗时操作,如rpc请求、IO等
     *
     * @param object
     *         object
     */
    V action(T object);

    /**
     * 超时、异常时,返回的默认值
     * @return 默认值
     */
    V defaultValue();
}

callBack:对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。

/**
 * 每个执行单元执行完毕后,会回调该接口</p>
 * 需要监听执行结果的,实现该接口即可
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface ICallback<T, V> {

    void begin();

    /**
     * 耗时操作执行完毕后,就给value注入值
     *
     */
    void result(boolean success, T param, WorkResult<V> workResult);
}

wrapper:组合了worker和callback,是一个 最小的调度单元 。通过编排wrapper之间的关系,达到组合各个worker顺序的目的。

wrapper的泛型和worker的一样,决定了入参和结果的类型。

        WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
        WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
        WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
        WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);

        workerWrapper.addNext(workerWrapper1, workerWrapper2);
        workerWrapper1.addNext(workerWrapper3);
        workerWrapper2.addNext(workerWrapper3);

输入图片说明

0执行完,同时1和2, 1\2都完成后3。3会等待2完成

譬如,你可以定义一个 worker

/**
 * @author wuweifeng wrote on 2019-11-20.
 */
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {

    @Override
    public String action(String object) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "result = " + SystemClock.now() + "---param = " + object + " from 1";
    }

    @Override
    public String defaultValue() {
        return "worker1--default";
    }

    @Override
    public void begin() {
        //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
    }

    @Override
    public void result(boolean success, String param, WorkResult<String> workResult) {
        if (success) {
            System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        } else {
            System.err.println("callback worker1 failure--" + SystemClock.now() + "----"  + workResult.getResult()
                    + "-threadName:" +Thread.currentThread().getName());
        }
    }

}

通过这一个类看一下,action里就是你的耗时操作,begin就是任务开始执行时的回调,result就是worker执行完毕后的回调。当你组合了多个执行单元时,每一步的执行,都在掌控之内。失败了,还会有自定义的默认值。这是CompleteableFuture无法做到的。

安装教程

代码不多,直接拷贝包过去即可。

使用说明

  1. 3个任务并行

输入图片说明

        ParWorker w = new ParWorker();
        ParWorker1 w1 = new ParWorker1();
        ParWorker2 w2 = new ParWorker2();

        WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
        WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
        WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
        long now = SystemClock.now();
        System.out.println("begin-" + now);

        Async.beginWork(1500, workerWrapper, workerWrapper1, workerWrapper2);
//        Async.beginWork(800, workerWrapper, workerWrapper1, workerWrapper2);
//        Async.beginWork(1000, workerWrapper, workerWrapper1, workerWrapper2);

        System.out.println("end-" + SystemClock.now());
        System.err.println("cost-" + (SystemClock.now() - now));
        System.out.println(getThreadCount());

        System.out.println(workerWrapper.getWorkResult());
//        System.out.println(getThreadCount());
        Async.shutDown();
  1. 1个执行完毕后,开启另外两个,另外两个执行完毕后,开始第4个

输入图片说明

        ParWorker w = new ParWorker();
        ParWorker1 w1 = new ParWorker1();
        ParWorker2 w2 = new ParWorker2();
        w2.setSleepTime(2000);
        ParWorker3 w3 = new ParWorker3();

        WorkerWrapper<String, String> workerWrapper = new WorkerWrapper<>(w, "0", w);
        WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper<>(w1, "1", w1);
        WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper<>(w2, "2", w2);
        WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper<>(w3, "3", w3);

        workerWrapper.addNext(workerWrapper1, workerWrapper2);
        workerWrapper1.addNext(workerWrapper3);
        workerWrapper2.addNext(workerWrapper3);

        long now = SystemClock.now();
        System.out.println("begin-" + now);

        //正常完毕
        Async.beginWork(4100, workerWrapper);
        //3会超时
//        Async.beginWork(3100, workerWrapper);
        //2,3会超时
//        Async.beginWork(2900, workerWrapper);

        System.out.println("end-" + SystemClock.now());
        System.err.println("cost-" + (SystemClock.now() - now));

        System.out.println(getThreadCount());
        Async.shutDown();
  1. 复杂点的

输入图片说明

在测试类里能找到,下图是执行结果。看时间戳,就知道执行的顺序。每个执行单元都是睡1秒。

输入图片说明

  1. 其他的详见test包下的测试类,支持各种形式的组合、编排。
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

解决任意的多线程并行、串行、阻塞、依赖、回调的并发框架,可以任意组合各线程的执行顺序,带全链路执行结果回调。多线程编排一站式解决方案。来自于京东,目前内测期,欢迎提issue 展开 收起
Java
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/jiayuanchun/asyncTool.git
git@gitee.com:jiayuanchun/asyncTool.git
jiayuanchun
asyncTool
asyncTool
master

搜索帮助