分布式定时任务调度。

xxl-job

https://www.xuxueli.com/xxl-job/

基于xxl-job二次开发

  • i-scheduler-spring-boot-starter
    其它web服务只需引用此starter包即可。
    依赖i-scheduler-core。
  • i-scheduler-core
    starter组件核心模块,提供其它服务需要实现的BaseSchedule,HandlerPoolHelper通过反射的方式调用具体的Schedule任务。
  • i-scheduler-admin
    分布式任务调度中心。
  • i-scheduler-web
    管理页面。

i-scheduler-spring-boot-starter

ScheduleAutoConfiguration

1
2
// springboot自动装配
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.haier.schedule.starter.ScheduleAutoConfiguration
  1. 声明i-scheduler组件所需的Bean
  2. 添加SchedulerInterceptor拦截器,以便任务调度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Configuration
public class ScheduleAutoConfiguration implements WebMvcConfigurer {
@Value("${server.port:8080}")
private String port;

private String adminAddress = "http://scheduler-admin.i-scheduler:8080/api";

@Value("${server.servlet.context-path:}")
private String contextPath;

@Value("${scheduler.admin.open:true}")
private Boolean open;

@Value("${scheduler.job.open:true}")
private Boolean jobOpen;

@Lazy
@Autowired
private SchedulerInterceptor schedulerInterceptor;

public void addInterceptors(InterceptorRegistry registry) {
if (this.open != null && !this.open.booleanValue())
return;
if (this.jobOpen != null && !this.jobOpen.booleanValue())
return;
registry.addInterceptor(this.schedulerInterceptor)

.addPathPatterns(new String[] { "/**" });
}

// 拦截器
@Bean
public SchedulerInterceptor schedulerInterceptor() {
return new SchedulerInterceptor();
}

@Bean
public ReportPoolHelper reportPoolHelper() {
return new ReportPoolHelper(this.adminAddress);
}
@Bean
public HandlerPoolHelper handlerPoolHelper() {
return new HandlerPoolHelper();
}

// 业务服务定时任务执行器
@Bean
public ScheduleExecutor executor() {
return new ScheduleExecutor(this.port, this.adminAddress, this.contextPath, this.open, this.jobOpen);
}

// 切面
@Bean
public ScheduleAspect aspect() {
return new ScheduleAspect(this.adminAddress);
}

// 运行器
@Bean
public SchedulerRunner runner() {
return new SchedulerRunner(this.adminAddress, this.contextPath, this.port, this.open);
}
}

SchedulerInterceptor

拦截器,处理i-scheduler-admin传递的调度请求,以反射的方式调用实际需要执行的任务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class SchedulerInterceptor implements HandlerInterceptor {
@Autowired
private HandlerPoolHelper handlerPoolHelper;

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String servletPath = request.getServletPath();
if (!servletPath.startsWith("/i_scheduler"))
// 非调度请求,执行其它的拦截器
return true;
String path = servletPath.replaceAll("/i_scheduler/", "");
if ("ping".equals(path)) {
response.setStatus(200);
return false;
}
if (path.startsWith("stop")) {
Long id;
String idStr = path.substring(path.indexOf('/') + 1);
try {
id = Long.valueOf(Long.parseLong(idStr));
} catch (Exception e) {
return false;
}
SchedulerContext schedulerContext = ScheduleAspect.getSchedulerContext(id);
if (schedulerContext != null)
schedulerContext.setStop(true);
return false;
}

// 反射 获取BaseSchedule
MethodHandler methodHandler = ScheduleExecutor.getMethod(path);
if (methodHandler == null) {
response.setStatus(404);
return false;
}
byte[] bodyBytes = StreamUtils.copyToByteArray((InputStream)request.getInputStream());
String body = new String(bodyBytes, request.getCharacterEncoding());
Parameter parameter = (Parameter)JacksonUtil.readValue(body, Parameter.class);
// 调度任务线程池执行任务
this.handlerPoolHelper.addBiz(methodHandler, new Object[] { parameter });
response.setStatus(200);

// 停止拦截器的链式调用
return false;
}
}

