From f2d19b5920b490fe20932bd366a31118ee354b88 Mon Sep 17 00:00:00 2001 From: antergone Date: Sun, 10 Jul 2016 01:37:22 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cn/uncode/schedule/ConsoleManager.java | 6 +- .../uncode/schedule/DynamicTaskManager.java | 58 ++++-- .../cn/uncode/schedule/ZKScheduleManager.java | 24 +-- .../schedule/core/IScheduleDataManager.java | 2 +- .../core/ScheduledMethodRunnable.java | 1 + .../cn/uncode/schedule/core/TaskDefine.java | 7 +- .../java/cn/uncode/schedule/core/Version.java | 6 +- .../schedule/local/DynamicTaskManager.java | 130 ------------ .../cn/uncode/schedule/util/ScheduleUtil.java | 6 +- .../schedule/zk/IScheduleDataManager.java | 53 ----- .../schedule/zk/ScheduleDataManager4ZK.java | 19 +- .../cn/uncode/schedule/zk/ScheduleServer.java | 195 ------------------ .../cn/uncode/schedule/zk/TaskDefine.java | 101 --------- .../java/cn/uncode/schedule/zk/Version.java | 23 --- .../java/cn/uncode/schedule/zk/ZKManager.java | 20 +- .../cn/uncode/schedule/ZookeeperTest.java | 10 +- 16 files changed, 79 insertions(+), 582 deletions(-) delete mode 100644 src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java delete mode 100644 src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java delete mode 100644 src/main/java/cn/uncode/schedule/zk/ScheduleServer.java delete mode 100644 src/main/java/cn/uncode/schedule/zk/TaskDefine.java delete mode 100644 src/main/java/cn/uncode/schedule/zk/Version.java diff --git a/src/main/java/cn/uncode/schedule/ConsoleManager.java b/src/main/java/cn/uncode/schedule/ConsoleManager.java index 8de4951..7a933bb 100644 --- a/src/main/java/cn/uncode/schedule/ConsoleManager.java +++ b/src/main/java/cn/uncode/schedule/ConsoleManager.java @@ -20,14 +20,16 @@ public class ConsoleManager { public static ZKScheduleManager getScheduleManager() throws Exception { if(null == ConsoleManager.scheduleManager){ - ConsoleManager.scheduleManager = (ZKScheduleManager)ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class); + synchronized(ConsoleManager.class) { + ConsoleManager.scheduleManager = ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class); + } } return ConsoleManager.scheduleManager; } public static void addScheduleTask(TaskDefine taskDefine) { try { - ConsoleManager.scheduleManager.getScheduleDataManager().addTask(taskDefine); + ConsoleManager.getScheduleManager().getScheduleDataManager().addTask(taskDefine); } catch (Exception e) { log.error(e.getMessage(), e); } diff --git a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java index 8c468f4..54e3e4d 100644 --- a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java +++ b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java @@ -14,6 +14,7 @@ import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.support.AopUtils; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; +import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import cn.uncode.schedule.core.ScheduledMethodRunnable; @@ -105,34 +106,49 @@ public class DynamicTaskManager { */ private static ScheduledMethodRunnable buildScheduledRunnable(String targetBean, String targetMethod, String params){ Object bean = null; - Method method = null; ScheduledMethodRunnable scheduledMethodRunnable = null; try { - ConsoleManager.getScheduleManager(); bean = ZKScheduleManager.getApplicationcontext().getBean(targetBean); - if(bean != null){ - Class clazz = null; - if(AopUtils.isAopProxy(bean)){ - clazz = AopProxyUtils.ultimateTargetClass(bean); - //method = ReflectionUtils.findMethod(AopProxyUtils.ultimateTargetClass(bean), targetMethod); - }else{ - clazz = bean.getClass(); - //method = ReflectionUtils.findMethod(bean.getClass(), targetMethod); - } - if(params != null){ - method = ReflectionUtils.findMethod(clazz, targetMethod,String.class); - }else{ - method = ReflectionUtils.findMethod(clazz, targetMethod); - } - if(method != null){ - scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method, params); - } - } + scheduledMethodRunnable = _buildScheduledRunnable(bean, targetMethod, params); } catch (Exception e) { LOGGER.debug(e.getLocalizedMessage(), e); } return scheduledMethodRunnable; } - + private static ScheduledMethodRunnable buildScheduledRunnable(Object bean, String targetMethod, String params){ + ScheduledMethodRunnable scheduledMethodRunnable = null; + try { + scheduledMethodRunnable = _buildScheduledRunnable(bean, targetMethod, params); + }catch (Exception e){ + LOGGER.debug(e.getLocalizedMessage(), e); + } + return scheduledMethodRunnable; + } + + + private static ScheduledMethodRunnable _buildScheduledRunnable(Object bean, String targetMethod, String params) throws Exception { + + Assert.notNull(bean, "target object must not be null"); + Assert.hasLength(targetMethod, "Method name must not be empty"); + + Method method; + ScheduledMethodRunnable scheduledMethodRunnable; + + Class clazz; + if (AopUtils.isAopProxy(bean)) { + clazz = AopProxyUtils.ultimateTargetClass(bean); + } else { + clazz = bean.getClass(); + } + if (params != null) { + method = ReflectionUtils.findMethod(clazz, targetMethod, String.class); + } else { + method = ReflectionUtils.findMethod(clazz, targetMethod); + } + + Assert.notNull(method, "can not find method named " + targetMethod); + scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method, params); + return scheduledMethodRunnable; + } } diff --git a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java index c18d173..c354304 100644 --- a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java +++ b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java @@ -77,7 +77,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic protected boolean isStopSchedule = false; protected Lock registerLock = new ReentrantLock(); - volatile String errorMessage = "No config Zookeeper connect infomation"; + private volatile String errorMessage = "No config Zookeeper connect information"; private InitialThread initialThread; public ZKScheduleManager() { @@ -93,7 +93,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } public void reInit(Properties p) throws Exception { - if (this.start == true || this.hearBeatTimer != null) { + if (this.start || this.hearBeatTimer != null) { throw new Exception("调度器有任务处理,不能重新初始化"); } this.init(p); @@ -123,7 +123,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic public void rewriteScheduleInfo() throws Exception { registerLock.lock(); try { - if (this.isStopSchedule == true) { + if (this.isStopSchedule) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("外部命令终止调度,不在注册调度服务,避免遗留垃圾数据:" + currenScheduleServer.getUuid()); @@ -134,8 +134,8 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic if (errorMessage != null) { this.currenScheduleServer.setDealInfoDesc(errorMessage); } - if (this.scheduleDataManager - .refreshScheduleServer(this.currenScheduleServer) == false) { + if (!this.scheduleDataManager + .refreshScheduleServer(this.currenScheduleServer)) { // 更新信息失败,清除内存数据后重新注册 this.clearMemoInfo(); this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer); @@ -169,8 +169,8 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic public void assignScheduleTask() throws Exception { scheduleDataManager.clearExpireScheduleServer(); List serverList = scheduleDataManager.loadScheduleServerNames(); - if (scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(), - serverList) == false) { + if (!scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(), + serverList)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(this.currenScheduleServer.getUuid() + ":不是负责任务分配的Leader,直接返回"); @@ -197,7 +197,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic try { rewriteScheduleInfo(); // 如果任务信息没有初始化成功,不做任务相关的处理 - if (this.isScheduleServerRegister == false) { + if (!this.isScheduleServerRegister) { return; } @@ -229,7 +229,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic public void initialData() throws Exception { this.zkManager.initial(); this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager); - if (this.start == true) { + if (this.start) { // 注册调度管理器 this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer); if (hearBeatTimer == null) { @@ -256,7 +256,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic String name = ScheduleUtil.getTaskNameFormBean(beanNames[0], targetMethod.getName()); boolean isOwner = false; try { - if(isScheduleServerRegister == false){ + if(!isScheduleServerRegister){ Thread.sleep(1000); } if(zkManager.checkZookeeperState()){ @@ -317,7 +317,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic sm.initLock.lock(); try { int count = 0; - while (sm.zkManager.checkZookeeperState() == false) { + while (!sm.zkManager.checkZookeeperState()) { count = count + 1; if (count % 50 == 0) { sm.errorMessage = "Zookeeper connecting ......" @@ -326,7 +326,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic log.error(sm.errorMessage); } Thread.sleep(20); - if (this.isStop == true) { + if (this.isStop) { return; } } diff --git a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java index f19bbb0..fede905 100644 --- a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java +++ b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java @@ -13,7 +13,7 @@ public interface IScheduleDataManager{ /** * 发送心跳信息 - * + * * @param server * @throws Exception */ diff --git a/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java b/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java index dd4ea46..625f592 100644 --- a/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java +++ b/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java @@ -4,6 +4,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; +import cn.uncode.schedule.DynamicTaskManager; import org.springframework.util.ReflectionUtils; public class ScheduledMethodRunnable implements Runnable { diff --git a/src/main/java/cn/uncode/schedule/core/TaskDefine.java b/src/main/java/cn/uncode/schedule/core/TaskDefine.java index 0e64cba..9bd05e5 100644 --- a/src/main/java/cn/uncode/schedule/core/TaskDefine.java +++ b/src/main/java/cn/uncode/schedule/core/TaskDefine.java @@ -46,11 +46,8 @@ public class TaskDefine { */ private String type; - public boolean begin(Date sysTime){ - if(null == sysTime){ - return false; - } - return sysTime.after(startTime); + public boolean begin(Date sysTime) { + return null != sysTime && sysTime.after(startTime); } public String getTargetBean() { diff --git a/src/main/java/cn/uncode/schedule/core/Version.java b/src/main/java/cn/uncode/schedule/core/Version.java index 051ca1e..9303682 100644 --- a/src/main/java/cn/uncode/schedule/core/Version.java +++ b/src/main/java/cn/uncode/schedule/core/Version.java @@ -13,11 +13,7 @@ public class Version { return version; } public static boolean isCompatible(String dataVersion){ - if(version.compareTo(dataVersion)>=0){ - return true; - }else{ - return false; - } + return version.compareTo(dataVersion) >= 0; } } diff --git a/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java b/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java deleted file mode 100644 index 47225a3..0000000 --- a/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java +++ /dev/null @@ -1,130 +0,0 @@ -package cn.uncode.schedule.local; - -import java.lang.reflect.Method; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.aop.framework.AopProxyUtils; -import org.springframework.aop.support.AopUtils; -import org.springframework.scheduling.Trigger; -import org.springframework.scheduling.support.CronTrigger; -import org.springframework.scheduling.support.ScheduledMethodRunnable; -import org.springframework.util.ReflectionUtils; - -import cn.uncode.schedule.ConsoleManager; -import cn.uncode.schedule.zk.TaskDefine; - - - -public class DynamicTaskManager { - - private static final transient Logger LOGGER = LoggerFactory.getLogger(DynamicTaskManager.class); - - - private static final Map> SCHEDULE_FUTURES = new ConcurrentHashMap>(); - - - /** - * 启动定时任务 - * @param taskDefine - * @param currentTime - */ - public static void scheduleTask(TaskDefine taskDefine, Date currentTime){ - scheduleTask(taskDefine.getTargetBean(), taskDefine.getTargetMethod(), - taskDefine.getCronExpression(), taskDefine.getStartTime(), taskDefine.getPeriod()); - } - - public static void clearLocalTask(List existsTaskName){ - for(String name:SCHEDULE_FUTURES.keySet()){ - if(!existsTaskName.contains(name)){ - SCHEDULE_FUTURES.get(name).cancel(true); - SCHEDULE_FUTURES.remove(name); - } - } - } - - /** - * 启动定时任务 - * 支持: - * 1 cron时间表达式,立即执行 - * 2 startTime + period,指定时间,定时进行 - * 3 period,定时进行,立即开始 - * 4 startTime,指定时间执行 - * - * @param targetBean - * @param targetMethod - * @param cronExpression - * @param startTime - * @param period - */ - public static void scheduleTask(String targetBean, String targetMethod, String cronExpression, Date startTime, long period){ - String scheduleKey = buildScheduleKey(targetBean, targetMethod); - try { - ScheduledFuture scheduledFuture = null; - ScheduledMethodRunnable scheduledMethodRunnable = buildScheduledRunnable(targetBean, targetMethod); - if(scheduledMethodRunnable != null){ - if (!SCHEDULE_FUTURES.containsKey(scheduleKey)) { - if(StringUtils.isNotEmpty(cronExpression)){ - Trigger trigger = new CronTrigger(cronExpression); - scheduledFuture = ConsoleManager.getScheduleManager().schedule(scheduledMethodRunnable, trigger); - }else if(startTime != null){ - if(period > 0){ - scheduledFuture = ConsoleManager.getScheduleManager().scheduleAtFixedRate(scheduledMethodRunnable, startTime, period); - }else{ - scheduledFuture = ConsoleManager.getScheduleManager().schedule(scheduledMethodRunnable, startTime); - } - }else if(period > 0){ - scheduledFuture = ConsoleManager.getScheduleManager().scheduleAtFixedRate(scheduledMethodRunnable, period); - } - SCHEDULE_FUTURES.put(scheduleKey, scheduledFuture); - LOGGER.debug("Building new schedule task, target bean "+ targetBean + " target method " + targetMethod + "."); - } - }else{ - LOGGER.debug("Bean name is not exists."); - } - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - } - - - private static String buildScheduleKey(String targetBean, String targetMethod){ - return targetBean + "#" + targetMethod; - } - - /** - * 封装任务对象 - * @param targetBean - * @param targetMethod - * @return - */ - private static ScheduledMethodRunnable buildScheduledRunnable(String targetBean, String targetMethod){ - Object bean = null; - Method method = null; - ScheduledMethodRunnable scheduledMethodRunnable = null; - try { - bean = ConsoleManager.getScheduleManager().getApplicationcontext().getBean(targetBean); - if(bean != null){ - if(AopUtils.isAopProxy(bean)){ - method = ReflectionUtils.findMethod(AopProxyUtils.ultimateTargetClass(bean), targetMethod); - }else{ - method = ReflectionUtils.findMethod(bean.getClass(), targetMethod); - } - if(method != null){ - scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method); - } - } - } catch (Exception e) { - LOGGER.debug(e.getLocalizedMessage(), e); - } - return scheduledMethodRunnable; - } - - -} diff --git a/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java b/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java index d43fbfa..6f82cbd 100644 --- a/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java +++ b/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java @@ -56,13 +56,13 @@ public class ScheduleUtil { return FORMAT.parse(d); } public static String getTaskTypeByBaseAndOwnSign(String baseType,String ownSign){ - if(ownSign.equals(OWN_SIGN_BASE) == true){ + if(ownSign.equals(OWN_SIGN_BASE)){ return baseType; } return baseType+"$" + ownSign; } public static String splitBaseTaskTypeFromTaskType(String taskType){ - if(taskType.indexOf("$") >=0){ + if(taskType.contains("$")){ return taskType.substring(0,taskType.indexOf("$")); }else{ return taskType; @@ -70,7 +70,7 @@ public class ScheduleUtil { } public static String splitOwnsignFromTaskType(String taskType){ - if(taskType.indexOf("$") >=0){ + if(taskType.contains("$")){ return taskType.substring(taskType.indexOf("$")+1); }else{ return OWN_SIGN_BASE; diff --git a/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java b/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java deleted file mode 100644 index 377e4a6..0000000 --- a/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java +++ /dev/null @@ -1,53 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.util.List; - - -/** - * 调度配置中心客户端接口,可以有基于数据库的实现,可以有基于ConfigServer的实现 - * - * @author juny.ye - * - */ -public interface IScheduleDataManager{ - - /** - * 发送心跳信息 - * - * @param server - * @throws Exception - */ - public boolean refreshScheduleServer(ScheduleServer server) throws Exception; - - /** - * 注册服务器 - * - * @param server - * @throws Exception - */ - public void registerScheduleServer(ScheduleServer server) throws Exception; - - - public boolean isLeader(String uuid,List serverList); - - - public void clearExpireScheduleServer() throws Exception; - - - public List loadScheduleServerNames() throws Exception; - - public void assignTask(String currentUuid, List taskServerList) throws Exception; - - public boolean isOwner(String name, String uuid)throws Exception; - - public void addTask(TaskDefine taskDefine)throws Exception; - - public void delTask(String targetBean, String targetMethod)throws Exception; - - public List selectTask()throws Exception; - - public boolean checkLocalTask(String currentUuid)throws Exception; - - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java index 16b4e08..09bb6de 100644 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java +++ b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java @@ -320,10 +320,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public boolean isExistsTask(TaskDefine taskDefine) throws Exception{ String zkPath = this.pathTask+ "/" + taskDefine.stringKey(); - if(this.getZooKeeper().exists(zkPath, false) != null){ - return true; - } - return false; + return this.getZooKeeper().exists(zkPath, false) != null; } @Override @@ -368,8 +365,8 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { String zkPath = this.pathTask; List taskDefines = new ArrayList(); if(this.getZooKeeper().exists(zkPath,false) != null){ - List childrens = this.getZooKeeper().getChildren(zkPath, false); - for(String child:childrens){ + List childes = this.getZooKeeper().getChildren(zkPath, false); + for(String child:childes){ byte[] data = this.getZooKeeper().getData(zkPath+"/"+child, null, null); TaskDefine taskDefine = null; if (null != data) { @@ -378,15 +375,16 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { taskDefine.setType("uncode task"); }else{ String[] names = child.split("#"); - if(names != null && StringUtils.isNotEmpty(names[0])){ + if(StringUtils.isNotEmpty(names[0])){ taskDefine = new TaskDefine(); taskDefine.setTargetBean(names[0]); taskDefine.setTargetMethod(names[1]); taskDefine.setType("quartz/spring task"); } } + List sers = this.getZooKeeper().getChildren(zkPath+"/"+child, false); - if(sers != null && sers.size() > 0){ + if(taskDefine != null && sers != null && sers.size() > 0){ taskDefine.setCurrentServer(sers.get(0)); } taskDefines.add(taskDefine); @@ -402,8 +400,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { List children = this.getZooKeeper().getChildren(zkPath, false); List ownerTask = new ArrayList(); if(null != children && children.size() > 0){ - for(int i = 0; i < children.size(); i++){ - String taskName = children.get(i); + for (String taskName : children) { if (isOwner(taskName, currentUuid)) { String taskPath = zkPath + "/" + taskName; byte[] data = this.getZooKeeper().getData(taskPath, null, null); @@ -413,7 +410,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { ownerTask.add(taskName); DynamicTaskManager.scheduleTask(taskDefine, new Date(getSystemTime())); } - } + } } } DynamicTaskManager.clearLocalTask(ownerTask); diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java b/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java deleted file mode 100644 index 7ec1ea6..0000000 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java +++ /dev/null @@ -1,195 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.sql.Timestamp; -import java.util.UUID; - -import cn.uncode.schedule.util.ScheduleUtil; - - -/** - * 调度服务器信息定义 - * - * @author juny.ye - * - */ -public class ScheduleServer { - /** - * 全局唯一编号 - */ - private String uuid; - - - private String ownSign; - /** - * 机器IP地址 - */ - private String ip; - - /** - * 机器名称 - */ - private String hostName; - - /** - * 服务开始时间 - */ - private Timestamp registerTime; - /** - * 最后一次心跳通知时间 - */ - private Timestamp heartBeatTime; - /** - * 最后一次取数据时间 - */ - private Timestamp lastFetchDataTime; - /** - * 处理描述信息,例如读取的任务数量,处理成功的任务数量,处理失败的数量,处理耗时 - * FetchDataCount=4430,FetcheDataNum=438570,DealDataSucess=438570,DealDataFail=0,DealSpendTime=651066 - */ - private String dealInfoDesc; - - private String nextRunStartTime; - - private String nextRunEndTime; - /** - * 配置中心的当前时间 - */ - private Timestamp centerServerTime; - - /** - * 数据版本号 - */ - private long version; - - private boolean isRegister; - - public ScheduleServer() { - - } - - public static ScheduleServer createScheduleServer(String aOwnSign){ - long currentTime = System.currentTimeMillis(); - ScheduleServer result = new ScheduleServer(); - result.ownSign = aOwnSign; - result.ip = ScheduleUtil.getLocalIP(); - result.hostName = ScheduleUtil.getLocalHostName(); - result.registerTime = new Timestamp(currentTime); - result.heartBeatTime = null; - result.dealInfoDesc = "调度初始化"; - result.version = 0; - result.uuid = result.ip - + "$" - + (UUID.randomUUID().toString().replaceAll("-", "") - .toUpperCase()); - return result; - } - - public String getUuid() { - return uuid; - } - - public void setUuid(String uuid) { - this.uuid = uuid; - } - - public long getVersion() { - return version; - } - - public void setVersion(long version) { - this.version = version; - } - - - public Timestamp getRegisterTime() { - return registerTime; - } - - public void setRegisterTime(Timestamp registerTime) { - this.registerTime = registerTime; - } - - public Timestamp getHeartBeatTime() { - return heartBeatTime; - } - - public void setHeartBeatTime(Timestamp heartBeatTime) { - this.heartBeatTime = heartBeatTime; - } - - public Timestamp getLastFetchDataTime() { - return lastFetchDataTime; - } - - public void setLastFetchDataTime(Timestamp lastFetchDataTime) { - this.lastFetchDataTime = lastFetchDataTime; - } - - public String getDealInfoDesc() { - return dealInfoDesc; - } - - public void setDealInfoDesc(String dealInfoDesc) { - this.dealInfoDesc = dealInfoDesc; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - - public Timestamp getCenterServerTime() { - return centerServerTime; - } - - public void setCenterServerTime(Timestamp centerServerTime) { - this.centerServerTime = centerServerTime; - } - - public String getNextRunStartTime() { - return nextRunStartTime; - } - - public void setNextRunStartTime(String nextRunStartTime) { - this.nextRunStartTime = nextRunStartTime; - } - - public String getNextRunEndTime() { - return nextRunEndTime; - } - - public void setNextRunEndTime(String nextRunEndTime) { - this.nextRunEndTime = nextRunEndTime; - } - - public String getOwnSign() { - return ownSign; - } - - public void setOwnSign(String ownSign) { - this.ownSign = ownSign; - } - - - public void setRegister(boolean isRegister) { - this.isRegister = isRegister; - } - - public boolean isRegister() { - return isRegister; - } - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/TaskDefine.java b/src/main/java/cn/uncode/schedule/zk/TaskDefine.java deleted file mode 100644 index 5cb3a00..0000000 --- a/src/main/java/cn/uncode/schedule/zk/TaskDefine.java +++ /dev/null @@ -1,101 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.util.Date; - -/** - * 任务定义,提供关键信息给使用者 - * @author juny.ye - * - */ -public class TaskDefine { - - /** - * 目标bean - */ - private String targetBean; - - /** - * 目标方法 - */ - private String targetMethod; - - /** - * cron表达式 - */ - private String cronExpression; - - /** - * 开始时间 - */ - private Date startTime; - - /** - * 周期(秒) - */ - private long period; - - private String currentServer; - - - - public boolean begin(Date sysTime){ - if(null == sysTime){ - return false; - } - return sysTime.after(startTime); - } - - public String getTargetBean() { - return targetBean; - } - - public void setTargetBean(String targetBean) { - this.targetBean = targetBean; - } - - public String getTargetMethod() { - return targetMethod; - } - - public void setTargetMethod(String targetMethod) { - this.targetMethod = targetMethod; - } - - public String getCronExpression() { - return cronExpression; - } - - public void setCronExpression(String cronExpression) { - this.cronExpression = cronExpression; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public long getPeriod() { - return period; - } - - public void setPeriod(long period) { - this.period = period; - } - - public String getCurrentServer() { - return currentServer; - } - - public void setCurrentServer(String currentServer) { - this.currentServer = currentServer; - } - - - - - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/Version.java b/src/main/java/cn/uncode/schedule/zk/Version.java deleted file mode 100644 index 1c0663a..0000000 --- a/src/main/java/cn/uncode/schedule/zk/Version.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.uncode.schedule.zk; - -/** - * - * @author juny.ye - * - */ -public class Version { - - public final static String version="uncode-schedule-1.0.0"; - - public static String getVersion(){ - return version; - } - public static boolean isCompatible(String dataVersion){ - if(version.compareTo(dataVersion)>=0){ - return true; - }else{ - return false; - } - } - -} diff --git a/src/main/java/cn/uncode/schedule/zk/ZKManager.java b/src/main/java/cn/uncode/schedule/zk/ZKManager.java index 4dbcad7..2f3a1b6 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKManager.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKManager.java @@ -35,7 +35,7 @@ public class ZKManager{ private ZooKeeper zk; private List acl = new ArrayList(); private Properties properties; - private boolean isCheckParentPath = true; + public enum keys { zkConnectString, rootPath, userName, password, zkSessionTimeout, autoRegisterTask, ipBlacklist } @@ -126,24 +126,19 @@ public class ZKManager{ } public void initial() throws Exception { //当zk状态正常后才能调用 + checkParent(zk,this.getRootPath()); if(zk.exists(this.getRootPath(), false) == null){ ZKTools.createPath(zk, this.getRootPath(), CreateMode.PERSISTENT, acl); - if(isCheckParentPath == true){ - checkParent(zk,this.getRootPath()); - } //设置版本信息 zk.setData(this.getRootPath(),Version.getVersion().getBytes(),-1); }else{ //先校验父亲节点,本身是否已经是schedule的目录 - if(isCheckParentPath == true){ - checkParent(zk,this.getRootPath()); - } byte[] value = zk.getData(this.getRootPath(), false, null); if(value == null){ zk.setData(this.getRootPath(),Version.getVersion().getBytes(),-1); }else{ String dataVersion = new String(value); - if(Version.isCompatible(dataVersion)==false){ + if(!Version.isCompatible(dataVersion)){ throw new Exception("TBSchedule程序版本 "+ Version.getVersion() +" 不兼容Zookeeper中的数据版本 " + dataVersion ); } log.info("当前的程序版本:" + Version.getVersion() + " 数据版本: " + dataVersion); @@ -155,18 +150,15 @@ public class ZKManager{ String zkPath = ""; for (int i =0;i< list.length -1;i++){ String str = list[i]; - if (str.equals("") == false) { + if (StringUtils.isNotEmpty(str)) { zkPath = zkPath + "/" + str; if (zk.exists(zkPath, false) != null) { byte[] value = zk.getData(zkPath, false, null); - if(value != null){ - String tmpVersion = new String(value); - if(tmpVersion.indexOf("uncode-schedule-") >=0){ + if(value != null && new String(value).contains("uncode-schedule-")){ throw new Exception("\"" + zkPath +"\" is already a schedule instance's root directory, its any subdirectory cannot as the root directory of others"); } } } - } } } @@ -174,7 +166,7 @@ public class ZKManager{ return acl; } public ZooKeeper getZooKeeper() throws Exception { - if(this.checkZookeeperState()==false){ + if(!this.checkZookeeperState()){ reConnection(); } return this.zk; diff --git a/src/test/java/cn/uncode/schedule/ZookeeperTest.java b/src/test/java/cn/uncode/schedule/ZookeeperTest.java index 0845329..eeed379 100644 --- a/src/test/java/cn/uncode/schedule/ZookeeperTest.java +++ b/src/test/java/cn/uncode/schedule/ZookeeperTest.java @@ -15,7 +15,7 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; -import cn.uncode.schedule.zk.TaskDefine; +import cn.uncode.schedule.core.TaskDefine; import cn.uncode.schedule.zk.ZKTools; @@ -31,8 +31,7 @@ public class ZookeeperTest { try { StringWriter writer = new StringWriter(); ZKTools.printTree(zk, "/uncode/schedule", writer, ""); - System.out - .println(i++ + "----" + writer.getBuffer().toString()); + System.out.println(i++ + "----" + writer.toString()); Thread.sleep(2000); } catch (Exception e) { System.out.println(e.getMessage()); @@ -45,7 +44,7 @@ public class ZookeeperTest { ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null); StringWriter writer = new StringWriter(); ZKTools.printTree(zk, "/", writer, "\n"); - System.out.println(writer.getBuffer().toString()); + System.out.println(writer.toString()); } @Test @@ -65,8 +64,7 @@ public class ZookeeperTest { List acls = new ArrayList(); zk.addAuthInfo("digest", "ScheduleAdmin:password".getBytes()); acls.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", - DigestAuthenticationProvider - .generateDigest("ScheduleAdmin:password")))); + DigestAuthenticationProvider.generateDigest("ScheduleAdmin:password")))); acls.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE)); zk.create("/uncode/schedule/task/taskObj#print", new byte[0], acls, CreateMode.PERSISTENT); zk.getData("/uncode/schedule/task/taskObj#print", false, null); -- Gitee From f1285fc8b4e377161665e5487ecbd3f4d82e3184 Mon Sep 17 00:00:00 2001 From: Antergone Date: Sun, 10 Jul 2016 02:52:52 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/cn/uncode/schedule/DynamicTaskManager.java | 2 +- .../java/cn/uncode/schedule/ZKScheduleManager.java | 5 +++-- src/main/java/cn/uncode/schedule/zk/ZKTools.java | 11 ++++++----- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java index 54e3e4d..8bf606a 100644 --- a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java +++ b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java @@ -105,7 +105,7 @@ public class DynamicTaskManager { * @return */ private static ScheduledMethodRunnable buildScheduledRunnable(String targetBean, String targetMethod, String params){ - Object bean = null; + Object bean; ScheduledMethodRunnable scheduledMethodRunnable = null; try { bean = ZKScheduleManager.getApplicationcontext().getBean(targetBean); diff --git a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java index c354304..9d3b6a7 100644 --- a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java +++ b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java @@ -179,8 +179,9 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } //黑名单 for(String ip:zkManager.getIpBlacklist()){ - if(serverList.contains(ip)){ - serverList.remove(ip); + int index = serverList.indexOf(ip); + if (index > -1){ + serverList.remove(index); } } // 设置初始化成功标准,避免在leader转换的时候,新增的线程组初始化失败 diff --git a/src/main/java/cn/uncode/schedule/zk/ZKTools.java b/src/main/java/cn/uncode/schedule/zk/ZKTools.java index 6304426..32fb8bd 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKTools.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKTools.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; @@ -21,7 +22,7 @@ public class ZKTools { String[] list = path.split("/"); String zkPath = ""; for (String str : list) { - if (str.equals("") == false) { + if (StringUtils.isNotEmpty(str)) { zkPath = zkPath + "/" + str; if (zk.exists(zkPath, false) == null) { zk.create(zkPath, null, acl, createMode); @@ -59,15 +60,15 @@ public class ZKTools { while(index < dealList.size()){ String tempPath = dealList.get(index); List children = zk.getChildren(tempPath, false); - if(tempPath.equalsIgnoreCase("/") == false){ + if(!tempPath.equalsIgnoreCase("/")){ tempPath = tempPath +"/"; } Collections.sort(children); - for(int i = children.size() -1;i>=0;i--){ - dealList.add(index+1, tempPath + children.get(i)); + for (int i = children.size() - 1; i >= 0; i--) { + dealList.add(index + 1, tempPath + children.get(i)); } index++; } - return (String[])dealList.toArray(new String[0]); + return dealList.toArray(new String[0]); } } -- Gitee From ef9032182a7fbbd51b2cfb3155debd55e94d105e Mon Sep 17 00:00:00 2001 From: Antergone Date: Sun, 10 Jul 2016 18:29:54 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ .../cn/uncode/schedule/ConsoleManager.java | 2 +- .../cn/uncode/schedule/ZKScheduleManager.java | 8 ++++---- .../java/cn/uncode/schedule/core/Version.java | 2 +- .../schedule/zk/ScheduleDataManager4ZK.java | 12 +++++------- .../java/cn/uncode/schedule/zk/ZKManager.java | 19 ++++++++++++------- .../java/cn/uncode/schedule/zk/ZKTools.java | 4 ++-- 7 files changed, 27 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index aebe684..eedb2bf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ .classpath .project .settings/ +.idea/ +*.iml \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/ConsoleManager.java b/src/main/java/cn/uncode/schedule/ConsoleManager.java index 7a933bb..b260ef6 100644 --- a/src/main/java/cn/uncode/schedule/ConsoleManager.java +++ b/src/main/java/cn/uncode/schedule/ConsoleManager.java @@ -12,7 +12,7 @@ import cn.uncode.schedule.core.TaskDefine; public class ConsoleManager { - protected static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class); + private static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class); // private static Gson GSON = new GsonBuilder().create(); diff --git a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java index 9d3b6a7..8359d26 100644 --- a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java +++ b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java @@ -73,9 +73,9 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic private Map isOwnerMap = new ConcurrentHashMap(); private Timer hearBeatTimer; - protected Lock initLock = new ReentrantLock(); - protected boolean isStopSchedule = false; - protected Lock registerLock = new ReentrantLock(); + private Lock initLock = new ReentrantLock(); + private boolean isStopSchedule = false; + private Lock registerLock = new ReentrantLock(); private volatile String errorMessage = "No config Zookeeper connect information"; private InitialThread initialThread; @@ -120,7 +120,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } } - public void rewriteScheduleInfo() throws Exception { + private void rewriteScheduleInfo() throws Exception { registerLock.lock(); try { if (this.isStopSchedule) { diff --git a/src/main/java/cn/uncode/schedule/core/Version.java b/src/main/java/cn/uncode/schedule/core/Version.java index 9303682..e8a1eca 100644 --- a/src/main/java/cn/uncode/schedule/core/Version.java +++ b/src/main/java/cn/uncode/schedule/core/Version.java @@ -7,7 +7,7 @@ package cn.uncode.schedule.core; */ public class Version { - public final static String version="uncode-schedule-1.0.0"; + private final static String version="uncode-schedule-1.0.0"; public static String getVersion(){ return version; diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java index 09bb6de..710b9f4 100644 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java +++ b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java @@ -103,7 +103,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public void registerScheduleServer(ScheduleServer server) throws Exception { - if(server.isRegister() == true){ + if(server.isRegister()){ throw new Exception(server.getUuid() + " 被重复注册"); } //clearExpireScheduleServer(); @@ -184,7 +184,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public void assignTask(String currentUuid, List taskServerList) throws Exception { - if(this.isLeader(currentUuid,taskServerList)==false){ + if(!this.isLeader(currentUuid, taskServerList)){ if(LOG.isDebugEnabled()){ LOG.debug(currentUuid +":不是负责任务分配的Leader,直接返回"); } @@ -223,7 +223,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { } ZKTools.deleteTree(this.getZooKeeper(), taskPath + "/" + serverId); } - if(hasAssignSuccess == false){ + if(!hasAssignSuccess){ assignServer2Task(taskServerList, taskPath); } } @@ -243,9 +243,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { String serverId = taskServerList.get(index); this.getZooKeeper().create(taskPath + "/" + serverId, null, this.zkManager.getAcl(),CreateMode.PERSISTENT); if(LOG.isDebugEnabled()){ - StringBuffer buffer = new StringBuffer(); - buffer.append("Assign server [").append(serverId).append("]").append(" to task [").append(taskPath).append("]"); - LOG.debug(buffer.toString()); + LOG.debug("Assign server [" + serverId + "]" + " to task [" + taskPath + "]"); } } @@ -288,7 +286,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { try { DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date = (Date) format.parse(json.getAsString()); + Date date = format.parse(json.getAsString()); return new Timestamp(date.getTime()); } catch (Exception e) { throw new JsonParseException(e); diff --git a/src/main/java/cn/uncode/schedule/zk/ZKManager.java b/src/main/java/cn/uncode/schedule/zk/ZKManager.java index 2f3a1b6..b06e19b 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKManager.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKManager.java @@ -36,7 +36,7 @@ public class ZKManager{ private List acl = new ArrayList(); private Properties properties; - public enum keys { + private enum keys { zkConnectString, rootPath, userName, password, zkSessionTimeout, autoRegisterTask, ipBlacklist } @@ -49,7 +49,7 @@ public class ZKManager{ * 重连zookeeper * @throws Exception */ - public synchronized void reConnection() throws Exception{ + private synchronized void reConnection() throws Exception{ if (this.zk != null) { this.zk.close(); this.zk = null; @@ -100,9 +100,10 @@ public class ZKManager{ this.zk.close(); } - public String getRootPath(){ + String getRootPath(){ return this.properties.getProperty(keys.rootPath.toString()); } + public List getIpBlacklist(){ List ips = new ArrayList(); String list = this.properties.getProperty(keys.ipBlacklist.toString()); @@ -114,16 +115,19 @@ public class ZKManager{ public String getConnectStr(){ return this.properties.getProperty(keys.zkConnectString.toString()); } - public boolean isAutoRegisterTask(){ + + boolean isAutoRegisterTask(){ String autoRegisterTask = this.properties.getProperty(keys.autoRegisterTask.toString()); if(StringUtils.isNotEmpty(autoRegisterTask)){ return Boolean.valueOf(autoRegisterTask); } return true; } + public boolean checkZookeeperState() throws Exception{ return zk != null && zk.getState() == States.CONNECTED; } + public void initial() throws Exception { //当zk状态正常后才能调用 checkParent(zk,this.getRootPath()); @@ -145,7 +149,7 @@ public class ZKManager{ } } } - public static void checkParent(ZooKeeper zk, String path) throws Exception { + private static void checkParent(ZooKeeper zk, String path) throws Exception { String[] list = path.split("/"); String zkPath = ""; for (int i =0;i< list.length -1;i++){ @@ -162,10 +166,11 @@ public class ZKManager{ } } - public List getAcl() { + List getAcl() { return acl; } - public ZooKeeper getZooKeeper() throws Exception { + + ZooKeeper getZooKeeper() throws Exception { if(!this.checkZookeeperState()){ reConnection(); } diff --git a/src/main/java/cn/uncode/schedule/zk/ZKTools.java b/src/main/java/cn/uncode/schedule/zk/ZKTools.java index 32fb8bd..8695fa9 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKTools.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKTools.java @@ -18,7 +18,7 @@ import org.apache.zookeeper.data.Stat; * */ public class ZKTools { - public static void createPath(ZooKeeper zk, String path,CreateMode createMode, List acl) throws Exception { + static void createPath(ZooKeeper zk, String path, CreateMode createMode, List acl) throws Exception { String[] list = path.split("/"); String zkPath = ""; for (String str : list) { @@ -50,7 +50,7 @@ public class ZKTools { } } - public static String[] getTree(ZooKeeper zk,String path) throws Exception{ + private static String[] getTree(ZooKeeper zk, String path) throws Exception{ if(zk.exists(path, false) == null){ return new String[0]; } -- Gitee From f74ce9bd8dff490d0eaccce3f5aca807ed6c6bf6 Mon Sep 17 00:00:00 2001 From: antergone Date: Mon, 11 Jul 2016 21:50:11 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20UnRegisterScheduleServ?= =?UTF-8?q?er?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../schedule/core/IScheduleDataManager.java | 3 +- .../schedule/zk/ScheduleDataManager4ZK.java | 86 +++++++++++-------- .../java/cn/uncode/schedule/zk/ZKTools.java | 2 +- 3 files changed, 52 insertions(+), 39 deletions(-) diff --git a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java index fede905..4008780 100644 --- a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java +++ b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java @@ -29,7 +29,8 @@ public interface IScheduleDataManager{ public boolean isLeader(String uuid,List serverList); - + + public void unRegisterScheduleServer(ScheduleServer server) throws Exception; public void clearExpireScheduleServer() throws Exception; diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java index 09bb6de..e461b1d 100644 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java +++ b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java @@ -103,11 +103,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public void registerScheduleServer(ScheduleServer server) throws Exception { - if(server.isRegister() == true){ + if(server.isRegister()){ throw new Exception(server.getUuid() + " 被重复注册"); } //clearExpireScheduleServer(); - String realPath = null; + String realPath; //此处必须增加UUID作为唯一性保障 StringBuffer id = new StringBuffer(); id.append(server.getIp()).append("$") @@ -149,20 +149,38 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { } } } - - public List loadScheduleServerNames(String taskType)throws Exception { - String zkPath = this.pathServer; - if (this.getZooKeeper().exists(zkPath, false) == null) { - return new ArrayList(); - } - List serverList = this.getZooKeeper().getChildren(zkPath, false); - Collections.sort(serverList, new Comparator() { - public int compare(String u1, String u2) { - return u1.substring(u1.lastIndexOf("$") + 1).compareTo( - u2.substring(u2.lastIndexOf("$") + 1)); + + + @Override + public void unRegisterScheduleServer(ScheduleServer server) throws Exception { + List serverList = this.loadScheduleServerNames(); + + if(server.isRegister() && this.isLeader(server.getUuid(), serverList)){ + //delete task + String zkPath = this.pathTask; + String serverPath = this.pathServer; + + if(this.getZooKeeper().exists(zkPath,false)== null){ + this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); } - }); - return serverList; + + //get all task + List children = this.getZooKeeper().getChildren(zkPath, false); + if(null != children && children.size() > 0){ + for (String taskName : children) { + String taskPath = zkPath + "/" + taskName; + if (this.getZooKeeper().exists(taskPath, false) != null) { + ZKTools.deleteTree(this.getZooKeeper(), taskPath + "/" + server.getUuid()); + } + } + } + + //删除 + if (this.getZooKeeper().exists(this.pathServer, false) == null) { + ZKTools.deleteTree(this.getZooKeeper(), serverPath + serverPath + "/" + server.getUuid()); + } + server.setRegister(false); + } } public List loadScheduleServerNames() throws Exception { @@ -180,11 +198,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { }); return serverList; } - + @Override public void assignTask(String currentUuid, List taskServerList) throws Exception { - if(this.isLeader(currentUuid,taskServerList)==false){ + if(!this.isLeader(currentUuid, taskServerList)){ if(LOG.isDebugEnabled()){ LOG.debug(currentUuid +":不是负责任务分配的Leader,直接返回"); } @@ -204,30 +222,29 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { } List children = this.getZooKeeper().getChildren(zkPath, false); if(null != children && children.size() > 0){ - for(int i = 0; i < children.size(); i++){ - String taskName = children.get(i); + for (String taskName : children) { String taskPath = zkPath + "/" + taskName; - if(this.getZooKeeper().exists(taskPath, false) == null){ - this.getZooKeeper().create(taskPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT); + if (this.getZooKeeper().exists(taskPath, false) == null) { + this.getZooKeeper().create(taskPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); } - + List taskServerIds = this.getZooKeeper().getChildren(taskPath, false); - if(null == taskServerIds || taskServerIds.size() == 0){ + if (null == taskServerIds || taskServerIds.size() == 0) { assignServer2Task(taskServerList, taskPath); - }else{ + } else { boolean hasAssignSuccess = false; - for(String serverId:taskServerIds){ - if(taskServerList.contains(serverId)){ + for (String serverId : taskServerIds) { + if (taskServerList.contains(serverId)) { hasAssignSuccess = true; continue; } ZKTools.deleteTree(this.getZooKeeper(), taskPath + "/" + serverId); } - if(hasAssignSuccess == false){ + if (!hasAssignSuccess) { assignServer2Task(taskServerList, taskPath); } } - + } }else{ if(LOG.isDebugEnabled()){ @@ -243,9 +260,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { String serverId = taskServerList.get(index); this.getZooKeeper().create(taskPath + "/" + serverId, null, this.zkManager.getAcl(),CreateMode.PERSISTENT); if(LOG.isDebugEnabled()){ - StringBuffer buffer = new StringBuffer(); - buffer.append("Assign server [").append(serverId).append("]").append(" to task [").append(taskPath).append("]"); - LOG.debug(buffer.toString()); + LOG.debug("Assign server [" + serverId + "]" + " to task [" + taskPath + "]"); } } @@ -253,7 +268,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { return uuid.equals(getLeader(serverList)); } - public String getLeader(List serverList){ + private String getLeader(List serverList){ if(serverList == null || serverList.size() ==0){ return ""; } @@ -270,11 +285,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { return leader; } - public long getSystemTime(){ + private long getSystemTime(){ return this.zkBaseTime + ( System.currentTimeMillis() - this.loclaBaseTime); } - class TimestampTypeAdapter implements JsonSerializer, JsonDeserializer{ + private class TimestampTypeAdapter implements JsonSerializer, JsonDeserializer{ public JsonElement serialize(Timestamp src, Type arg1, JsonSerializationContext arg2) { DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateFormatAsString = format.format(new Date(src.getTime())); @@ -326,9 +341,6 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public void addTask(TaskDefine taskDefine) throws Exception { String zkPath = this.pathTask; - if(this.getZooKeeper().exists(zkPath,false)== null){ - this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); - } zkPath = zkPath + "/" + taskDefine.stringKey(); if(this.getZooKeeper().exists(zkPath, false) == null){ this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); diff --git a/src/main/java/cn/uncode/schedule/zk/ZKTools.java b/src/main/java/cn/uncode/schedule/zk/ZKTools.java index 6304426..c4bc581 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKTools.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKTools.java @@ -59,7 +59,7 @@ public class ZKTools { while(index < dealList.size()){ String tempPath = dealList.get(index); List children = zk.getChildren(tempPath, false); - if(tempPath.equalsIgnoreCase("/") == false){ + if(!tempPath.equalsIgnoreCase("/")){ tempPath = tempPath +"/"; } Collections.sort(children); -- Gitee