callback;
/**
* 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
* 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
*
* 1-finish, 2-error, 3-working
*/
- private AtomicInteger state = new AtomicInteger(0);
+ protected final AtomicInteger state = new AtomicInteger(0);
/**
- * 该map存放所有wrapper的id和wrapper映射
+ * 也是个钩子变量,用来存临时的结果
*/
- private Map forParamUseWrappers;
+ protected volatile WorkResult workResult = WorkResult.defaultResult();
/**
- * 也是个钩子变量,用来存临时的结果
+ * 该map存放所有wrapper的id和wrapper映射
+ *
+ * 需要线程安全。
*/
- private volatile WorkResult workResult = WorkResult.defaultResult();
+ private Map> forParamUseWrappers;
/**
- * 是否在执行自己前,去校验nextWrapper的执行结果
- * 1 4
- * -------3
- * 2
- * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
- * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
+ * 各种策略的封装类。
+ *
+ * 其实是因为加功能太多导致这个对象大小超过了128Byte,所以强迫症的我不得不把几个字段丢到策略类里面去。
+ * ps: 大小超过128Byte令我(TcSnZh)难受的一比,就像走在草坪的格子上,一步嫌小、两步扯蛋。
+ * IDEA可以使用JOL Java Object Layout插件查看对象大小。
*/
- private volatile boolean needCheckNextWrapperResult = true;
+ private final WrapperStrategy wrapperStrategy = new WrapperStrategy();
- private static final int FINISH = 1;
- private static final int ERROR = 2;
- private static final int WORKING = 3;
- private static final int INIT = 0;
+ // ***** state属性的常量值 *****
- private WorkerWrapper(String id, IWorker worker, T param, ICallback callback) {
+ public static final int FINISH = 1;
+ public static final int ERROR = 2;
+ public static final int WORKING = 3;
+ public static final int INIT = 0;
+
+ WorkerWrapper(String id, IWorker worker, T param, ICallback callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
@@ -92,66 +82,44 @@ public class WorkerWrapper {
this.callback = callback;
}
+ // ========== public ==========
+
/**
- * 开始工作
- * fromWrapper代表这次work是由哪个上游wrapper发起的
+ * 外部调用本线程运行此Wrapper的入口方法。
+ *
+ * @param executorService 该ExecutorService将成功运行后,在nextWrapper有多个时被使用于多线程调用。
+ * @param remainTime 剩下的时间
+ * @param forParamUseWrappers 用于保存经过的wrapper的信息的Map,key为id。
+ * @param inspector wrapper调度检查器
*/
- private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) {
- this.forParamUseWrappers = forParamUseWrappers;
- //将自己放到所有wrapper的集合里去
- forParamUseWrappers.put(id, this);
- long now = SystemClock.now();
- //总的已经超时了,就快速失败,进行下一个
- if (remainTime <= 0) {
- fastFail(INIT, null);
- beginNext(executorService, now, remainTime);
- return;
- }
- //如果自己已经执行过了。
- //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
- if (getState() == FINISH || getState() == ERROR) {
- beginNext(executorService, now, remainTime);
- return;
- }
-
- //如果在执行前需要校验nextWrapper的状态
- if (needCheckNextWrapperResult) {
- //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
- if (!checkNextWrapperResult()) {
- fastFail(INIT, new SkippedException());
- beginNext(executorService, now, remainTime);
- return;
- }
- }
-
- //如果没有任何依赖,说明自己就是第一批要执行的
- if (dependWrappers == null || dependWrappers.size() == 0) {
- fire();
- beginNext(executorService, now, remainTime);
- return;
- }
+ public void work(ExecutorService executorService,
+ long remainTime,
+ Map> forParamUseWrappers,
+ WrapperEndingInspector inspector) {
+ work(executorService, null, remainTime, forParamUseWrappers, inspector);
+ }
- /*如果有前方依赖,存在两种情况
- 一种是前面只有一个wrapper。即 A -> B
- 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
- 所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
-
- //只有一个依赖
- if (dependWrappers.size() == 1) {
- doDependsOneJob(fromWrapper);
- beginNext(executorService, now, remainTime);
- } else {
- //有多个依赖时
- doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
- }
+ public String getId() {
+ return id;
+ }
+ public WorkResult getWorkResult() {
+ return workResult;
}
+ public void setParam(T param) {
+ this.param = param;
+ }
- public void work(ExecutorService executorService, long remainTime, Map forParamUseWrappers) {
- work(executorService, null, remainTime, forParamUseWrappers);
+ public int getState() {
+ return state.get();
}
+ /**
+ * 获取之后的下游Wrapper
+ */
+ public abstract Set> getNextWrappers();
+
/**
* 总控制台超时,停止所有任务
*/
@@ -162,448 +130,395 @@ public class WorkerWrapper {
}
/**
- * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
- * 如果没有返回true,如果有返回false
+ * 快速失败
+ *
+ * @return 已经失败则返回false,如果刚才设置为失败了则返回true。
*/
- private boolean checkNextWrapperResult() {
- //如果自己就是最后一个,或者后面有并行的多个,就返回true
- if (nextWrappers == null || nextWrappers.size() != 1) {
- return getState() == INIT;
+ protected boolean fastFail(int expect, Exception e) {
+ //试图将它从expect状态,改成Error
+ if (!compareAndSetState(expect, ERROR)) {
+ return false;
+ }
+
+ //尚未处理过结果
+ if (checkIsNullResult()) {
+ if (e == null) {
+ workResult.setResultState(ResultState.TIMEOUT);
+ } else {
+ workResult.setResultState(ResultState.EXCEPTION);
+ workResult.setEx(e);
+ }
+ workResult.setResult(worker.defaultValue());
}
- WorkerWrapper nextWrapper = nextWrappers.get(0);
- boolean state = nextWrapper.getState() == INIT;
- //继续校验自己的next的状态
- return state && nextWrapper.checkNextWrapperResult();
+ callback.result(false, param, workResult);
+ return true;
}
/**
- * 进行下一个任务
+ * 判断{@link #state}状态是否是初始值。
*/
- private void beginNext(ExecutorService executorService, long now, long remainTime) {
- //花费的时间
- long costTime = SystemClock.now() - now;
- if (nextWrappers == null) {
- return;
- }
- if (nextWrappers.size() == 1) {
- nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
- return;
- }
- CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
- for (int i = 0; i < nextWrappers.size(); i++) {
- int finalI = i;
- futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
- .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
- }
- try {
- CompletableFuture.allOf(futures).get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
+ protected boolean checkIsNullResult() {
+ return ResultState.DEFAULT == workResult.getResultState();
}
- private void doDependsOneJob(WorkerWrapper dependWrapper) {
- if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
- workResult = defaultResult();
- fastFail(INIT, null);
- } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
- workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
- fastFail(INIT, null);
- } else {
- //前面任务正常完毕了,该自己了
- fire();
- }
+ protected boolean compareAndSetState(int expect, int update) {
+ return this.state.compareAndSet(expect, update);
}
- private synchronized void doDependsJobs(ExecutorService executorService, List dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
- boolean nowDependIsMust = false;
- //创建必须完成的上游wrapper集合
- Set mustWrapper = new HashSet<>();
- for (DependWrapper dependWrapper : dependWrappers) {
- if (dependWrapper.isMust()) {
- mustWrapper.add(dependWrapper);
- }
- if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
- nowDependIsMust = dependWrapper.isMust();
- }
- }
-
- //如果全部是不必须的条件,那么只要到了这里,就执行自己。
- if (mustWrapper.size() == 0) {
- if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
- fastFail(INIT, null);
- } else {
- fire();
- }
- beginNext(executorService, now, remainTime);
+ /**
+ * 工作的核心方法。
+ *
+ * @param fromWrapper 代表这次work是由哪个上游wrapper发起的。如果是首个Wrapper则为null。
+ * @param remainTime 剩余时间。
+ */
+ protected void work(ExecutorService executorService,
+ WorkerWrapper, ?> fromWrapper,
+ long remainTime,
+ Map> forParamUseWrappers,
+ WrapperEndingInspector inspector) {
+ this.setForParamUseWrappers(forParamUseWrappers);
+ //将自己放到所有wrapper的集合里去
+ forParamUseWrappers.put(id, this);
+ long now = SystemClock.now();
+ //总的已经超时了,就快速失败,进行下一个
+ if (remainTime <= 0) {
+ fastFail(INIT, null);
+ beginNext(executorService, now, remainTime, inspector);
return;
}
-
- //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干
- if (!nowDependIsMust) {
+ //如果自己已经执行过了。
+ //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
+ if (getState() == FINISH || getState() == ERROR) {
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- //如果fromWrapper是必须的
- boolean existNoFinish = false;
- boolean hasError = false;
- //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
- for (DependWrapper dependWrapper : mustWrapper) {
- WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
- WorkResult tempWorkResult = workerWrapper.getWorkResult();
- //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
- if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
- existNoFinish = true;
- break;
- }
- if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
- workResult = defaultResult();
- hasError = true;
- break;
- }
- if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
- workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
- hasError = true;
- break;
- }
-
- }
- //只要有失败的
- if (hasError) {
- fastFail(INIT, null);
- beginNext(executorService, now, remainTime);
+ // 判断是否要跳过自己,该方法可能会跳过正在工作的自己。
+ final WrapperStrategy wrapperStrategy = getWrapperStrategy();
+ if (wrapperStrategy.shouldSkip(getNextWrappers(), this, fromWrapper)) {
+ fastFail(INIT, new SkippedException());
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
- //都finish的话
- if (!existNoFinish) {
- //上游都finish了,进行自己
+ //如果没有任何依赖,说明自己就是第一批要执行的
+ final Set> dependWrappers = getDependWrappers();
+ if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
- beginNext(executorService, now, remainTime);
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- }
- /**
- * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
- */
- private void fire() {
- //阻塞取结果
- workResult = workerDoJob();
+ DependenceAction.WithProperty judge = wrapperStrategy.judgeAction(dependWrappers, this, fromWrapper);
+
+ switch (judge.getDependenceAction()) {
+ case TAKE_REST:
+ inspector.reduceWrapper(this);
+ return;
+ case FAST_FAIL:
+ switch (judge.getResultState()) {
+ case TIMEOUT:
+ fastFail(INIT, null);
+ break;
+ case EXCEPTION:
+ fastFail(INIT, judge.getFastFailException());
+ break;
+ default:
+ fastFail(INIT, new RuntimeException("ResultState " + judge.getResultState() + " set to FAST_FAIL"));
+ break;
+ }
+ beginNext(executorService, now, remainTime, inspector);
+ break;
+ case START_WORK:
+ fire();
+ beginNext(executorService, now, remainTime, inspector);
+ break;
+ case JUDGE_BY_AFTER:
+ default:
+ inspector.reduceWrapper(this);
+ throw new IllegalStateException("策略配置错误,不应当在WorkerWrapper中返回JUDGE_BY_AFTER或其他无效值 : this=" + this + ",fromWrapper=" + fromWrapper);
+ }
}
/**
- * 快速失败
+ * 进行下一个任务
*/
- private boolean fastFail(int expect, Exception e) {
- //试图将它从expect状态,改成Error
- if (!compareAndSetState(expect, ERROR)) {
- return false;
+ protected void beginNext(ExecutorService executorService, long now, long remainTime, WrapperEndingInspector inspector) {
+ //花费的时间
+ final long costTime = SystemClock.now() - now;
+ final long nextRemainTIme = remainTime - costTime;
+ Set> nextWrappers = getNextWrappers();
+ if (nextWrappers == null) {
+ inspector.setWrapperEndWithTryPolling(this);
+ return;
}
-
- //尚未处理过结果
- if (checkIsNullResult()) {
- if (e == null) {
- workResult = defaultResult();
- } else {
- workResult = defaultExResult(e);
+ // nextWrappers只有一个,就用本线程继续跑。
+ if (nextWrappers.size() == 1) {
+ try {
+ WorkerWrapper, ?> next = nextWrappers.stream().findFirst().get();
+ inspector.addWrapper(next);
+ next.work(executorService, WorkerWrapper.this, nextRemainTIme, getForParamUseWrappers(), inspector);
+ } finally {
+ inspector.setWrapperEndWithTryPolling(this);
}
+ return;
+ }
+ // nextWrappers有多个
+ try {
+ inspector.addWrapper(nextWrappers);
+ nextWrappers.forEach(next -> {
+ executorService.submit(() -> next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector));
+ });
+ } finally {
+ inspector.setWrapperEndWithTryPolling(this);
}
-
- callback.result(false, param, workResult);
- return true;
}
/**
- * 具体的单个worker执行任务
+ * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
- private WorkResult workerDoJob() {
+ protected void fire() {
+ //阻塞取结果
//避免重复执行
if (!checkIsNullResult()) {
- return workResult;
+ return;
}
try {
//如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
- return workResult;
+ return;
}
-
callback.begin();
-
//执行耗时操作
- V resultValue = worker.action(param, forParamUseWrappers);
-
+ V resultValue = resultValue = (V) worker.action(param, (Map) getForParamUseWrappers());
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
- return workResult;
+ return;
}
-
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//回调成功
callback.result(true, param, workResult);
-
- return workResult;
} catch (Exception e) {
//避免重复回调
if (!checkIsNullResult()) {
- return workResult;
+ return;
}
fastFail(WORKING, e);
- return workResult;
}
}
- public WorkResult getWorkResult() {
- return workResult;
- }
+ // ========== hashcode and equals ==========
- public List> getNextWrappers() {
- return nextWrappers;
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
}
- public void setParam(T param) {
- this.param = param;
+ /**
+ * {@code return id.hashCode();}返回id值的hashcode
+ */
+ @Override
+ public int hashCode() {
+ // final String id can use to .hashcode() .
+ return id.hashCode();
}
- private boolean checkIsNullResult() {
- return ResultState.DEFAULT == workResult.getResultState();
- }
+ // ========== Builder ==========
- private void addDepend(WorkerWrapper, ?> workerWrapper, boolean must) {
- addDepend(new DependWrapper(workerWrapper, must));
+ public static WorkerWrapperBuilder builder() {
+ return new Builder<>();
}
- private void addDepend(DependWrapper dependWrapper) {
- if (dependWrappers == null) {
- dependWrappers = new ArrayList<>();
- }
- //如果依赖的是重复的同一个,就不重复添加了
- for (DependWrapper wrapper : dependWrappers) {
- if (wrapper.equals(dependWrapper)) {
- return;
- }
+ /**
+ * 自v1.5,该类被抽取到{@link StableWorkerWrapperBuilder}抽象类,兼容之前的版本。
+ */
+ public static class Builder extends StableWorkerWrapperBuilder> {
+ /**
+ * @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口,以调用v1.5之后的规范api
+ */
+ @Deprecated
+ public Builder() {
}
- dependWrappers.add(dependWrapper);
}
- private void addNext(WorkerWrapper, ?> workerWrapper) {
- if (nextWrappers == null) {
- nextWrappers = new ArrayList<>();
- }
- //避免添加重复
- for (WorkerWrapper wrapper : nextWrappers) {
- if (workerWrapper.equals(wrapper)) {
- return;
- }
- }
- nextWrappers.add(workerWrapper);
- }
+ // ========== package access methods , for example , some getter/setter that doesn't want to be public ==========
- private void addNextWrappers(List> wrappers) {
- if (wrappers == null) {
- return;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- addNext(wrapper);
- }
+ T getParam() {
+ return param;
}
- private void addDependWrappers(List dependWrappers) {
- if (dependWrappers == null) {
- return;
- }
- for (DependWrapper wrapper : dependWrappers) {
- addDepend(wrapper);
- }
+ IWorker getWorker() {
+ return worker;
}
- private WorkResult defaultResult() {
- workResult.setResultState(ResultState.TIMEOUT);
- workResult.setResult(worker.defaultValue());
- return workResult;
+ void setWorker(IWorker worker) {
+ this.worker = worker;
}
- private WorkResult defaultExResult(Exception ex) {
- workResult.setResultState(ResultState.EXCEPTION);
- workResult.setResult(worker.defaultValue());
- workResult.setEx(ex);
- return workResult;
+ ICallback getCallback() {
+ return callback;
}
+ void setCallback(ICallback callback) {
+ this.callback = callback;
+ }
- private int getState() {
- return state.get();
+ void setState(int state) {
+ this.state.set(state);
}
- public String getId() {
- return id;
+ Map> getForParamUseWrappers() {
+ return forParamUseWrappers;
}
- private boolean compareAndSetState(int expect, int update) {
- return this.state.compareAndSet(expect, update);
+ void setForParamUseWrappers(Map> forParamUseWrappers) {
+ this.forParamUseWrappers = forParamUseWrappers;
}
- private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
- this.needCheckNextWrapperResult = needCheckNextWrapperResult;
+ void setWorkResult(WorkResult workResult) {
+ this.workResult = workResult;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- WorkerWrapper, ?> that = (WorkerWrapper, ?>) o;
- return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
- Objects.equals(param, that.param) &&
- Objects.equals(worker, that.worker) &&
- Objects.equals(callback, that.callback) &&
- Objects.equals(nextWrappers, that.nextWrappers) &&
- Objects.equals(dependWrappers, that.dependWrappers) &&
- Objects.equals(state, that.state) &&
- Objects.equals(workResult, that.workResult);
+ abstract void setNextWrappers(Set> nextWrappers);
+
+ abstract Set> getDependWrappers();
+
+ abstract void setDependWrappers(Set> dependWrappers);
+
+ WrapperStrategy getWrapperStrategy() {
+ return wrapperStrategy;
}
+ // ========== toString ==========
+
@Override
- public int hashCode() {
- return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(150)
+ .append("WorkerWrapper{id=").append(id)
+ .append(", param=").append(param)
+ .append(", worker=").append(worker)
+ .append(", callback=").append(callback)
+ .append(", state=").append(state)
+ .append(", workResult=").append(workResult)
+ // 防止循环引用,这里只输出相关Wrapper的id
+ .append(", forParamUseWrappers::getId=");
+ getForParamUseWrappers().keySet().forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getForParamUseWrappers().keySet().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append(", dependWrappers::getId=[");
+ getDependWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getDependWrappers().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append("], nextWrappers::getId=[");
+ getNextWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getNextWrappers().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append("]")
+ .append(", wrapperStrategy=").append(getWrapperStrategy())
+ .append('}');
+ return sb.toString();
}
- public static class Builder {
- /**
- * 该wrapper的唯一标识
- */
- private String id = UUID.randomUUID().toString();
- /**
- * worker将来要处理的param
- */
- private W param;
- private IWorker worker;
- private ICallback callback;
+ public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
+
+ // ========== 这三个属性用来判断是否要开始工作 ==========
+
+ // 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
+
/**
- * 自己后面的所有
+ * 对特殊Wrapper专用的依赖响应策略。
+ * 该值允许为null
*/
- private List> nextWrappers;
+ private DependWrapperStrategyMapper dependWrapperStrategyMapper;
/**
- * 自己依赖的所有
+ * 对必须完成的(must的)Wrapper的依赖响应策略。
+ * 该值允许为null
+ *
+ * 这是一个不得不向历史妥协的属性。用于适配must开关方式。
*/
- private List dependWrappers;
+ private DependMustStrategyMapper dependMustStrategyMapper;
/**
- * 存储强依赖于自己的wrapper集合
+ * 依赖响应全局策略。
*/
- private Set> selfIsMustSet;
-
- private boolean needCheckNextWrapperResult = true;
-
- public Builder worker(IWorker worker) {
- this.worker = worker;
- return this;
+ private DependenceStrategy dependenceStrategy;
+
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ // 如果存在依赖,则调用三层依赖响应策略进行判断
+ DependenceStrategy strategy = dependWrapperStrategyMapper;
+ if (dependMustStrategyMapper != null) {
+ strategy = strategy == null ? dependMustStrategyMapper : strategy.thenJudge(dependMustStrategyMapper);
+ }
+ if (dependenceStrategy != null) {
+ strategy = strategy == null ? dependenceStrategy : strategy.thenJudge(dependenceStrategy);
+ }
+ if (strategy == null) {
+ throw new IllegalStateException("配置无效,三层判断策略均为null,请开发者检查自己的Builder是否逻辑错误!");
+ }
+ return strategy.judgeAction(dependWrappers, thisWrapper, fromWrapper);
}
- public Builder param(W w) {
- this.param = w;
- return this;
+ public DependWrapperStrategyMapper getDependWrapperStrategyMapper() {
+ return dependWrapperStrategyMapper;
}
- public Builder id(String id) {
- if (id != null) {
- this.id = id;
- }
- return this;
+ public void setDependWrapperStrategyMapper(DependWrapperStrategyMapper dependWrapperStrategyMapper) {
+ this.dependWrapperStrategyMapper = dependWrapperStrategyMapper;
}
- public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
- this.needCheckNextWrapperResult = needCheckNextWrapperResult;
- return this;
+ public DependMustStrategyMapper getDependMustStrategyMapper() {
+ return dependMustStrategyMapper;
}
- public Builder callback(ICallback callback) {
- this.callback = callback;
- return this;
+ public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
+ this.dependMustStrategyMapper = dependMustStrategyMapper;
}
- public Builder depend(WorkerWrapper, ?>... wrappers) {
- if (wrappers == null) {
- return this;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- depend(wrapper);
- }
- return this;
+ public DependenceStrategy getDependenceStrategy() {
+ return dependenceStrategy;
}
- public Builder depend(WorkerWrapper, ?> wrapper) {
- return depend(wrapper, true);
+ public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
+ this.dependenceStrategy = dependenceStrategy;
}
- public Builder depend(WorkerWrapper, ?> wrapper, boolean isMust) {
- if (wrapper == null) {
- return this;
- }
- DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
- if (dependWrappers == null) {
- dependWrappers = new ArrayList<>();
- }
- dependWrappers.add(dependWrapper);
- return this;
- }
+ // ========== 跳过策略 ==========
- public Builder next(WorkerWrapper, ?> wrapper) {
- return next(wrapper, true);
- }
+ private SkipStrategy skipStrategy;
- public Builder next(WorkerWrapper, ?> wrapper, boolean selfIsMust) {
- if (nextWrappers == null) {
- nextWrappers = new ArrayList<>();
- }
- nextWrappers.add(wrapper);
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return skipStrategy != null && skipStrategy.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
+ }
- //强依赖自己
- if (selfIsMust) {
- if (selfIsMustSet == null) {
- selfIsMustSet = new HashSet<>();
- }
- selfIsMustSet.add(wrapper);
- }
- return this;
+ public SkipStrategy getSkipStrategy() {
+ return skipStrategy;
}
- public Builder next(WorkerWrapper, ?>... wrappers) {
- if (wrappers == null) {
- return this;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- next(wrapper);
- }
- return this;
+ public void setSkipStrategy(SkipStrategy skipStrategy) {
+ this.skipStrategy = skipStrategy;
}
- public WorkerWrapper build() {
- WorkerWrapper wrapper = new WorkerWrapper<>(id, worker, param, callback);
- wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
- if (dependWrappers != null) {
- for (DependWrapper workerWrapper : dependWrappers) {
- workerWrapper.getDependWrapper().addNext(wrapper);
- wrapper.addDepend(workerWrapper);
- }
- }
- if (nextWrappers != null) {
- for (WorkerWrapper, ?> workerWrapper : nextWrappers) {
- boolean must = false;
- if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
- must = true;
- }
- workerWrapper.addDepend(wrapper, must);
- wrapper.addNext(workerWrapper);
- }
- }
+ // ========== toString ==========
- return wrapper;
- }
+ @Override
+ public String toString() {
+ return "WrapperStrategy{" +
+ "dependWrapperStrategyMapper=" + dependWrapperStrategyMapper +
+ ", dependMustStrategyMapper=" + dependMustStrategyMapper +
+ ", dependenceStrategy=" + dependenceStrategy +
+ ", skipStrategy=" + skipStrategy +
+ '}';
+ }
}
}
diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java
new file mode 100644
index 0000000..c861cb1
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java
@@ -0,0 +1,237 @@
+package com.jd.platform.async.wrapper;
+
+import com.jd.platform.async.callback.ICallback;
+import com.jd.platform.async.callback.IWorker;
+import com.jd.platform.async.worker.WorkResult;
+import com.jd.platform.async.wrapper.actionstrategy.DependWrapperActionStrategy;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
+import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
+
+import java.util.Collection;
+
+/**
+ * 作为优化编排依赖策略后,新增的Builder接口。
+ *
+ * 该接口中不再开放很多过时的api。
+ *
+ * @author create by TcSnZh on 2021/5/4-下午1:26
+ */
+public interface WorkerWrapperBuilder {
+ /**
+ * 设置唯一id。
+ * 如果不设置,{@link StableWorkerWrapperBuilder}会使用UUID
+ */
+ WorkerWrapperBuilder id(String id);
+
+ /**
+ * 设置{@link IWorker}执行方法。
+ *
+ * @param worker 传入接口实现类/lambda
+ */
+ WorkerWrapperBuilder worker(IWorker worker);
+
+ /**
+ * wrapper启动后的传入参数。
+ *
+ * @param t 参数
+ */
+ WorkerWrapperBuilder param(T t);
+
+ /**
+ * 设置{@link ICallback}回调方法。
+ */
+ WorkerWrapperBuilder callback(ICallback callback);
+
+ /**
+ * 设置跳过策略。通常用于检查下游Wrapper是否已经完成。
+ *
+ * 允许不设置。{@link StableWorkerWrapperBuilder}将会默认设置为检查深度为1的下游Wrapper是否执行完成。
+ *
+ * @param strategy 跳过策略函数。
+ */
+ WorkerWrapperBuilder setSkipStrategy(SkipStrategy strategy);
+
+ /**
+ * 设置上游Wrapper依赖关系的选项。
+ */
+ SetDepend setDepend();
+
+ interface SetDepend {
+ /**
+ * 设置在本Wrapper之前的上游Wrapper。
+ *
+ * @param wrapper 允许传入null。
+ */
+ SetDepend wrapper(WorkerWrapper, ?> wrapper);
+
+ default SetDepend wrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ wrapper(wrapper);
+ }
+ return this;
+ }
+
+ default SetDepend wrapper(Collection extends WorkerWrapper> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ wrappers.forEach(this::wrapper);
+ return this;
+ }
+
+ /**
+ * 设置必须要执行成功的Wrapper,当所有被该方法设为的上游Wrapper执行成功时,本Wrapper才能执行
+ */
+ SetDepend mustRequireWrapper(WorkerWrapper, ?> wrapper);
+
+ default SetDepend mustRequireWrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ mustRequireWrapper(wrapper);
+ }
+ return this;
+ }
+
+ /**
+ * 一个用于动态判断是否must的方法,与旧的{@code .depend(WorkerWrapper,boolean)}效果相同。
+ *
+ * @param must 如果为true,则等同于{@link #mustRequireWrapper(WorkerWrapper)},否则等同于{@link #wrapper(WorkerWrapper)}
+ */
+ default SetDepend requireWrapper(WorkerWrapper, ?> wrapper, boolean must) {
+ return must ? mustRequireWrapper(wrapper) : wrapper(wrapper);
+ }
+
+ /**
+ * 对单个Wrapper设置特殊策略。
+ *
+ * @param wrapper 需要设置特殊策略的Wrapper。
+ * @param strategy 特殊策略。
+ */
+ SetDepend specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper, ?> wrapper);
+
+ default SetDepend specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers) {
+ if (strategy == null || wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> workerWrapper : wrappers) {
+ specialDependWrapper(strategy, workerWrapper);
+ }
+ return this;
+ }
+
+ /**
+ * 设置基本策略并返回。
+ *
+ * 如果从未调用该方法,则在{@link #build()}时使用{@link #defaultStrategy()}作为默认策略。
+ *
+ *
+ * @param dependenceStrategy 根据上游Wrapper判断本Wrapper是否启动的最终策略。
+ */
+ SetDepend strategy(DependenceStrategy dependenceStrategy);
+
+ /**
+ * 默认策略为{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}
+ */
+ default SetDepend defaultStrategy() {
+ return strategy(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS);
+ }
+
+ /**
+ * 结束依赖关系设置。返回到所属的{@link WorkerWrapperBuilder}
+ */
+ WorkerWrapperBuilder end();
+ }
+
+ /**
+ * 便捷式设置依赖的上游Wrapper。
+ *
+ * @param wrappers 上游Wrapper
+ */
+ default WorkerWrapperBuilder depends(WorkerWrapper... wrappers) {
+ return setDepend().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder depends(Collection wrappers) {
+ return setDepend().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder depends(DependenceStrategy strategy, WorkerWrapper... wrappers) {
+ return setDepend().wrapper(wrappers).strategy(strategy).end();
+ }
+
+ default WorkerWrapperBuilder depends(DependenceStrategy strategy, Collection wrappers) {
+ return setDepend().wrapper(wrappers).strategy(strategy).end();
+ }
+
+ /**
+ * 设置下游Wrapper依赖关系的选项。
+ */
+ SetNext setNext();
+
+ interface SetNext {
+ /**
+ * 设置在本Wrapper之后的下游Wrapper。
+ */
+ SetNext wrapper(WorkerWrapper, ?> wrapper);
+
+ default SetNext wrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ wrapper(wrapper);
+ }
+ return this;
+ }
+
+ default SetNext wrapper(Collection extends WorkerWrapper> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ wrappers.forEach(this::wrapper);
+ return this;
+ }
+
+ /**
+ * 调用该方法将会让传入的此下游workerWrappers对本Wrapper强依赖(must)
+ *
+ * @param wrapper 下游Wrapper
+ */
+ SetNext mustToNextWrapper(WorkerWrapper, ?> wrapper);
+
+ default SetNext requireToNextWrapper(WorkerWrapper, ?> wrapper, boolean must) {
+ return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
+ }
+
+ /**
+ * 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断,
+ *
+ * @param strategy 对本Wrapper的特殊策略。
+ * @param wrapper 依赖本Wrapper的下游Wrapper。
+ * @return 返回Builder自身。
+ */
+ SetNext specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper, ?> wrapper);
+
+ WorkerWrapperBuilder end();
+ }
+
+ /**
+ * 便捷式设置本Wrapper被依赖的下游Wrapper。
+ *
+ * @param wrappers 下游Wrapper
+ */
+ default WorkerWrapperBuilder nextOf(WorkerWrapper... wrappers) {
+ return setNext().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder nextOf(Collection wrappers) {
+ return setNext().wrapper(wrappers).end();
+ }
+
+ WorkerWrapper build();
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java
new file mode 100644
index 0000000..8ad1019
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java
@@ -0,0 +1,99 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * 这是一个“向历史妥协”的策略器。以兼容must开关模式。
+ *
+ * @author create by TcSnZh on 2021/5/4-下午1:24
+ */
+public class DependMustStrategyMapper implements DependenceStrategy {
+
+ private final Set> mustDependSet = new LinkedHashSet<>();
+
+ /**
+ * 在{@link #mustDependSet} 中的must依赖。
+ *
+ * 如果{@code mustDependSet == null || mustDependSet.size() < 1},返回{@link DependenceAction#JUDGE_BY_AFTER}
+ *
+ * 如果所有的Wrapper已经完成,本Wrapper将会开始工作。
+ *
+ * 如果任一{@link #mustDependSet}中的Wrapper失败,则返回{@link DependenceAction#FAST_FAIL}。
+ * 具体超时/异常则根据{@link com.jd.platform.async.worker.ResultState}的值进行判断。
+ *
+ * 如果存在Wrapper未完成 且 所有的Wrapper都未失败,则返回{@link DependenceAction#JUDGE_BY_AFTER}。
+ *
+ */
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ if (mustDependSet.size() < 1) {
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+ boolean allSuccess = true;
+ for (WorkerWrapper, ?> wrapper : mustDependSet) {
+ switch (wrapper.getWorkResult().getResultState()) {
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ResultState.TIMEOUT, null);
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(ResultState.EXCEPTION, wrapper.getWorkResult().getEx());
+ case DEFAULT:
+ allSuccess = false;
+ case SUCCESS:
+ default:
+ }
+ }
+ if (allSuccess) {
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+
+ /**
+ * 新增must依赖。
+ *
+ * @param mustDependWrapper WorkerWrapper
+ * @return 返回自身
+ */
+ public DependMustStrategyMapper addDependMust(WorkerWrapper, ?> mustDependWrapper) {
+ if (mustDependWrapper == null) {
+ return this;
+ }
+ mustDependSet.add(mustDependWrapper);
+ return this;
+ }
+
+ public DependMustStrategyMapper addDependMust(Collection> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ mustDependSet.addAll(wrappers);
+ return this;
+ }
+
+ public DependMustStrategyMapper addDependMust(WorkerWrapper, ?>... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ return addDependMust(Arrays.asList(wrappers));
+ }
+
+ public Set> getMustDependSet() {
+ return mustDependSet;
+ }
+
+ @Override
+ public String toString() {
+ return "DependMustStrategyMapper{" +
+ "mustDependSet::getId=" + mustDependSet.stream().map(WorkerWrapper::getId).collect(Collectors.toList()) +
+ '}';
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java
new file mode 100644
index 0000000..745168f
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java
@@ -0,0 +1,74 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+/**
+ * 单参数策略。
+ *
+ * @author create by TcSnZh on 2021/5/1-下午11:16
+ */
+@FunctionalInterface
+public interface DependWrapperActionStrategy {
+ /**
+ * 仅使用一个参数的判断方法
+ *
+ * @param fromWrapper 调用本Wrapper的上游Wrapper
+ * @return 返回 {@link DependenceAction.WithProperty}
+ */
+ DependenceAction.WithProperty judge(WorkerWrapper, ?> fromWrapper);
+
+ // ========== 送几个供链式调用的默认值 ==========
+
+ /**
+ * 成功时,交给下一个策略器判断。
+ * 未运行时,休息。
+ * 失败时,失败。
+ */
+ DependWrapperActionStrategy SUCCESS_CONTINUE = new DependWrapperActionStrategy() {
+ @Override
+ public DependenceAction.WithProperty judge(WorkerWrapper, ?> ww) {
+ switch (ww.getWorkResult().getResultState()) {
+ case SUCCESS:
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ case DEFAULT:
+ return DependenceAction.TAKE_REST.emptyProperty();
+ case EXCEPTION:
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
+ default:
+ }
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
+ }
+
+ @Override
+ public String toString() {
+ return "SUCCESS_CONTINUE";
+ }
+ };
+ /**
+ * 成功时,开始工作。
+ * 未运行时,交给下一个策略器判断。
+ * 失败时,失败。
+ */
+ DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE = new DependWrapperActionStrategy() {
+ @Override
+ public DependenceAction.WithProperty judge(WorkerWrapper, ?> ww) {
+ switch (ww.getWorkResult().getResultState()) {
+ case SUCCESS:
+ return DependenceAction.START_WORK.emptyProperty();
+ case DEFAULT:
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ case EXCEPTION:
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
+ default:
+ }
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
+ }
+
+ @Override
+ public String toString() {
+ return "SUCCESS_START_INIT_CONTINUE";
+ }
+ };
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java
new file mode 100644
index 0000000..5c2cd56
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java
@@ -0,0 +1,69 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
+ *
+ * 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强,
+ *
+ * @author create by TcSnZh on 2021/5/1-下午11:12
+ */
+public class DependWrapperStrategyMapper implements DependenceStrategy {
+ private final Map, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
+
+ /**
+ * 设置对应策略
+ *
+ * @param targetWrapper 要设置策略的WorkerWrapper
+ * @param strategy 要设置的策略
+ * @return 返回this,链式调用。
+ */
+ public DependWrapperStrategyMapper putMapping(WorkerWrapper, ?> targetWrapper, DependWrapperActionStrategy strategy) {
+ mapper.put(targetWrapper, strategy);
+ toStringCache = null;
+ return this;
+ }
+
+ /**
+ * 判断方法。
+ *
+ * 如果fromWrapper在{@link #mapper}中,则返回{@link DependWrapperActionStrategy}的判断返回值。否则返回{@link DependenceAction#JUDGE_BY_AFTER}
+ *
+ * @param dependWrappers (这里不会使用该值)thisWrapper.dependWrappers的属性值。
+ * @param thisWrapper (这里不会使用该值)thisWrapper,即为“被催促”的WorkerWrapper
+ * @param fromWrapper 调用来源Wrapper。
+ * @return 如果在mapper中有对fromWrapper的处理策略,则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
+ */
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ DependWrapperActionStrategy strategy = mapper.get(fromWrapper);
+ if (strategy == null) {
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+ return strategy.judge(fromWrapper);
+ }
+
+ /**
+ * 缓存toString
+ */
+ private String toStringCache;
+
+ @Override
+ public String toString() {
+ if (toStringCache == null) {
+ toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
+ .map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
+ .collect(Collectors.toList())
+ + "}";
+ }
+ return toStringCache;
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java
new file mode 100644
index 0000000..acbf5d5
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java
@@ -0,0 +1,99 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.worker.ResultState;
+
+/**
+ * 返回执行工作类型的枚举。
+ *
+ * @author create by TcSnZh on 2021/5/1-下午10:47
+ */
+public enum DependenceAction {
+ /**
+ * 开始工作。WorkerWrapper会执行工作方法。
+ */
+ START_WORK,
+ /**
+ * 还没轮到,休息一下。WorkerWrapper中的调用栈会返回,以等待可能发生的下次调用。
+ */
+ TAKE_REST,
+ /**
+ * 立即失败。WorkerWrapper会去执行快速失败的方法。
+ */
+ FAST_FAIL,
+ /**
+ * 交给下层{@link DependenceStrategy}进行判断。
+ * 在WorkerWrapper中不需要考虑此值,因为配置正常的情况下不会返回这个值。
+ */
+ JUDGE_BY_AFTER;
+
+ // 空值单例
+
+ public WithProperty emptyProperty() {
+ return empty;
+ }
+
+ private final WithProperty empty = new WithProperty() {
+ @Override
+ public void setResultState(ResultState resultState) {
+ throw new UnsupportedOperationException("empty not support modify");
+ }
+
+ @Override
+ public void setFastFailException(Exception fastFailException) {
+ throw new UnsupportedOperationException("empty not support modify");
+ }
+
+ private final String toString = getDependenceAction() + ".empty";
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ };
+
+ // 携带异常信息、ResultState的返回值
+
+ public WithProperty fastFailException(ResultState resultState, Exception e) {
+ WithProperty withProperty = this.new WithProperty();
+ withProperty.setResultState(resultState);
+ withProperty.setFastFailException(e);
+ return withProperty;
+ }
+
+ /**
+ * 有时需要封装一些参数来返回,则使用本内部类进行返回。
+ *
+ * 所有的构造方法权限均为private,请在父枚举类{@link DependenceAction}的方法中选择合适的模板生成内部类WithProperty。
+ */
+ public class WithProperty {
+ private ResultState resultState;
+ private Exception fastFailException;
+
+ // getter setter
+
+ public ResultState getResultState() {
+ return resultState;
+ }
+
+ public void setResultState(ResultState resultState) {
+ this.resultState = resultState;
+ }
+
+ public Exception getFastFailException() {
+ return fastFailException;
+ }
+
+ public void setFastFailException(Exception fastFailException) {
+ this.fastFailException = fastFailException;
+ }
+
+ public DependenceAction getDependenceAction() {
+ return DependenceAction.this;
+ }
+
+ // constructor always private.
+
+ private WithProperty() {
+ }
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java
new file mode 100644
index 0000000..493148c
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java
@@ -0,0 +1,235 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.executor.WrapperEndingInspector;
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.worker.WorkResult;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/**
+ * 依赖策略接口。
+ *
+ * 提供了多个默认值可以作为单例模式使用。
+ *
+ * 工作原理示例:
+ *
+ * ==== 一个简单示例 ====
+ * 现有三个WorkerWrapper:A、B、C,其中 {@code A{dependWrappers=[B,C],} }
+ * 当B执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS,还需等待C的结果。
+ * 然后,当C执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS: 此时如果C成功了,A就开工,此时如果C失败了,A就失败。
+ * ==== 简单示例2 ====
+ *
+ *
+ *
+ * @author create by TcSnZh on 2021/5/1-下午10:48
+ */
+@FunctionalInterface
+public interface DependenceStrategy {
+ /**
+ * 核心判断策略
+ *
+ * @param dependWrappers thisWrapper.dependWrappers的属性值。
+ * @param thisWrapper thisWrapper,即为“被催促”的WorkerWrapper
+ * @param fromWrapper 调用来源Wrapper。
+ *
+ * 该参数不会为null。
+ * 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper,
+ * 不会被该策略器所判断,而是不论如何直接执行。
+ *
+ * @return 返回枚举值内部类,WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty}
+ */
+ DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper);
+
+ /**
+ * 如果本策略器的judge方法返回了JUDGE_BY_AFTER,则交给下一个策略器来判断。
+ *
+ * @param after 下层策略器
+ * @return 返回一个“封装的多层策略器”
+ */
+ default DependenceStrategy thenJudge(DependenceStrategy after) {
+ DependenceStrategy that = this;
+ return new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
+ return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ }
+ return judge;
+ }
+
+ @Override
+ public String toString() {
+ return that + " ----> " + after;
+ }
+ };
+ }
+
+ // ========== 以下是一些默认实现 ==========
+
+ /**
+ * 被依赖的所有Wrapper都必须成功才能开始工作。
+ * 如果其中任一Wrapper还没有执行且不存在失败,则休息。
+ * 如果其中任一Wrapper失败则立即失败。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasWaiting = false;
+ for (final WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case DEFAULT:
+ hasWaiting = true;
+ break;
+ case SUCCESS:
+ break;
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
+ }
+ }
+ if (hasWaiting) {
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_ALL_SUCCESS";
+ }
+ };
+
+ /**
+ * 被依赖的Wrapper中任意一个成功了就可以开始工作。
+ * 如果其中所有Wrapper还没有执行,则休息。
+ * 如果其中一个Wrapper失败且不存在成功则立即失败。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_ANY_SUCCESS = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasFailed = false;
+ Exception fastFailException = null;
+ ResultState resultState = null;
+ for (final WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case DEFAULT:
+ break;
+ case SUCCESS:
+ return DependenceAction.START_WORK.emptyProperty();
+ case TIMEOUT:
+ case EXCEPTION:
+ resultState = !hasFailed ? workResult.getResultState() : resultState;
+ fastFailException = !hasFailed ? workResult.getEx() : fastFailException;
+ hasFailed = true;
+ break;
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
+ }
+ }
+ if (hasFailed) {
+ return DependenceAction.FAST_FAIL.fastFailException(resultState, fastFailException);
+ }
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_ANY_SUCCESS";
+ }
+ };
+
+ /**
+ * 如果被依赖的工作中任一失败,则立即失败。否则就开始工作(不论之前的工作有没有开始)。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_NONE_FAILED = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ for (WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
+ default:
+ }
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_NONE_FAILED";
+ }
+ };
+
+ /**
+ * 只有当指定的这些Wrapper都成功时,才会开始工作。
+ * 任一失败会快速失败。
+ * 任一还没有执行且不存在失败,则休息。
+ *
+ * @param theseWrapper 该方法唯一有效参数。
+ * @return 返回生成的 {@link DependenceAction.WithProperty)
+ */
+ static DependenceStrategy theseWrapperAllSuccess(Set> theseWrapper) {
+ return new DependenceStrategy() {
+ private final Set> theseWrappers;
+ private final String toString;
+
+ {
+ theseWrappers = Collections.unmodifiableSet(theseWrapper);
+ toString = "THESE_WRAPPER_MUST_SUCCESS:" + theseWrappers.stream().map(WorkerWrapper::getId).collect(Collectors.toList());
+ }
+
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasWaiting = false;
+ for (WorkerWrapper, ?> wrapper : theseWrappers) {
+ ResultState resultState = wrapper.getWorkResult().getResultState();
+ switch (resultState) {
+ case DEFAULT:
+ hasWaiting = true;
+ break;
+ case SUCCESS:
+ break;
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(resultState, wrapper.getWorkResult().getEx());
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + resultState);
+ }
+ }
+ if (hasWaiting) {
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ };
+ }
+
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java b/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java
new file mode 100644
index 0000000..d98aaa2
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java
@@ -0,0 +1,183 @@
+package com.jd.platform.async.wrapper.skipstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author create by TcSnZh on 2021/5/6-下午3:02
+ */
+@FunctionalInterface
+public interface SkipStrategy {
+ /**
+ * 跳过策略函数。返回true将会使WorkerWrapper跳过执行。
+ *
+ * @param nextWrappers 下游WrapperSet
+ * @param thisWrapper 本WorkerWrapper
+ * @param fromWrapper 呼叫本Wrapper的上游Wrapper
+ * @return 返回true将会使WorkerWrapper跳过执行。
+ */
+ boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper);
+
+ /**
+ * 不跳过
+ */
+ SkipStrategy NOT_SKIP = new SkipStrategy() {
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "NOT_SKIP";
+ }
+ };
+
+ SkipStrategy CHECK_ONE_LEVEL = new SkipStrategy() {
+ private final SkipStrategy searchNextOneLevel = searchNextWrappers(SearchNextWrappers.SearchType.DFS, 1);
+
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return searchNextOneLevel.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return "CHECK_ONE_LEVEL";
+ }
+ };
+
+ default SearchNextWrappers searchNextWrappers(SearchNextWrappers.SearchType searchType, int searchLevel) {
+ return new SearchNextWrappers(searchType, searchLevel);
+ }
+
+ /**
+ * 检查之后的Wrapper是否不在INIT状态
+ */
+ class SearchNextWrappers implements SkipStrategy {
+ /**
+ * 搜索策略
+ */
+ enum SearchType {
+ DFS, BFS;
+ }
+
+ private final SearchType searchType;
+
+ /**
+ * 搜索深度
+ */
+ private final int searchLevel;
+
+ public SearchNextWrappers(SearchType searchType, int searchLevel) {
+ this.searchType = Objects.requireNonNull(searchType);
+ this.searchLevel = searchLevel;
+ }
+
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ Set> nextSet;
+ if ((nextSet = nextWrappers) == null || nextSet.isEmpty()) {
+ return false;
+ }
+ switch (searchType) {
+ case DFS:
+ return nextSet.stream().allMatch(next ->
+ next.getState() != WorkerWrapper.INIT || dfsSearchShouldSkip(next, 1));
+ case BFS:
+ LinkedList queue = nextSet.stream().map(ww -> new BfsNode(ww, 0)).collect(Collectors.toCollection(LinkedList::new));
+ HashSet> existed = new HashSet<>(nextSet);
+ while (!queue.isEmpty()) {
+ BfsNode node = queue.poll();
+ if (node.atLevel > searchLevel) {
+ continue;
+ }
+ if (node.wrapper.getState() != WorkerWrapper.INIT) {
+ return true;
+ }
+ if (node.atLevel < searchLevel) {
+ // 如果不是深度的最大值,则往队列里添加
+ node.wrapper.getNextWrappers().forEach(nextWrapper -> {
+ if (existed.contains(nextWrapper)) {
+ return;
+ }
+ queue.offer(new BfsNode(nextWrapper, node.atLevel + 1));
+ existed.add(nextWrapper);
+ });
+ }
+ }
+ return false;
+ default:
+ throw new IllegalStateException("searchType type illegal : " + searchType);
+ }
+ }
+
+ private boolean dfsSearchShouldSkip(WorkerWrapper, ?> currentWrapper, int currentLevel) {
+ if (currentLevel + 1 > searchLevel || currentWrapper == null) {
+ return false;
+ }
+ for (WorkerWrapper, ?> nextWrapper : currentWrapper.getNextWrappers()) {
+ if (nextWrapper != null &&
+ (nextWrapper.getState() != WorkerWrapper.INIT
+ || dfsSearchShouldSkip(nextWrapper, currentLevel + 1))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static class BfsNode {
+ final WorkerWrapper, ?> wrapper;
+ final int atLevel;
+
+ public BfsNode(WorkerWrapper, ?> wrapper, int atLevel) {
+ this.wrapper = wrapper;
+ this.atLevel = atLevel;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BfsNode bfsNode = (BfsNode) o;
+ return Objects.equals(wrapper, bfsNode.wrapper);
+ }
+
+ @Override
+ public int hashCode() {
+ return wrapper.hashCode();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SearchNextWrappers that = (SearchNextWrappers) o;
+ return searchLevel == that.searchLevel && searchType == that.searchType;
+ }
+
+ @Override
+ public int hashCode() {
+ return searchLevel ^ searchType.ordinal();
+ }
+
+ @Override
+ public String toString() {
+ return "CheckNextWrapper{" +
+ "searchType=" + searchType +
+ ", searchLevel=" + searchLevel +
+ '}';
+ }
+ }
+}
diff --git a/src/test/java/dependnew/DeWorker.java b/src/test/java/beforev14/depend/DeWorker.java
similarity index 90%
rename from src/test/java/dependnew/DeWorker.java
rename to src/test/java/beforev14/depend/DeWorker.java
index 6ae011f..e9f2b82 100755
--- a/src/test/java/dependnew/DeWorker.java
+++ b/src/test/java/beforev14/depend/DeWorker.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker implements IWorker, ICallback {
+class DeWorker implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/depend/DeWorker1.java b/src/test/java/beforev14/depend/DeWorker1.java
similarity index 89%
rename from src/test/java/depend/DeWorker1.java
rename to src/test/java/beforev14/depend/DeWorker1.java
index 6cafc30..b958768 100755
--- a/src/test/java/depend/DeWorker1.java
+++ b/src/test/java/beforev14/depend/DeWorker1.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker1 implements IWorker, User>, ICallback, User> {
+class DeWorker1 implements IWorker, User>, ICallback, User> {
@Override
public User action(WorkResult result, Map allWrappers) {
diff --git a/src/test/java/depend/DeWorker2.java b/src/test/java/beforev14/depend/DeWorker2.java
similarity index 89%
rename from src/test/java/depend/DeWorker2.java
rename to src/test/java/beforev14/depend/DeWorker2.java
index 3dd73e7..c2a48c4 100755
--- a/src/test/java/depend/DeWorker2.java
+++ b/src/test/java/beforev14/depend/DeWorker2.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker2 implements IWorker, String>, ICallback, String> {
+class DeWorker2 implements IWorker, String>, ICallback, String> {
@Override
public String action(WorkResult result, Map allWrappers) {
diff --git a/src/test/java/depend/LambdaTest.java b/src/test/java/beforev14/depend/LambdaTest.java
similarity index 98%
rename from src/test/java/depend/LambdaTest.java
rename to src/test/java/beforev14/depend/LambdaTest.java
index 42c1bb2..93037a5 100644
--- a/src/test/java/depend/LambdaTest.java
+++ b/src/test/java/beforev14/depend/LambdaTest.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import java.util.Map;
@@ -10,7 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapper;
* @author sjsdfg
* @since 2020/6/14
*/
-public class LambdaTest {
+class LambdaTest {
public static void main(String[] args) throws Exception {
WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>()
.worker((WorkResult result, Map allWrappers) -> {
diff --git a/src/test/java/depend/Test.java b/src/test/java/beforev14/depend/Test.java
similarity index 98%
rename from src/test/java/depend/Test.java
rename to src/test/java/beforev14/depend/Test.java
index 971fdcf..a877047 100644
--- a/src/test/java/depend/Test.java
+++ b/src/test/java/beforev14/depend/Test.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class Test {
+class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
diff --git a/src/test/java/depend/User.java b/src/test/java/beforev14/depend/User.java
similarity index 91%
rename from src/test/java/depend/User.java
rename to src/test/java/beforev14/depend/User.java
index dfd6277..8481a49 100644
--- a/src/test/java/depend/User.java
+++ b/src/test/java/beforev14/depend/User.java
@@ -1,11 +1,11 @@
-package depend;
+package beforev14.depend;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class User {
+class User {
private String name;
public User(String name) {
diff --git a/src/test/java/depend/DeWorker.java b/src/test/java/beforev14/dependnew/DeWorker.java
similarity index 90%
rename from src/test/java/depend/DeWorker.java
rename to src/test/java/beforev14/dependnew/DeWorker.java
index e963816..135b6b3 100755
--- a/src/test/java/depend/DeWorker.java
+++ b/src/test/java/beforev14/dependnew/DeWorker.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker implements IWorker, ICallback {
+class DeWorker implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/dependnew/DeWorker1.java b/src/test/java/beforev14/dependnew/DeWorker1.java
similarity index 92%
rename from src/test/java/dependnew/DeWorker1.java
rename to src/test/java/beforev14/dependnew/DeWorker1.java
index 0a56fdf..ba02503 100755
--- a/src/test/java/dependnew/DeWorker1.java
+++ b/src/test/java/beforev14/dependnew/DeWorker1.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker1 implements IWorker, ICallback {
+class DeWorker1 implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/dependnew/DeWorker2.java b/src/test/java/beforev14/dependnew/DeWorker2.java
similarity index 92%
rename from src/test/java/dependnew/DeWorker2.java
rename to src/test/java/beforev14/dependnew/DeWorker2.java
index c4f61bc..304df06 100755
--- a/src/test/java/dependnew/DeWorker2.java
+++ b/src/test/java/beforev14/dependnew/DeWorker2.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker2 implements IWorker, ICallback {
+class DeWorker2 implements IWorker, ICallback {
@Override
public String action(User object, Map allWrappers) {
diff --git a/src/test/java/dependnew/Test.java b/src/test/java/beforev14/dependnew/Test.java
similarity index 97%
rename from src/test/java/dependnew/Test.java
rename to src/test/java/beforev14/dependnew/Test.java
index 731e42b..657bb7f 100644
--- a/src/test/java/dependnew/Test.java
+++ b/src/test/java/beforev14/dependnew/Test.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class Test {
+class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
diff --git a/src/test/java/dependnew/User.java b/src/test/java/beforev14/dependnew/User.java
similarity index 91%
rename from src/test/java/dependnew/User.java
rename to src/test/java/beforev14/dependnew/User.java
index bbef801..e133e3d 100644
--- a/src/test/java/dependnew/User.java
+++ b/src/test/java/beforev14/dependnew/User.java
@@ -1,11 +1,11 @@
-package dependnew;
+package beforev14.dependnew;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class User {
+class User {
private String name;
public User(String name) {
diff --git a/src/test/java/parallel/ParTimeoutWorker.java b/src/test/java/beforev14/parallel/ParTimeoutWorker.java
similarity index 92%
rename from src/test/java/parallel/ParTimeoutWorker.java
rename to src/test/java/beforev14/parallel/ParTimeoutWorker.java
index 7f7b9aa..f0a2f3a 100755
--- a/src/test/java/parallel/ParTimeoutWorker.java
+++ b/src/test/java/beforev14/parallel/ParTimeoutWorker.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParTimeoutWorker implements IWorker, ICallback {
+class ParTimeoutWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/ParWorker.java b/src/test/java/beforev14/parallel/ParWorker.java
similarity index 92%
rename from src/test/java/parallel/ParWorker.java
rename to src/test/java/beforev14/parallel/ParWorker.java
index b174c51..b28b7e6 100755
--- a/src/test/java/parallel/ParWorker.java
+++ b/src/test/java/beforev14/parallel/ParWorker.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker implements IWorker, ICallback {
+class ParWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/ParWorker1.java b/src/test/java/beforev14/parallel/ParWorker1.java
similarity index 93%
rename from src/test/java/parallel/ParWorker1.java
rename to src/test/java/beforev14/parallel/ParWorker1.java
index 7f13081..414851c 100755
--- a/src/test/java/parallel/ParWorker1.java
+++ b/src/test/java/beforev14/parallel/ParWorker1.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker1 implements IWorker, ICallback {
+class ParWorker1 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker2.java b/src/test/java/beforev14/parallel/ParWorker2.java
similarity index 93%
rename from src/test/java/parallel/ParWorker2.java
rename to src/test/java/beforev14/parallel/ParWorker2.java
index 0e89e45..87cc0ac 100755
--- a/src/test/java/parallel/ParWorker2.java
+++ b/src/test/java/beforev14/parallel/ParWorker2.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker2 implements IWorker, ICallback {
+class ParWorker2 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker3.java b/src/test/java/beforev14/parallel/ParWorker3.java
similarity index 93%
rename from src/test/java/parallel/ParWorker3.java
rename to src/test/java/beforev14/parallel/ParWorker3.java
index 4284b0f..82b6299 100755
--- a/src/test/java/parallel/ParWorker3.java
+++ b/src/test/java/beforev14/parallel/ParWorker3.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker3 implements IWorker, ICallback {
+class ParWorker3 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker4.java b/src/test/java/beforev14/parallel/ParWorker4.java
similarity index 92%
rename from src/test/java/parallel/ParWorker4.java
rename to src/test/java/beforev14/parallel/ParWorker4.java
index 723c5f2..7f9c267 100755
--- a/src/test/java/parallel/ParWorker4.java
+++ b/src/test/java/beforev14/parallel/ParWorker4.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker4 implements IWorker, ICallback {
+class ParWorker4 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/TestPar.java b/src/test/java/beforev14/parallel/TestPar.java
similarity index 99%
rename from src/test/java/parallel/TestPar.java
rename to src/test/java/beforev14/parallel/TestPar.java
index e13ccc9..c031c53 100755
--- a/src/test/java/parallel/TestPar.java
+++ b/src/test/java/beforev14/parallel/TestPar.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.executor.Async;
@@ -14,7 +14,7 @@ import java.util.concurrent.Executors;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("ALL")
-public class TestPar {
+class TestPar {
public static void main(String[] args) throws Exception {
// testNormal();
diff --git a/src/test/java/seq/SeqTimeoutWorker.java b/src/test/java/beforev14/seq/SeqTimeoutWorker.java
similarity index 92%
rename from src/test/java/seq/SeqTimeoutWorker.java
rename to src/test/java/beforev14/seq/SeqTimeoutWorker.java
index 0de5e0a..80a5c7b 100755
--- a/src/test/java/seq/SeqTimeoutWorker.java
+++ b/src/test/java/beforev14/seq/SeqTimeoutWorker.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqTimeoutWorker implements IWorker, ICallback {
+class SeqTimeoutWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker.java b/src/test/java/beforev14/seq/SeqWorker.java
similarity index 93%
rename from src/test/java/seq/SeqWorker.java
rename to src/test/java/beforev14/seq/SeqWorker.java
index 18c3457..c2bc392 100755
--- a/src/test/java/seq/SeqWorker.java
+++ b/src/test/java/beforev14/seq/SeqWorker.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker implements IWorker, ICallback {
+class SeqWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker1.java b/src/test/java/beforev14/seq/SeqWorker1.java
similarity index 93%
rename from src/test/java/seq/SeqWorker1.java
rename to src/test/java/beforev14/seq/SeqWorker1.java
index ae445c6..b3ded50 100755
--- a/src/test/java/seq/SeqWorker1.java
+++ b/src/test/java/beforev14/seq/SeqWorker1.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker1 implements IWorker, ICallback {
+class SeqWorker1 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker2.java b/src/test/java/beforev14/seq/SeqWorker2.java
similarity index 93%
rename from src/test/java/seq/SeqWorker2.java
rename to src/test/java/beforev14/seq/SeqWorker2.java
index 34853ee..458db80 100755
--- a/src/test/java/seq/SeqWorker2.java
+++ b/src/test/java/beforev14/seq/SeqWorker2.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker2 implements IWorker, ICallback {
+class SeqWorker2 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/TestSequential.java b/src/test/java/beforev14/seq/TestSequential.java
similarity index 97%
rename from src/test/java/seq/TestSequential.java
rename to src/test/java/beforev14/seq/TestSequential.java
index d4e1c67..586c0c4 100755
--- a/src/test/java/seq/TestSequential.java
+++ b/src/test/java/beforev14/seq/TestSequential.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.executor.Async;
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
-public class TestSequential {
+class TestSequential {
public static void main(String[] args) throws InterruptedException, ExecutionException {
diff --git a/src/test/java/seq/TestSequentialTimeout.java b/src/test/java/beforev14/seq/TestSequentialTimeout.java
similarity index 97%
rename from src/test/java/seq/TestSequentialTimeout.java
rename to src/test/java/beforev14/seq/TestSequentialTimeout.java
index f2b02de..ccd2423 100755
--- a/src/test/java/seq/TestSequentialTimeout.java
+++ b/src/test/java/beforev14/seq/TestSequentialTimeout.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.executor.Async;
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("Duplicates")
-public class TestSequentialTimeout {
+class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException {
testFirstTimeout();
}
diff --git a/src/test/java/v15/dependnew/Test.java b/src/test/java/v15/dependnew/Test.java
new file mode 100644
index 0000000..28b636c
--- /dev/null
+++ b/src/test/java/v15/dependnew/Test.java
@@ -0,0 +1,227 @@
+package v15.dependnew;
+
+import com.jd.platform.async.callback.IWorker;
+import com.jd.platform.async.executor.Async;
+import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
+import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
+
+import java.io.PrintStream;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+
+/**
+ * @author create by TcSnZh on 2021/5/2-下午9:25
+ */
+class Test {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+// ExecutorService pool = Executors.newFixedThreadPool(3);
+ ExecutorService pool = Async.getCommonPool();
+ try {
+ testNew2(pool);
+ System.out.println("\n\n\n");
+ testNew1(pool);
+ System.out.println("\n\n\n");
+ testNew2(pool);
+ System.out.println("\n\n\n");
+ testThreadPolling_Speed(pool);
+ System.out.println("\n\n\n");
+ testThreadPolling_V14Bug();
+ } finally {
+ //Async.shutDownCommonPool();
+ pool.shutdown();
+ }
+ }
+
+ /**
+ * 简简单单的测试一下新的编排方式
+ *
+ * .A ===> B1 ===> C1 ----> D1
+ * . ||> B2 | || \--> D2
+ * . ||> B3 | ``========v
+ * . ||> B4 |---> C2 ====> E1
+ * . \--> E2
+ */
+ private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException {
+ WorkerWrapper