ScheduleExecutor

  1. ApplicationContextAware
    应用程序上下文感知接口。通过实现该接口,我们可以获取ApplicationContext对象,进而获取BeanDefinitionRegistry等底层结构。
    通过重写 void setApplicationContext(ApplicationContext context) 方法,我们可以在Bean属性填充之后、初始化回调(如afterPropertiesSet)之前获取ApplicationContext。
  2. SmartInitializingSingleton
    Spring的一个回调接口,用于在容器中所有非懒加载的单例Bean实例化完成之后执行一段代码。
    通过实现SmartInitializingSingleton接口并重写afterSingletonsInstantiated()方法,我们可以在容器初始化完成后立即执行一段逻辑。
  3. DisposableBean
    在Bean生命周期结束前调用destroy()方法做一些收尾工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class ScheduleExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static Logger log = LoggerFactory.getLogger(ScheduleExecutor.class);

private static ApplicationContext applicationContext;

private static Map<String, MethodHandler> schedulerMethods = new ConcurrentHashMap<>();

private String port;

private String adminAddress;

private String contextPath;

private Boolean open;

private Boolean jobOpen;

@Autowired
private ReportPoolHelper reportPoolHelper;

@Autowired
private HandlerPoolHelper handlerPoolHelper;

public ScheduleExecutor() {}

public ScheduleExecutor(String port, String adminAddress, String contextPath, Boolean open, Boolean jobOpen) {
this.port = port;
this.adminAddress = adminAddress;
this.contextPath = contextPath;
this.open = open;
this.jobOpen = jobOpen;
}

@Override
public void destroy() {
// 结束执行器
this.handlerPoolHelper.stop();
// 结束上报器
this.reportPoolHelper.stop();
}

// 在容器初始化完成后立即执行一段逻辑
@Override
public void afterSingletonsInstantiated() {
if (this.open != null && !this.open.booleanValue())
return;
Map<String, BaseSchedule> schedulerBeans = null;
if (this.jobOpen == null || this.jobOpen.booleanValue())
schedulerBeans = applicationContext.getBeansOfType(BaseSchedule.class);
RegisterParam register = new RegisterParam();
register.setPort(this.port);
register.setAppName(System.getenv("APP_NAME"));
register.setClusterName(System.getenv("CLUSTER_NAME"));
register.setProject(System.getenv("TENANT_NAME"));
register.setNamespace(System.getenv("POD_NAMESPACE"));
register.setContextPath(this.contextPath);
register.setSchedulerList(new ArrayList());
if (schedulerBeans != null && !schedulerBeans.isEmpty())
for (BaseSchedule scheduler : schedulerBeans.values()) {
try {
Method doWork = scheduler.getClass().getDeclaredMethod("doWork", new Class[] { Parameter.class });
if (doWork == null)
continue;
String name = null;
String cron = null;
boolean cover = false;
Scheduler annotation = (Scheduler)AnnotationUtils.findAnnotation(doWork, Scheduler.class);
if (annotation != null) {
name = annotation.value();
cron = annotation.cron();
cover = annotation.cover();
}
if (name == null || name.length() == 0) {
name = ClassUtils.getUserClass(scheduler.getClass()).getSimpleName();
name = decapitalize(name);
}
if (schedulerMethods.containsKey(name))
throw new RuntimeException("存在重名任务,请修改");
register.getSchedulerList().add(new SchedulerConf(name, cron, Boolean.valueOf(cover)));
schedulerMethods.put(name, new MethodHandler(scheduler, doWork));
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}

// 向i-scheduler-admin注册任务信息
HttpUtil.post(this.adminAddress + "/register", register);
log.info("定时任务信息注册完成");
this.reportPoolHelper.start();
this.handlerPoolHelper.start();
}

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this;
ScheduleExecutor.applicationContext = applicationContext;
}

public static ApplicationContext getApplicationContext() {
return applicationContext;
}

public static MethodHandler getMethod(String key) {
return schedulerMethods.get(key);
}

public String decapitalize(String name) {
if (name == null || name.length() == 0)
return name;
if (name.length() > 1 && Character.isUpperCase(name.charAt(1)) &&
Character.isUpperCase(name.charAt(0)))
return name;
char[] chars = name.toCharArray();
chars[0] = Character.toLowerCase(chars[0]);
return new String(chars);
}
}

SchedulerRunner

  • ApplicationRunner
    应用程序运行器接口。通过实现该接口,我们可以在应用启动后立即运行一段代码。
    需要重写 void run(ApplicationArguments args)方法,该方法会在SpringApplication启动后直接调用。

