Quartz分布式调度的实现是去中心化的,需要依赖数据库在集群间同步调度状态,基于分布式锁实现一致性调度,而我们当前使用的xxl-job版本(1.9.x)的分布式调度又是基于Quartz实现,因此我们所了解到的xxl-job调度性能差,本质上就是Quartz的调度性能差。
在同一时刻需要触发的Job只有少量的情况下,我们看不到Quartz的性能缺陷,在Job数量明显增加情况下,我们就会发现,调度延迟会有明显增加。尽管横向扩展节点,调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。
我们将带着这些疑惑,分析Quartz 的分布式调度原理,以及过时恢复、故障恢复的实现原理。
核心调度流程分析
核心流程代码在org.quartz.core.QuartzSchedulerThread类的run方法,即首先是调度线程(只有一个线程)启动,QuartzSchedulerThread实例的run执行,源码如下。
class QuartzSchedulerThread{
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
// .....
(1)
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) {
// ....
(2)
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
} //.....
if (triggers != null && !triggers.isEmpty()) {
(3)
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
// wait......
}
// .....
if(goAhead) {
(4)
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} //....
}
(5)
for (int i = 0; i < bndles.size(); i++) {
// ..........
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
(6)
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
//....
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
}
流程分析:
- 检查当前线程池可用数量是否大于0,大于0才执行调度逻辑。
- 获取分布式锁,根据配置的maxBatchSize(默认maxBatchSize为1),获取30秒内需要调度的Job,根据执行时间排序,最后释放分布式锁。在获取Job时,会将状态由WAITING更新为ACQUIRED,更新成功才表示当时节点获取到了调度该Job的权限。
- 遍历调度这一批次任务,休眠等待到当前毫秒”至少有一个”Job(批量)需要触发执行。
- 获取分布式锁,批量触发调度,for遍历修改Job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则根据cron更新Job下一次执行时间,以及状态恢复为WAITING,最后释放分布式锁。
- for遍历执行这一批次Job,第(4)步将Job状态由ACQUIRED改为EXECUTING状态成功的Job,获取线程池,将Job放入线程池执行。
- 如果Job放入线程池失败,就获取一次分布式锁,修改Job状态为ERROR,然后释放锁。
假设只部署一个节点,且maxBatchSize最大为1,即不考虑分布式锁、不考虑批量的情况,那么quartz的调度流程可以简化为:
- 查询数据库,获取需要调度的job,并更新job的状态,由WAITING更新为ACQUIRED。
- 更新job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则根据cron更新Job下一次执行时间,以及状态恢复为WAITING。
- 如果第(2)步成功,则获取线程池,将Job放入线程池,线程池分配线程调用Job的execute方法执行Job。
- 继续循环步骤1。
故障恢复流程分析
故障恢复与错过恢复的启动入口都在JobStoreSupport#schedulerStarted方法中,由QuartzScheduler#start方法调用,源码如下。
class JobStoreSupport{
public void schedulerStarted()throws SchedulerException{
(1)
if(isClustered()){
//...
clusterManagementThread.initialize();
}else{
(2)
try{
recoverJobs();
}//....
}
(3)
misfireHandler=new MisfireHandler();
// ...
misfireHandler.initialize();
schedulerRunning=true;
//...
}
}
- (1)判断是否是集群模式,如果是则启动集群管理线程;
- (2)非集群模式当前线程执行recoverJobs方法恢复;
- (3)启动过时恢复处理线程;
分布式调度下的故障恢复流程:
- 定时每clusterCheckinInterval毫秒检测掉线的节点;
- 如果有节点掉线,加分布式锁(LOCK_TRIGGER_ACCESS),获取掉线的节点的调度记录;
- 如果调度状态为ACQUIRED,则将Job状态设置为WAITING;
- 释放分布式锁(LOCK_TRIGGER_ACCESS);
错过恢复流程分析
错过的原因主要有以下几种情况:
- 1、暂停调度后恢复调度;
- 2、线程池被用满之后可能会出现;
- 3、单节点重启;
错过恢复流程:
- 定时每misfireThreshold毫秒(超时阈值),获取分布式锁(LOCK_TRIGGER_ACCESS);
- 查询*WAITING状态且下次调度时间小于(当前时间 - misfireThreshold)的,截取maxMisfiresToHandleAtATime(数量)条记录;
- 根据策略:MISFIRE_INSTRUCTION_DO_NOTHING或MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,更新这些misfires记录的下次调度时间,如果是MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,则下次执行时间为当前时间,即立即执行一次,这样QuartzSchedulerThread线程就能马上获取到这个job触发调度;
- 释放分布式锁(LOCK_TRIGGER_ACCESS);