文章目录
  1. 1. 一.quartz概述
  2. 2. 二.quartz启动流程
  3. 3. 三.QuartzSchedulerThread线程
  4. 4. 四.misfireHandler线程
  5. 5. 五.clusterManager线程
  6. 6. 六. 参考文献

一.quartz概述

quartz是一个用JAVA实现的开源的任务调度框架。quartz可以用来创建简单或复杂的任务调度,它包括了许多企业级的功能,如支持JTA transactions和集群等。quartz是现在最流行的JAVA任务调度框架。

quartz具有如下的特点

  1. 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊的需求;
  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
  3. 分布式和集群能力,支持集群运行,具有负载均衡、故障恢复等功能,伸缩性高,高可用。

此外,quartz是spring的默认调度框架,能够很容易的与spring集成。本文基于spring3.2.2和quartz2.2.1来讨论。注意 spring3.1以下的版本必须使用quartz1.x系列,3.1以上的版本才支持quartz 2.x,因为quartz1.x和quartz2.x有些地方不兼容。

quartz调度核心元素

  1. Scheduler:任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。
  2. Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比较多,本文主要介绍这种方式。CronTrigger在spring中封装在CronTriggerFactoryBean中。
  3. Calendar:它是一些日历特定时间点的集合。一个trigger可以包含多个Calendar,以便排除或包含某些时间点。
  4. JobDetail:用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
  5. Job:是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。

quartz集群配置:
quartz集群是通过数据库表来感知其他的应用的,各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。
数据库表:以前有12张表,现在只有11张表,现在没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表:

Table name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGERS 存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS

QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。

二.quartz启动流程

若quartz是配置在spring中,当服务器启动时,就会装载相关的bean。SchedulerFactoryBean实现了InitializingBean接口,因此在初始化bean的时候,会执行afterPropertiesSet方法,该方法将会调用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)创建Scheduler。SchedulerFactory在创建quartzScheduler的过程中,将会读取配置参数,初始化各个组件,关键组件如下:

  1. ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool创建了一定数量的WorkerThread实例来使得Job能够在线程中进行处理。WorkerThread是定义在SimpleThreadPool类中的内部类,它实质上就是一个线程。在SimpleThreadPool中有三个list:workers-存放池中所有的线程引用,availWorkers-存放所有空闲的线程,busyWorkers-存放所有工作中的线程;
    线程池的配置参数如下所示:

    1
    2
    3
    org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    org.quartz.threadPool.threadCount=3
    org.quartz.threadPool.threadPriority=5
  2. JobStore:分为存储在内存的RAMJobStore和存储在数据库的JobStoreSupport(包括JobStoreTX和JobStoreCMT两种实现,JobStoreCMT是依赖于容器来进行事务的管理,而JobStoreTX是自己管理事务),若要使用集群要使用JobStoreSupport的方式;

  3. QuartzSchedulerThread:用来进行任务调度的线程,在初始化的时候paused=true,halted=false,虽然线程开始运行了,但是paused=true,线程会一直等待,直到start方法将paused置为false;

另外,SchedulerFactoryBean还实现了SmartLifeCycle接口,因此初始化完成后,会执行start()方法,该方法将主要会执行以下的几个动作:

  1. 创建ClusterManager线程并启动线程:该线程用来进行集群故障检测和处理,将在下文详细讨论;
  2. 创建MisfireHandler线程并启动线程:该线程用来进行misfire任务的处理,将在下文详细讨论;
  3. 置QuartzSchedulerThread的paused=false,调度线程才真正开始调度;

整个启动流程如下图:
quartz启动时序图

三.QuartzSchedulerThread线程

线程的主要逻辑代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while (!halted.get()) {
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
List<TriggerFiredResult> bndle = qsRsrcs.getJobStore().triggersFired(triggers);
for(int i = 0;i < res.size();i++){
JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
qsRsrcs.getThreadPool().runInThread(shell);
}
}
  1. 先获取线程池中的可用线程数量(若没有可用的会阻塞,直到有可用的);
  2. 获取30m内要执行的trigger(即acquireNextTriggers):
    获取trigger的锁,通过select …for update方式实现;获取30m内(可配置)要执行的triggers(需要保证集群节点的时间一致),若@ConcurrentExectionDisallowed且列表存在该条trigger则跳过,否则更新trigger状态为ACQUIRED(刚开始为WAITING);插入firedTrigger表,状态为ACQUIRED;(注意:在RAMJobStore中,有个timeTriggers,排序方式是按触发时间nextFireTime排的;JobStoreSupport从数据库取出triggers时是按照nextFireTime排序);
  3. 等待直到获取的trigger中最先执行的trigger在2ms内;
  4. triggersFired:
    1)更新firedTrigger的status=EXECUTING;
    2)更新trigger下一次触发的时间;
    3)更新trigger的状态:无状态的trigger->WAITING,有状态的trigger->BLOCKED,若nextFireTime==null ->COMPLETE;
    4) commit connection,释放锁;
  5. 针对每个要执行的trigger,创建JobRunShell,并放入线程池执行:
    1)execute:执行job
    2)获取TRIGGER_ACCESS锁
    3)若是有状态的job:更新trigger状态:BLOCKED->WAITING,PAUSED_BLOCKED->BLOCKED
    4)若@PersistJobDataAfterExecution,则updateJobData
    5)删除firedTrigger
    6)commit connection,释放锁

