Spring定时任务Quartz执行全过程源码解读
一、前言介绍在日常开辟中常常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。由于有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;
[*]默认初始化多少个任务线程
[*]JobStore有几种实现,你平时用的都是哪个
[*]一个定时任务的执行流程简述下
蒙圈了吧,是不感觉平时只是使用了,根本没关注过这些。有种冲动赶紧搜索答案吧!但只是知道答案是没有多少意义的,扛不住问不说,也不了解原理。所以,假如你想真的提升自己技能,还是要从根本搞定。
二、案例工程
为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。工程下载,关注公众号:bugstack虫洞栈,回复:源码分析
itstack-demo-code-schedule└── src ├── main │ ├── java │ │ └── org.itstack.demo │ │ ├── DemoTask.java │ │ └── JobImpl.java │ └── resources │ ├── props │ │ └── config.properties │ ├── spring │ │ └── spring-config-schedule-task.xml │ ├── logback.xml │ └── spring-config.xml └── test └── java └── org.itstack.demo.test ├── ApiTest.java ├── MyQuartz.java └── MyTask.java三、环境设置
[*]JDK 1.8
[*]IDEA 2019.3.1
[*]Spring 4.3.24.RELEASE
[*]quartz 2.3.2 {不同版本略有代码差异}
四、源码分析
org.quartz-scheduler quartz 2.3.2依靠于Spring版本升级quartz选择2.3.2,同时假如你如本文案例中所示使用xml设置任务。那么会有如下更改;
Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean
在正式分析前,可以看下quartz的默认设置,很多初始化动作都要从这里取得参数,同样你可以设置自己的设置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。
quart.properties
# Default Properties file for use by StdSchedulerFactory# to create a Quartz Scheduler Instance, if a different# properties file is not explicitly specified.#org.quartz.scheduler.instanceName: DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export: falseorg.quartz.scheduler.rmi.proxy: falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPoolorg.quartz.threadPool.threadCount: 10org.quartz.threadPool.threadPriority: 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.misfireThreshold: 60000org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore1. 从一个简单案例开始
平时我们使用Schedule基本都是注解或者xml设置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。
DemoTask.java & 定义一个等候被执行的任务
public class DemoTask { private Logger logger = LoggerFactory.getLogger(DemoTask.class); public void execute() throws Exception{ logger.info("定时处理用户信息任务:0/5 * * * * ?"); }}MyTask.java & 测试类,将设置在xml中的代码抽离出来
public class MyTask { public static void main(String[] args) throws Exception { DemoTask demoTask = new DemoTask(); // 定义了;执行的内容 MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean(); methodInvokingJobDetailFactoryBean.setTargetObject(demoTask); methodInvokingJobDetailFactoryBean.setTargetMethod("execute"); methodInvokingJobDetailFactoryBean.setConcurrent(true); methodInvokingJobDetailFactoryBean.setName("demoTask"); methodInvokingJobDetailFactoryBean.afterPropertiesSet(); // 定义了;执行的筹划 CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject()); cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?"); cronTriggerFactoryBean.setName("demoTask"); cronTriggerFactoryBean.afterPropertiesSet(); // 实现了;执行的功能 SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject()); schedulerFactoryBean.setAutoStartup(true); schedulerFactoryBean.afterPropertiesSet(); schedulerFactoryBean.start(); // 暂停住 System.in.read(); }}假如一切顺遂,那么会有如下效果:
2021-09-21 10:47:16.369 INFOorg.quartz.impl.StdSchedulerFactory - Using default implementation for ThreadExecutor2021-09-21 10:47:16.421 INFOorg.quartz.core.SchedulerSignalerImpl - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl2021-09-21 10:47:16.422 INFOorg.quartz.core.QuartzScheduler - Quartz Scheduler v.2.3.2 created.2021-09-21 10:47:16.423 INFOorg.quartz.simpl.RAMJobStore - RAMJobStore initialized.2021-09-21 10:47:16.424 INFOorg.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.NOT STARTED.Currently in standby mode.Number of jobs executed: 0Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.2021-09-21 10:47:16.424 INFOorg.quartz.impl.StdSchedulerFactory - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.2021-09-21 10:47:16.424 INFOorg.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.22021-09-21 10:47:16.426 INFOorg.quartz.core.QuartzScheduler - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b10102021-09-21 10:47:16.651 INFOorg.quartz.core.QuartzScheduler - Scheduler QuartzScheduler_$_NON_CLUSTERED started.九月 21, 2021 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler信息: Starting Quartz Scheduler now2021-09-21 10:47:20.321 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:25.001 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:30.000 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:35.001 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:40.000 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?Process finished with exit code -12. 定义执行内容(MethodInvokingJobDetailFactoryBean)
// 定义了;执行的内容MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);methodInvokingJobDetailFactoryBean.setTargetMethod("execute");methodInvokingJobDetailFactoryBean.setConcurrent(true);methodInvokingJobDetailFactoryBean.setName("demoTask");methodInvokingJobDetailFactoryBean.afterPropertiesSet();这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置须要信息;
[*]targetObject:目标对象bean,也就是demoTask
[*]targetMethod:目标方法name,也就是execute
[*]concurrent:是否并行执行,非并行执行任务,假如上一个任务没有执行完,下一刻不会执行
[*]name:xml设置非必传,源码中可以获取beanName
最后我们通过手动调用 afterPropertiesSet() 来模仿初始化。假如我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类设置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean 接口,由于这个接口实现后就可以通过 T getObject() 获取自己自定义初始化的类。这也常常用在一些框架开辟中。
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException { prepare(); // Use specific name if given, else fall back to bean name. String name = (this.name != null ? this.name : this.beanName); // Consider the concurrent flag to choose between stateful and stateless job. Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class); // Build JobDetail instance. JobDetailImpl jdi = new JobDetailImpl(); jdi.setName(name); jdi.setGroup(this.group); jdi.setJobClass((Class) jobClass); jdi.setDurability(true); jdi.getJobDataMap().put("methodInvoker", this); this.jobDetail = jdi; postProcessJobDetail(this.jobDetail);}
[*]源码168行: 根据是否并行执行选择任务类,这两个类都是MethodInvokingJobDetailFactoryBean的内部类,非并行执行的StatefulMethodInvokingJob只是继承MethodInvokingJob添加了标记注解。
[*]源码171行: 创建JobDetailImpl,添加任务明细信息,注意这类的jdi.setJobClass((Class) jobClass)实际就是MethodInvokingJob。MethodInvokingJob也是我们最终要反射调用执行的内容。
[*]源码177行: 初始化任务后赋值给this.jobDetail = jdi,也就是最终的类对象MethodInvokingJobDetailFactoryBean.getObject() @Override public JobDetail getObject() { return this.jobDetail; }
[*]源码:220行: 获取对象时返回 this.jobDetail,这也就解释了为什么 MethodInvokingJobDetailFactoryBean 初始化后直接赋值给了一个 JobDetail ;
3. 定义执行筹划(CronTriggerFactoryBeann)
// 定义了;执行的筹划CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");cronTriggerFactoryBean.setName("demoTask");cronTriggerFactoryBean.afterPropertiesSet();这一块主要定义任务的执行筹划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置须要信息;
[*]jobDetail:设置任务体,xml 中可以直接将对象赋值,硬编码中设置执行的 JobDetail 对象信息。也就是我们上面设置的 JobDetailImpl ,通过 getObject() 获取出来。
[*]cronExpression:筹划表达式;秒、分、时、日、月、周、年
CronTriggerFactoryBean.afterPropertiesSet()
@Overridepublic void afterPropertiesSet() throws ParseException { // ... 校验属性信息 CronTriggerImpl cti = new CronTriggerImpl(); cti.setName(this.name); cti.setGroup(this.group); if (this.jobDetail != null) { cti.setJobKey(this.jobDetail.getKey()); } cti.setJobDataMap(this.jobDataMap); cti.setStartTime(this.startTime); cti.setCronExpression(this.cronExpression); cti.setTimeZone(this.timeZone); cti.setCalendarName(this.calendarName); cti.setPriority(this.priority); cti.setMisfireInstruction(this.misfireInstruction); cti.setDescription(this.description); this.cronTrigger = cti;}
[*]源码237行: 创建触发器 CronTriggerImpl 并设置干系属性信息
[*]源码245行: 天生执行筹划类 cti.setCronExpression(this.cronExpression);
public void setCronExpression(String cronExpression) throws ParseException { TimeZone origTz = getTimeZone(); this.cronEx = new CronExpression(cronExpression); this.cronEx.setTimeZone(origTz);}CronExpression.java & 解析Cron表达式
protected void buildExpression(String expression) throws ParseException { expressionParsed = true; try { // ... 初始化 TreeSet xxx = new TreeSet(); int exprOn = SECOND; StringTokenizer exprsTok = new StringTokenizer(expression, " \t", false); while (exprsTok.hasMoreTokens() && exprOn instantiate()<li data-track="159">源码1323行: tp.initialize();</ul>SimpleThreadPool.initialize() & 这里的count是默认设置中的数量,可以更改
// create the worker threads and start them Iterator workerThreads = createWorkerThreads(count).iterator(); while(workerThreads.hasNext()) { WorkerThread wt = workerThreads.next(); wt.start(); availWorkers.add(wt); }5. 启动定时任务
案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。焦点流程如下;
https://p6.toutiaoimg.com/large/pgc-image/79c76cec08654e24a15c063ea02bd327
这个启动过程中,焦点的代码类,如下;
[*]StdScheduler
[*]QuartzScheduler
[*]QuartzSchedulerThread
[*]ThreadPool
[*]RAMJobStore
[*]CronTriggerImpl
[*]JobRunShellFactory
QuartzScheduler.start() & 启动
public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } // QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); } // 唤醒线程 schedThread.togglePause(false); getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted();}QuartzSchedulerThread.run() & 执行过程
@Overridepublic void run() { int acquiresFailed = 0; // 只有调用了halt()方法,才会退出这个死循环 while (!halted.get()) { try { // 一、假如是暂停状态,则循环超时等候1000毫秒 // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting).. // 阻塞直到有空闲的线程可用并返回可用的数量 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { List triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { // 二、获取acquire状态的Trigger列表,也就是即将执行的任务 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers } catch(){//...} if (triggers != null && !triggers.isEmpty()) { // 三:获取List第一个Trigger的下次触发时候 long triggerTime = triggers.get(0).getNextFireTime().getTime(); // 四:获取任务触发聚集 List res = qsRsrcs.getJobStore().triggersFired(triggers); // 五:设置Triggers为'executing'状态 qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); // 六:创建JobRunShell qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); // 七:执行Job qsRsrcs.getThreadPool().runInThread(shell) continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows con continue; // while (!halted) } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } qs = null; qsRsrcs = null;}
[*]源码391行: 创建JobRunShell,JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做预备。执行业务逻辑代码在runInThread(shell)方法里面。
[*]QuartzSchedulerThread.run() & 部分代码
JobRunShell shell = null;try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs);} catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue;}
[*]源码398行: qsRsrcs.getThreadPool().runInThread(shell)
[*]SimpleThreadPool.runInThread
// 保存所有WorkerThread的聚集private List workers;// 空闲的WorkerThread聚集private LinkedList availWorkers = new LinkedList();// 任务的WorkerThread聚集private LinkedList busyWorkers = new LinkedList();/** * 维护workers、availWorkers和busyWorkers三个列表数据 * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add() * 然后调用WorkThread.run(runnable)方法 */public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the pool). WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true;}
[*]源码428行: WorkerThread ,是一个内部类,主要是赋值并唤醒lock对象的等候线程队列
[*]WorkerThread.run(Runnable newRunnable)
public void run(Runnable newRunnable) { synchronized(lock) { if(runnable != null) { throw new IllegalStateException("Already running a Runnable!"); } runnable = newRunnable; lock.notifyAll(); }}
[*]源码561行: WorkerThread 的run方法,方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法。到这!接近曙光了!终于来到了执行业务的execute()方法的倒数第二步,runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。
[*]WorkerThread.run()
@Overridepublic void run() { boolean ran = false; while (run.get()) { try { synchronized(lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; // 启动真正执行的内容,runnable就是JobRunShell runnable.run(); } } } cache(){//...} } //if (log.isDebugEnabled()) try { getLog().debug("WorkerThread is shut down."); } catch(Exception e) { // ignore to help with a tomcat glitch }}JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行
public void run() { qs.addInternalSchedulerListener(this); try { OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); JobDetail jobDetail = jec.getJobDetail(); do { // ... long startTime = System.currentTimeMillis(); long endTime = startTime; // execute the job try { log.debug("Calling execute on job " + jobDetail.getKey()); // 执行业务代码,也就是我们的task job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { endTime = System.currentTimeMillis(); jobExEx = jee; getLog().info("Job " + jobDetail.getKey() + " threw a JobExecutionException: ", jobExEx); } catch (Throwable e) { endTime = System.currentTimeMillis(); getLog().error("Job " + jobDetail.getKey() + " threw an unhandled Exception: ", e); SchedulerException se = new SchedulerException( "Job threw an unhandled exception.", e); qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se); jobExEx = new JobExecutionException(se, false); } jec.setJobRunTime(endTime - startTime); // 其他代码 } while (true); } finally { qs.removeInternalSchedulerListener(this); }}QuartzJobBean.execte() & 继续往下走
public final void execute(JobExecutionContext context) throws JobExecutionException { try { BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this); MutablePropertyValues pvs = new MutablePropertyValues(); pvs.addPropertyValues(context.getScheduler().getContext()); pvs.addPropertyValues(context.getMergedJobDataMap()); bw.setPropertyValues(pvs, true); } catch (SchedulerException ex) { throw new JobExecutionException(ex); } executeInternal(context);}MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)
protected void executeInternal(JobExecutionContext context) throws JobExecutionException { try { // 反射执行业务代码 context.setResult(this.methodInvoker.invoke()); } catch (InvocationTargetException ex) { if (ex.getTargetException() instanceof JobExecutionException) { // -> JobExecutionException, to be logged at info level by Quartz throw (JobExecutionException) ex.getTargetException(); } else { // -> "unhandled exception", to be logged at error level by Quartz throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException()); } } catch (Exception ex) { // -> "unhandled exception", to be logged at error level by Quartz throw new JobMethodInvocationFailedException(this.methodInvoker, ex); }}五、综上总结
[*]quartz,即石英的意思,隐喻如石英钟般对时间的准确把握。
[*]源码分析是一个很快乐的过程,这个快乐是分析完才能获得的快乐。纵横交互的背后是面向对象的高度解耦,对线程精彩的使用,将任务执行做成筹划单,简直是一个超级棒的作品。
[*]对于quartz.properties,简单场景下,开辟者不用自定义设置,使用quartz默认设置即可,但在要求较高的使用场景中还是要自定义设置,比如通过org.quartz.threadPool.threadCount设置充足的线程数可提高多job场景下的运行性能。
[*]quartz 对任务处理高度解耦,job与trigger解藕,将任务本身和任务执行计谋解藕,这样可以方便实现N个任务和M个执行计谋自由组合。
[*]scheduler单独分离出来,相当于一个指挥官,可以从全局做调度,比如监听哪些trigger已经ready、分配线程等等。
[*]外部链接:
http://www.quartz-scheduler.org
quartz-2.1.x/configuration
页:
[1]