SpringBootApplication启动成功,向i-scheduler-admin调度中心注册应用程序执行器信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SchedulerRunner implements ApplicationRunner {
private static Logger log = LoggerFactory.getLogger(SchedulerRunner.class);

private String adminUrl;

private String contextPath;

private String port;

private Boolean open;

public SchedulerRunner() {}

public SchedulerRunner(String adminUrl, String contextPath, String port, Boolean open) {
this.adminUrl = adminUrl;
this.contextPath = contextPath;
this.port = port;
this.open = open;
}

public void run(ApplicationArguments args) throws Exception {
if (this.open != null && !this.open.booleanValue())
return;
RegisterParam register = new RegisterParam();
register.setPort(this.port);
register.setContextPath(this.contextPath);
register.setAppName(System.getenv("APP_NAME"));
register.setClusterName(System.getenv("CLUSTER_NAME"));
register.setAddress(String.format("http://%s:%s", new Object[] { System.getenv("POD_IP"), this.port }));
register.setProject(System.getenv("TENANT_NAME"));
register.setNamespace(System.getenv("POD_NAMESPACE"));
register.setPodName(System.getenv("POD_NAME"));
HttpUtil.post(this.adminUrl + "/exec/register", register);
log.info("执行器信息注册完成");
}
}

ScheduleAspect

@Scheduler注解切面:任务执行期间按执行id登记SchedulerContext,执行结束后向i-scheduler-admin回传本次任务的执行状态。
/**
 * Aspect around methods annotated with {@code @Scheduler}.
 *
 * <p>For an invocation whose first argument is a {@code Parameter}, it registers a
 * {@code SchedulerContext} under the execution id while the method runs and afterwards
 * posts the execution status to {@code <adminAddress>/callback}.
 */
@Aspect
public class ScheduleAspect {
    private String adminAddress;

    /** Contexts of the executions currently running, keyed by execution id. */
    private static final Map<Long, SchedulerContext> schedulerContextMap = new ConcurrentHashMap<>();

    public ScheduleAspect() {}

    /**
     * @param adminAddress base URL of the admin API
     */
    public ScheduleAspect(String adminAddress) {
        this.adminAddress = adminAddress;
    }

    /**
     * @param id execution id; may be {@code null}
     * @return the context of the running execution, or {@code null} when {@code id} is
     *         {@code null} or no execution with that id is registered
     */
    public static SchedulerContext getSchedulerContext(Long id) {
        if (id == null) {
            return null;
        }
        return schedulerContextMap.get(id);
    }

    /**
     * Runs the annotated method and reports the outcome to the admin.
     *
     * <p>Status 200 is reported when the method returns normally, 500 when it throws.
     * Any throwable is swallowed after its stack trace is printed.
     *
     * @param point the intercepted invocation
     */
    @Around("@annotation(com.haier.schedule.core.annotation.Scheduler)")
    public void around(ProceedingJoinPoint point) {
        Long execId = null;
        CallbackParam callbackParam = new CallbackParam();
        try {
            Object[] args = point.getArgs();
            // Guard the empty-args case: indexing args[0] on a zero-argument method would
            // throw before the target ran and then report a failure with no ids.
            if (args == null || args.length == 0 || !(args[0] instanceof Parameter)) {
                // Not a scheduler dispatch: just run the method, no callback.
                point.proceed();
                return;
            }
            Parameter param = (Parameter) args[0];
            callbackParam.setId(param.getId());
            callbackParam.setTriggerId(param.getTriggerId());
            callbackParam.setJobId(param.getJobId());
            SchedulerContext schedulerContext = new SchedulerContext(param.getId().longValue(), param.getShardIndex().intValue(), param.getShardTotal().intValue());
            // NOTE(review): the context set here is never reset in this method - confirm
            // whether SchedulerContext needs explicit cleanup on pooled threads.
            SchedulerContext.setScheduleContext(schedulerContext);
            execId = param.getId();
            schedulerContextMap.put(execId, schedulerContext);

            // Run the task.
            point.proceed();

            callbackParam.setStatus(Integer.valueOf(200));
        } catch (Throwable e) {
            callbackParam.setStatus(Integer.valueOf(500));
            e.printStackTrace();
        } finally {
            if (execId != null) {
                schedulerContextMap.remove(execId);
            }
        }
        // Report the outcome of this execution back to the admin.
        HttpUtil.post(this.adminAddress + "/callback", callbackParam);
    }
}

i-scheduler-admin

i-scheduler-core