分布式定时任务调度。
xxl-job
https://www.xuxueli.com/xxl-job/
基于xxl-job二次开发
- i-scheduler-spring-boot-starter
其它web服务只需引用此starter包即可。
依赖i-scheduler-core。
- i-scheduler-core
starter组件核心模块,提供其它服务需要实现的BaseSchedule,HandlerPoolHelper通过反射的方式调用具体的Schedule任务。
- i-scheduler-admin
分布式任务调度中心。
- i-scheduler-web
管理页面。
i-scheduler-spring-boot-starter
ScheduleAutoConfiguration
1 2
| org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.haier.schedule.starter.ScheduleAutoConfiguration
|
- 声明i-scheduler组件所需的Bean
- 添加SchedulerInterceptor拦截器,以便任务调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Configuration public class ScheduleAutoConfiguration implements WebMvcConfigurer { @Value("${server.port:8080}") private String port; private String adminAddress = "http://scheduler-admin.i-scheduler:8080/api"; @Value("${server.servlet.context-path:}") private String contextPath; @Value("${scheduler.admin.open:true}") private Boolean open; @Value("${scheduler.job.open:true}") private Boolean jobOpen; @Lazy @Autowired private SchedulerInterceptor schedulerInterceptor; public void addInterceptors(InterceptorRegistry registry) { if (this.open != null && !this.open.booleanValue()) return; if (this.jobOpen != null && !this.jobOpen.booleanValue()) return; registry.addInterceptor(this.schedulerInterceptor)
.addPathPatterns(new String[] { "/**" }); } @Bean public SchedulerInterceptor schedulerInterceptor() { return new SchedulerInterceptor(); } @Bean public ReportPoolHelper reportPoolHelper() { return new ReportPoolHelper(this.adminAddress); } @Bean public HandlerPoolHelper handlerPoolHelper() { return new HandlerPoolHelper(); } @Bean public ScheduleExecutor executor() { return new ScheduleExecutor(this.port, this.adminAddress, this.contextPath, this.open, this.jobOpen); } @Bean public ScheduleAspect aspect() { return new ScheduleAspect(this.adminAddress); } @Bean public SchedulerRunner runner() { return new SchedulerRunner(this.adminAddress, this.contextPath, this.port, this.open); } }
|
SchedulerInterceptor
拦截器,处理i-scheduler-admin传递的调度请求,以反射的方式调用实际需要执行的任务方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class SchedulerInterceptor implements HandlerInterceptor { @Autowired private HandlerPoolHelper handlerPoolHelper; public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String servletPath = request.getServletPath(); if (!servletPath.startsWith("/i_scheduler")) return true; String path = servletPath.replaceAll("/i_scheduler/", ""); if ("ping".equals(path)) { response.setStatus(200); return false; } if (path.startsWith("stop")) { Long id; String idStr = path.substring(path.indexOf('/') + 1); try { id = Long.valueOf(Long.parseLong(idStr)); } catch (Exception e) { return false; } SchedulerContext schedulerContext = ScheduleAspect.getSchedulerContext(id); if (schedulerContext != null) schedulerContext.setStop(true); return false; }
MethodHandler methodHandler = ScheduleExecutor.getMethod(path); if (methodHandler == null) { response.setStatus(404); return false; } byte[] bodyBytes = StreamUtils.copyToByteArray((InputStream)request.getInputStream()); String body = new String(bodyBytes, request.getCharacterEncoding()); Parameter parameter = (Parameter)JacksonUtil.readValue(body, Parameter.class); this.handlerPoolHelper.addBiz(methodHandler, new Object[] { parameter }); response.setStatus(200);
return false; } }
|
ScheduleExecutor
- ApplicationContextAware
应用程序上下文感知接口。通过实现该接口,我们可以获取ApplicationContext对象,进而获取BeanDefinitionRegistry等底层结构。
通过重写 void setApplicationContext(ApplicationContext context) 方法,我们可以在Bean属性填充完成之后、初始化回调(如afterPropertiesSet或自定义init-method)执行之前获取ApplicationContext。
- SmartInitializingSingleton
Spring提供的一个回调接口,用于在容器中所有非懒加载的单例Bean实例化完成之后,执行一段代码。
通过实现SmartInitializingSingleton接口并重写afterSingletonsInstantiated()方法,我们可以在容器初始化完成后立即执行一段逻辑。
- DisposableBean
在Bean生命周期结束前调用destroy()方法做一些收尾工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| public class ScheduleExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean { private static Logger log = LoggerFactory.getLogger(ScheduleExecutor.class); private static ApplicationContext applicationContext; private static Map<String, MethodHandler> schedulerMethods = new ConcurrentHashMap<>(); private String port; private String adminAddress; private String contextPath; private Boolean open; private Boolean jobOpen; @Autowired private ReportPoolHelper reportPoolHelper; @Autowired private HandlerPoolHelper handlerPoolHelper; public ScheduleExecutor() {} public ScheduleExecutor(String port, String adminAddress, String contextPath, Boolean open, Boolean jobOpen) { this.port = port; this.adminAddress = adminAddress; this.contextPath = contextPath; this.open = open; this.jobOpen = jobOpen; }
@Override public void destroy() { this.handlerPoolHelper.stop(); this.reportPoolHelper.stop(); }
@Override public void afterSingletonsInstantiated() { if (this.open != null && !this.open.booleanValue()) return; Map<String, BaseSchedule> schedulerBeans = null; if (this.jobOpen == null || this.jobOpen.booleanValue()) schedulerBeans = applicationContext.getBeansOfType(BaseSchedule.class); RegisterParam register = new RegisterParam(); register.setPort(this.port); register.setAppName(System.getenv("APP_NAME")); register.setClusterName(System.getenv("CLUSTER_NAME")); register.setProject(System.getenv("TENANT_NAME")); register.setNamespace(System.getenv("POD_NAMESPACE")); register.setContextPath(this.contextPath); register.setSchedulerList(new ArrayList()); if (schedulerBeans != null && !schedulerBeans.isEmpty()) for (BaseSchedule scheduler : schedulerBeans.values()) { try { Method doWork = scheduler.getClass().getDeclaredMethod("doWork", new Class[] { Parameter.class }); if (doWork == null) continue; String name = null; String cron = null; boolean cover = false; Scheduler annotation = (Scheduler)AnnotationUtils.findAnnotation(doWork, Scheduler.class); if (annotation != null) { name = annotation.value(); cron = annotation.cron(); cover = annotation.cover(); } if (name == null || name.length() == 0) { name = ClassUtils.getUserClass(scheduler.getClass()).getSimpleName(); name = decapitalize(name); } if (schedulerMethods.containsKey(name)) throw new RuntimeException("存在重名任务,请修改"); register.getSchedulerList().add(new SchedulerConf(name, cron, Boolean.valueOf(cover))); schedulerMethods.put(name, new MethodHandler(scheduler, doWork)); } catch (NoSuchMethodException e) { e.printStackTrace(); } } HttpUtil.post(this.adminAddress + "/register", register); log.info("定时任务信息注册完成"); this.reportPoolHelper.start(); this.handlerPoolHelper.start(); } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this; ScheduleExecutor.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return 
applicationContext; } public static MethodHandler getMethod(String key) { return schedulerMethods.get(key); } public String decapitalize(String name) { if (name == null || name.length() == 0) return name; if (name.length() > 1 && Character.isUpperCase(name.charAt(1)) && Character.isUpperCase(name.charAt(0))) return name; char[] chars = name.toCharArray(); chars[0] = Character.toLowerCase(chars[0]); return new String(chars); } }
|
SchedulerRunner
- ApplicationRunner
应用程序运行器接口。通过实现该接口,我们可以在应用启动后立即运行一段代码。
需要重写 void run(ApplicationArguments args)方法,该方法会在SpringApplication启动后直接调用。
SpringBootApplication启动成功,向i-scheduler-admin调度中心注册应用程序执行器信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class SchedulerRunner implements ApplicationRunner { private static Logger log = LoggerFactory.getLogger(SchedulerRunner.class); private String adminUrl; private String contextPath; private String port; private Boolean open; public SchedulerRunner() {} public SchedulerRunner(String adminUrl, String contextPath, String port, Boolean open) { this.adminUrl = adminUrl; this.contextPath = contextPath; this.port = port; this.open = open; } public void run(ApplicationArguments args) throws Exception { if (this.open != null && !this.open.booleanValue()) return; RegisterParam register = new RegisterParam(); register.setPort(this.port); register.setContextPath(this.contextPath); register.setAppName(System.getenv("APP_NAME")); register.setClusterName(System.getenv("CLUSTER_NAME")); register.setAddress(String.format("http://%s:%s", new Object[] { System.getenv("POD_IP"), this.port })); register.setProject(System.getenv("TENANT_NAME")); register.setNamespace(System.getenv("POD_NAMESPACE")); register.setPodName(System.getenv("POD_NAME")); HttpUtil.post(this.adminUrl + "/exec/register", register); log.info("执行器信息注册完成"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @Aspect public class ScheduleAspect { private String adminAddress; private static Map<Long, SchedulerContext> schedulerContextMap = new ConcurrentHashMap<>(); public ScheduleAspect() {} public ScheduleAspect(String adminAddress) { this.adminAddress = adminAddress; } public static SchedulerContext getSchedulerContext(Long id) { if (id == null) return null; return schedulerContextMap.get(id); } @Around("@annotation(com.haier.schedule.core.annotation.Scheduler)") public void around(ProceedingJoinPoint point) { Long execId = null; CallbackParam callbackParam = new CallbackParam(); try { Object arg = point.getArgs()[0]; if (!(arg instanceof Parameter)) { point.proceed(); return; } Parameter param = (Parameter)arg; callbackParam.setId(param.getId()); callbackParam.setTriggerId(param.getTriggerId()); callbackParam.setJobId(param.getJobId()); SchedulerContext schedulerContext = new SchedulerContext(param.getId().longValue(), param.getShardIndex().intValue(), param.getShardTotal().intValue()); SchedulerContext.setScheduleContext(schedulerContext); execId = param.getId(); schedulerContextMap.put(execId, schedulerContext);
point.proceed();
callbackParam.setStatus(Integer.valueOf(200)); } catch (Throwable e) { callbackParam.setStatus(Integer.valueOf(500)); e.printStackTrace(); } finally { if (execId != null) schedulerContextMap.remove(execId); } HttpUtil.post(this.adminAddress + "/callback", callbackParam); } }
|
i-scheduler-admin
i-scheduler-core