dal-job是一个去中心化的轻量的分布式Job框架。它没有master结点,代码是在各个模块上运行的。
帮助开发人员在分布式环境下开发job时,只用关注业务,而不用去关心job被重复执行的问题。
它主要提供如下功能:
问题思考
在分布式环境下,本地去中心化的分布式job需要解决的问题:
使用数据库的行级锁来保证同一时刻只有一台机在执行任务。
具体:使用 【悲观锁 + JobStatusCheck + TimeLimit】 实现在多线程与多进程(主要是多进程)环境下,一个job在运行过程中,只会有一台机在执行job
使用Quartz + MySQL。同时与Spring友好融合。提供注解(@TimedTask)形式的job配置
dal-job支持分布式环境下单台启动 和 多台启动。其中多台启动会在所有的实例上运行,需要自己解决取数问题。
dal-job提供了注解形式的job配置,具体可以参考com.kvn.dal.core.single_node.SingleNodeJob.java
@Target({ TYPE })
@Retention(RUNTIME)
public @interface TimedTask {
String corn();
boolean isGlobalSingle() default true; // 分布式环境下,是否单台启动
String desc() default "";
}
定时调度例子:
@TimedTask(corn = "0 0/1 * * * ?", desc = "测试job222")
@Service
public class MyTestJob2 implements ExecutableTask {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(new Random().nextInt() % 2 == 0){
throw new RuntimeException("biz执行MyTestJob2异常,xxxxxxxxx");
}
}
}
dal-job提供了内置的重试调度实现,可以方便的对异常数据进行定时重试。
重试分为两种:一是,事前重试;二是,事后重试
事前重试,即不管业务是否执行成功,都去记录执行日志(表:job_beforehand_retry),如果出现指定的异常,则标记记录为需要重试。待重试job执行时,就分发至相应的重试方法去执行。
**原理:**使用aop的方式,对需要重试的方法(含有@BeforehandRetry的方法)进行拦截
@BeforehandRetry:
/**
* 事前补偿,确保每次执行业务时都有留底。会牺牲一性的性能。
* @author wzy
* @date 2017年7月14日 下午5:03:45
*/
@Target({ METHOD })
@Retention(RUNTIME)
@Inherited
public @interface BeforehandRetry {
/**
* 执行重试的异常,默认是对BizRetryNeedException才去执行重试逻辑。业务异常是不需要重试的!!!
*/
Class<? extends Throwable> retryFor() default BizRetryNeedException.class;
/**
* 最大重试次数
*/
int maxRetryCount() default 3;
}
**例子参考:**com.kvn.dal.core.beforehand_retry.BeforehandRetryBizService.java
@Service
public class BeforehandRetryBizService {
@BeforehandRetry
public String doBiz(Foo foo, String param){
System.out.println("--->isRetryThread:" + ThreadContext.getContext().isRetryThread());
System.out.println("参数:Foo=" + JSON.toJSONString(foo) + ", param=" + param);
System.out.println("执行业务失败>>>>>>>>");
throw new BizRetryNeedException("业务失败,需要重试!!!");
}
}
事后重试,即执行业务出现异常后,对于我们需要重试的异常,将重试参数持久化到DB(表:job_retry),然后通过事后重试调度定时发起重试。
对于需要重试的类,可以通过实现 IRetrySupport 接口,或者继承 AbstractRetrySupport 类。
IRetrySupport.java
public interface IRetrySupport {
/**
* 重试
* @param retryContext 重试上下文
* @return 返回重试结果:true | false
*/
Boolean retry(AfterwardRetryContext retryContext);
}
例子参考:实现接口方式com.kvn.dal.core.afterward_retry.AfterwardRetryBizService.java
@Service
public class AfterwardRetryBizService implements IRetrySupport {
@Resource
IJobRetryDao jobRetryDao;
public void executeBiz() {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Foo foo = new Foo(1001, "xxx");
try {
throw new RuntimeException("executeBiz异常,xxxxxxxxx");
} catch (Exception e) {
ArrayList<RetryParam> retryLs = new RetryParamListWrapper().buildRetryParam(foo).buildRetryParam("xxx").buildRetryParam("hehehe").toArrayList();
JobRetry retry = JobRetry.createJobRetry(this.getClass(), "key001", retryLs);
jobRetryDao.add(retry);
throw e;
}
}
@Override
public Boolean retry(AfterwardRetryContext retryContext) {
/** 实现重试逻辑 */
String retryDataKey = retryContext.getRetryDataKey();
List<RetryParam> paramLs = retryContext.getRetryParamLs();
Foo foo = paramLs.get(0).retoreParam(Foo.class);
String originParam1 = paramLs.get(1).retoreParam(String.class);
String originParam2 = paramLs.get(2).retoreParam(String.class);
// 或者
Foo foo2 = retryContext.getRetryParamValueMap().get(Foo.class).get(0);
String originParam_1 = retryContext.getRetryParamValueMap().get(String.class).get(0);
String originParam_2 = retryContext.getRetryParamValueMap().get(String.class).get(1);
return true;
}
}
例子参考:继承类的方式com.kvn.dal.core.afterward_retry.AfterwardRetryBestPracticeService.java
@Service
public class AfterwardRetryBestPracticeService extends AbstractRetrySupport {
public void executeBiz() {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Foo foo = new Foo(1001, "xxx");
try {
throw new RuntimeException("executeBiz异常,xxxxxxxxx");
} catch (Exception e) {
this.retryEnqueue("key001", foo, "hehe", "morning");
throw e; // 出异常后,终止业务
}
}
@Override
public Boolean retry(AfterwardRetryContext retryContext) {
// 实现重试逻辑
return true;
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型