线程执行流程如下图所示:
QuartzSchedulerThread时序图

任务调度执行过程中,trigger的状态变化如下图所示:
该图来自参考文献5

四.misfireHandler线程

下面这些原因可能造成 misfired job:

  1. 系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire;
  2. Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire;
  3. 线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire;
  4. 有状态任务在下次触发时间到达时,上次执行还没有结束;为了处理 misfired job,Quartz 中为 trigger 定义了处理策略,主要有下面两种:MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:针对 misfired job 马上执行一次;MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次触发;默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW线程默认1分钟执行一次;在一个事务中,默认一次最多recovery 20个;

执行流程:

  1. 若配置(默认为true,可配置)成获取锁前先检查是否有需要recovery的trigger,先获取misfireCount;
  2. 获取TRIGGER_ACCESS锁;
  3. hasMisfiredTriggersInState:获取misfired的trigger,默认一个事务里只能最大20个misfired trigger(可配置),misfired判断依据:status=waiting,next_fire_time < current_time-misfirethreshold(可配置,默认1min)
  4. notifyTriggerListenersMisfired
  5. updateAfterMisfire:获取misfire策略(默认是MISFIRE_INSTRUCTION_SMART_POLICY,该策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根据策略更新nextFireTime;
  6. 将nextFireTime等更新到trigger表;
  7. commit connection,释放锁8.如果还有更多的misfired,sleep短暂时间(为了集群负载均衡),否则sleep misfirethreshold时间,后继续轮询;

misfireHandler线程执行流程如下图所示:
misfireHandler线程时序图

五.clusterManager线程

初始化:
failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤儿)

线程执行:
每个服务器会定时(org.quartz.jobStore.clusterCheckinInterval这个时间)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若这个字段远远超出了该更新的时间,则认为该服务器实例挂了;
注意:每个服务器实例有唯一的id,若配置为AUTO,则为hostname+current_time

线程执行的具体流程:

  1. 检查是否有超时的实例failedInstances;
  2. 更新该服务器实例的LAST_CHECKIN_TIME;
    若有超时的实例:
  3. 获取STATE_ACCESS锁;
  4. 获取超时的实例failedInstances;
  5. 获取TRIGGER_ACCESS锁;
  6. clusterRecover:
  • 针对每个failedInstances,通过instanceId获取每个实例的firedTriggers;
  • 针对每个firedTrigger:
    1) 更新trigger状态:
    BLOCKED->WAITING
    PAUSED_BLOCKED->PAUSED
    ACQUIRED->WAITING
    2) 若firedTrigger不是ACQUIRED状态(在执行状态),且jobRequestRecovery=true:
    创建一个SimpleTrigger,存储到trigger表,status=waiting,MISFIRE_INSTR=MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY.
    3) 删除firedTrigger

clusterManager线程执行时序图如下图所示:
clusterRecover线程时序图

六. 参考文献

  1. Quartz Documentation http://quartz-scheduler.org/documentation
  2. spring javadoc-api http://docs.spring.io/spring/docs/4.3.0.BUILD-SNAPSHOT/javadoc-api/
  3. 基于Quartz开发企业级任务调度应用 https://www.ibm.com/developerworks/cn/opensource/os-cn-quartz/
  4. quartz应用与集群原理分析 http://tech.meituan.com/mt-crm-quartz.html
  5. quartz详解2:quartz由浅入深 http://ecmcug.itpub.net/11627468/viewspace-1763498/
  6. quartz详解4:quartz线程管理 http://blog.itpub.net/11627468/viewspace-1766967/
  7. quartz学习笔记 http://www.cnblogs.com/yunxuange/archive/2012/08/28/2660141.html
  8. quartz集群调度机制调研及源码分析 http://demo.netfoucs.com/gklifg/article/details/27090179
文章目录
  1. 1. 一.quartz概述
  2. 2. 二.quartz启动流程
  3. 3. 三.QuartzSchedulerThread线程
  4. 4. 四.misfireHandler线程
  5. 5. 五.clusterManager线程
  6. 6. 六. 参考文献