指尖代码 发表于 2021-9-21 00:00:00

Spring定时任务Quartz执行全过程源码解读

一、前言介绍

在日常开辟中常常会用到定时任务,用来;库表扫描发送MQ、T+n账单结算、缓存数据更新、秒杀活动状态变更,等等。由于有了Spring的Schedule极大的方便了我们对这类场景的使用。那么,除了应用你还了解它多少呢;

[*]默认初始化多少个任务线程
[*]JobStore有几种实现,你平时用的都是哪个
[*]一个定时任务的执行流程简述下
蒙圈了吧,是不感觉平时只是使用了,根本没关注过这些。有种冲动赶紧搜索答案吧!但只是知道答案是没有多少意义的,扛不住问不说,也不了解原理。所以,假如你想真的提升自己技能,还是要从根本搞定。
二、案例工程

为了更好的做源码分析,我们将平时用的定时任务服务单独抽离出来。工程下载,关注公众号:bugstack虫洞栈,回复:源码分析
itstack-demo-code-schedule└── src    ├── main    │   ├── java    │   │   └── org.itstack.demo    │   │       ├── DemoTask.java    │   │       └── JobImpl.java       │   └── resources          │       ├── props          │       │   └── config.properties    │       ├── spring    │       │   └── spring-config-schedule-task.xml    │       ├── logback.xml    │       └── spring-config.xml    └── test         └── java             └── org.itstack.demo.test               ├── ApiTest.java               ├── MyQuartz.java                                                 └── MyTask.java三、环境设置


[*]JDK 1.8
[*]IDEA 2019.3.1
[*]Spring 4.3.24.RELEASE
[*]quartz 2.3.2 {不同版本略有代码差异}
四、源码分析

    org.quartz-scheduler    quartz    2.3.2依靠于Spring版本升级quartz选择2.3.2,同时假如你如本文案例中所示使用xml设置任务。那么会有如下更改;
Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
            Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean
            在正式分析前,可以看下quartz的默认设置,很多初始化动作都要从这里取得参数,同样你可以设置自己的设置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。
quart.properties
# Default Properties file for use by StdSchedulerFactory# to create a Quartz Scheduler Instance, if a different# properties file is not explicitly specified.#org.quartz.scheduler.instanceName: DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export: falseorg.quartz.scheduler.rmi.proxy: falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPoolorg.quartz.threadPool.threadCount: 10org.quartz.threadPool.threadPriority: 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.misfireThreshold: 60000org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore1. 从一个简单案例开始

平时我们使用Schedule基本都是注解或者xml设置文件,但是为了可以更简单的分析代码,我们从一个简单的Demo入手,放到main函数中。
DemoTask.java & 定义一个等候被执行的任务
public class DemoTask {    private Logger logger = LoggerFactory.getLogger(DemoTask.class);    public void execute() throws Exception{      logger.info("定时处理用户信息任务:0/5 * * * * ?");    }}MyTask.java & 测试类,将设置在xml中的代码抽离出来
public class MyTask {    public static void main(String[] args) throws Exception {      DemoTask demoTask = new DemoTask();      // 定义了;执行的内容      MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();      methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);      methodInvokingJobDetailFactoryBean.setTargetMethod("execute");      methodInvokingJobDetailFactoryBean.setConcurrent(true);      methodInvokingJobDetailFactoryBean.setName("demoTask");      methodInvokingJobDetailFactoryBean.afterPropertiesSet();      // 定义了;执行的筹划      CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();      cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());      cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");      cronTriggerFactoryBean.setName("demoTask");      cronTriggerFactoryBean.afterPropertiesSet();      // 实现了;执行的功能      SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();      schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());      schedulerFactoryBean.setAutoStartup(true);      schedulerFactoryBean.afterPropertiesSet();      schedulerFactoryBean.start();      // 暂停住      System.in.read();    }}假如一切顺遂,那么会有如下效果:
2021-09-21 10:47:16.369 INFOorg.quartz.impl.StdSchedulerFactory - Using default implementation for ThreadExecutor2021-09-21 10:47:16.421 INFOorg.quartz.core.SchedulerSignalerImpl - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl2021-09-21 10:47:16.422 INFOorg.quartz.core.QuartzScheduler - Quartz Scheduler v.2.3.2 created.2021-09-21 10:47:16.423 INFOorg.quartz.simpl.RAMJobStore - RAMJobStore initialized.2021-09-21 10:47:16.424 INFOorg.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.NOT STARTED.Currently in standby mode.Number of jobs executed: 0Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.2021-09-21 10:47:16.424 INFOorg.quartz.impl.StdSchedulerFactory - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.2021-09-21 10:47:16.424 INFOorg.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.22021-09-21 10:47:16.426 INFOorg.quartz.core.QuartzScheduler - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b10102021-09-21 10:47:16.651 INFOorg.quartz.core.QuartzScheduler - Scheduler QuartzScheduler_$_NON_CLUSTERED started.九月 21, 2021 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler信息: Starting Quartz Scheduler now2021-09-21 10:47:20.321 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:25.001 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:30.000 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:35.001 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?2021-09-21 10:47:40.000 INFOorg.itstack.demo.DemoTask - 定时处理用户信息任务:0/5 * * * * ?Process finished with exit code -12. 定义执行内容(MethodInvokingJobDetailFactoryBean)

// 定义了;执行的内容MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);methodInvokingJobDetailFactoryBean.setTargetMethod("execute");methodInvokingJobDetailFactoryBean.setConcurrent(true);methodInvokingJobDetailFactoryBean.setName("demoTask");methodInvokingJobDetailFactoryBean.afterPropertiesSet();这块内容主要将我们的任务体(即待执行任务DemoTask)交给MethodInvokingJobDetailFactoryBean管理,首先设置须要信息;

[*]targetObject:目标对象bean,也就是demoTask
[*]targetMethod:目标方法name,也就是execute
[*]concurrent:是否并行执行,非并行执行任务,假如上一个任务没有执行完,下一刻不会执行
[*]name:xml设置非必传,源码中可以获取beanName
最后我们通过手动调用 afterPropertiesSet() 来模仿初始化。假如我们的类是交给 Spring 管理的,那么在实现了 InitializingBean 接口的类,在类设置信息加载后会自动执行 afterPropertiesSet() 。一般实现了 InitializingBean 接口的类,同时也会去实现 FactoryBean 接口,由于这个接口实现后就可以通过 T getObject() 获取自己自定义初始化的类。这也常常用在一些框架开辟中。
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {        prepare();        // Use specific name if given, else fall back to bean name.        String name = (this.name != null ? this.name : this.beanName);        // Consider the concurrent flag to choose between stateful and stateless job.        Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);        // Build JobDetail instance.        JobDetailImpl jdi = new JobDetailImpl();        jdi.setName(name);        jdi.setGroup(this.group);        jdi.setJobClass((Class) jobClass);        jdi.setDurability(true);        jdi.getJobDataMap().put("methodInvoker", this);        this.jobDetail = jdi;                postProcessJobDetail(this.jobDetail);}

[*]源码168行: 根据是否并行执行选择任务类,这两个类都是MethodInvokingJobDetailFactoryBean的内部类,非并行执行的StatefulMethodInvokingJob只是继承MethodInvokingJob添加了标记注解。
[*]源码171行: 创建JobDetailImpl,添加任务明细信息,注意这类的jdi.setJobClass((Class) jobClass)实际就是MethodInvokingJob。MethodInvokingJob也是我们最终要反射调用执行的内容。
[*]源码177行: 初始化任务后赋值给this.jobDetail = jdi,也就是最终的类对象MethodInvokingJobDetailFactoryBean.getObject() @Override public JobDetail getObject() { return this.jobDetail; }
[*]源码:220行: 获取对象时返回 this.jobDetail,这也就解释了为什么 MethodInvokingJobDetailFactoryBean 初始化后直接赋值给了一个 JobDetail ;
3. 定义执行筹划(CronTriggerFactoryBeann)

// 定义了;执行的筹划CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");cronTriggerFactoryBean.setName("demoTask");cronTriggerFactoryBean.afterPropertiesSet();这一块主要定义任务的执行筹划,并将任务执行内容交给 CronTriggerFactoryBean 管理,同时设置须要信息;

[*]jobDetail:设置任务体,xml 中可以直接将对象赋值,硬编码中设置执行的 JobDetail 对象信息。也就是我们上面设置的 JobDetailImpl ,通过 getObject() 获取出来。
[*]cronExpression:筹划表达式;秒、分、时、日、月、周、年
CronTriggerFactoryBean.afterPropertiesSet()
@Overridepublic void afterPropertiesSet() throws ParseException {            // ... 校验属性信息                CronTriggerImpl cti = new CronTriggerImpl();        cti.setName(this.name);        cti.setGroup(this.group);        if (this.jobDetail != null) {                cti.setJobKey(this.jobDetail.getKey());        }        cti.setJobDataMap(this.jobDataMap);        cti.setStartTime(this.startTime);        cti.setCronExpression(this.cronExpression);        cti.setTimeZone(this.timeZone);        cti.setCalendarName(this.calendarName);        cti.setPriority(this.priority);        cti.setMisfireInstruction(this.misfireInstruction);        cti.setDescription(this.description);        this.cronTrigger = cti;}

[*]源码237行: 创建触发器 CronTriggerImpl 并设置干系属性信息
[*]源码245行: 天生执行筹划类 cti.setCronExpression(this.cronExpression);
public void setCronExpression(String cronExpression) throws ParseException {      TimeZone origTz = getTimeZone();      this.cronEx = new CronExpression(cronExpression);      this.cronEx.setTimeZone(origTz);}CronExpression.java & 解析Cron表达式
protected void buildExpression(String expression) throws ParseException {      expressionParsed = true;      try {                                // ... 初始化 TreeSet xxx = new TreeSet();                                int exprOn = SECOND;          StringTokenizer exprsTok = new StringTokenizer(expression, " \t",                  false);                                                while (exprsTok.hasMoreTokens() && exprOn instantiate()<li data-track="159">源码1323行: tp.initialize();</ul>SimpleThreadPool.initialize() & 这里的count是默认设置中的数量,可以更改
// create the worker threads and start them   Iterator workerThreads = createWorkerThreads(count).iterator();   while(workerThreads.hasNext()) {       WorkerThread wt = workerThreads.next();       wt.start();       availWorkers.add(wt);   }5. 启动定时任务

案例中使用硬编码方式调用 schedulerFactoryBean.start() 启动线程服务。线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。焦点流程如下;
https://p6.toutiaoimg.com/large/pgc-image/79c76cec08654e24a15c063ea02bd327
这个启动过程中,焦点的代码类,如下;

[*]StdScheduler
[*]QuartzScheduler
[*]QuartzSchedulerThread
[*]ThreadPool
[*]RAMJobStore
[*]CronTriggerImpl
[*]JobRunShellFactory
QuartzScheduler.start() & 启动
public void start() throws SchedulerException {    if (shuttingDown|| closed) {      throw new SchedulerException(                "The Scheduler cannot be restarted after shutdown() has been called.");    }          // QTZ-212 : calling new schedulerStarting() method on the listeners    // right after entering start()    notifySchedulerListenersStarting();            if (initialStart == null) {      initialStart = new Date();      this.resources.getJobStore().schedulerStarted();                  startPlugins();    } else {      resources.getJobStore().schedulerResumed();    }          // 唤醒线程        schedThread.togglePause(false);          getLog().info(            "Scheduler " + resources.getUniqueIdentifier() + " started.");      notifySchedulerListenersStarted();}QuartzSchedulerThread.run() & 执行过程
@Overridepublic void run() {    int acquiresFailed = 0;                // 只有调用了halt()方法,才会退出这个死循环    while (!halted.get()) {      try {                                                // 一、假如是暂停状态,则循环超时等候1000毫秒            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..                           // 阻塞直到有空闲的线程可用并返回可用的数量            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();            if(availThreadCount > 0) {                                      List triggers;                long now = System.currentTimeMillis();                clearSignaledSchedulingChange();                                                try {                                        // 二、获取acquire状态的Trigger列表,也就是即将执行的任务                  triggers = qsRsrcs.getJobStore().acquireNextTriggers(                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat                  acquiresFailed = 0;                  if (log.isDebugEnabled())                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers                } catch(){//...}                                              if (triggers != null && !triggers.isEmpty()) {                                                          // 三:获取List第一个Trigger的下次触发时候                                        long triggerTime = triggers.get(0).getNextFireTime().getTime();                                                          // 四:获取任务触发聚集                                        List res = qsRsrcs.getJobStore().triggersFired(triggers);                                                                                // 五:设置Triggers为&#39;executing&#39;状态                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));                                                          // 六:创建JobRunShell                                        qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);                                                                                // 七:执行Job                                        qsRsrcs.getThreadPool().runInThread(shell)                                                            continue; // while (!halted)                }            } else { // if(availThreadCount > 0)                // should never happen, if threadPool.blockForAvailableThreads() follows con                continue; // while (!halted)            }                                            } catch(RuntimeException re) {            getLog().error("Runtime error occurred in main trigger firing loop.", re);      }    }      qs = null;    qsRsrcs = null;}

[*]源码391行: 创建JobRunShell,JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做预备。执行业务逻辑代码在runInThread(shell)方法里面。
[*]QuartzSchedulerThread.run() & 部分代码
JobRunShell shell = null;try {      shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);      shell.initialize(qs);} catch (SchedulerException se) {      qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);      continue;}

[*]源码398行: qsRsrcs.getThreadPool().runInThread(shell)
[*]SimpleThreadPool.runInThread
// 保存所有WorkerThread的聚集private List workers;// 空闲的WorkerThread聚集private LinkedList availWorkers = new LinkedList();// 任务的WorkerThread聚集private LinkedList busyWorkers = new LinkedList();/**   * 维护workers、availWorkers和busyWorkers三个列表数据   * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()   * 然后调用WorkThread.run(runnable)方法   */public boolean runInThread(Runnable runnable) {      if (runnable == null) {          return false;      }      synchronized (nextRunnableLock) {          handoffPending = true;          // Wait until a worker thread is available          while ((availWorkers.size() < 1) && !isShutdown) {            try {                  nextRunnableLock.wait(500);            } catch (InterruptedException ignore) {            }          }          if (!isShutdown) {            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();            busyWorkers.add(wt);            wt.run(runnable);          } else {            // If the thread pool is going down, execute the Runnable            // within a new additional worker thread (no thread from the pool).                                              WorkerThread wt = new WorkerThread(this, threadGroup,                      "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);            busyWorkers.add(wt);            workers.add(wt);            wt.start();          }          nextRunnableLock.notifyAll();          handoffPending = false;      }      return true;}

[*]源码428行: WorkerThread ,是一个内部类,主要是赋值并唤醒lock对象的等候线程队列
[*]WorkerThread.run(Runnable newRunnable)
public void run(Runnable newRunnable) {      synchronized(lock) {          if(runnable != null) {            throw new IllegalStateException("Already running a Runnable!");          }          runnable = newRunnable;          lock.notifyAll();      }}

[*]源码561行: WorkerThread 的run方法,方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法。到这!接近曙光了!终于来到了执行业务的execute()方法的倒数第二步,runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。
[*]WorkerThread.run()
@Overridepublic void run() {      boolean ran = false;                      while (run.get()) {          try {            synchronized(lock) {                  while (runnable == null && run.get()) {                      lock.wait(500);                  }                  if (runnable != null) {                      ran = true;                      // 启动真正执行的内容,runnable就是JobRunShell                      runnable.run();                  }            }          } cache(){//...}      }      //if (log.isDebugEnabled())      try {          getLog().debug("WorkerThread is shut down.");      } catch(Exception e) {          // ignore to help with a tomcat glitch      }}JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行
public void run() {    qs.addInternalSchedulerListener(this);    try {      OperableTrigger trigger = (OperableTrigger) jec.getTrigger();      JobDetail jobDetail = jec.getJobDetail();      do {            // ...            long startTime = System.currentTimeMillis();            long endTime = startTime;            // execute the job            try {                log.debug("Calling execute on job " + jobDetail.getKey());                                                // 执行业务代码,也就是我们的task                                job.execute(jec);                                                endTime = System.currentTimeMillis();            } catch (JobExecutionException jee) {                endTime = System.currentTimeMillis();                jobExEx = jee;                getLog().info("Job " + jobDetail.getKey() +                        " threw a JobExecutionException: ", jobExEx);            } catch (Throwable e) {                endTime = System.currentTimeMillis();                getLog().error("Job " + jobDetail.getKey() +                        " threw an unhandled Exception: ", e);                SchedulerException se = new SchedulerException(                        "Job threw an unhandled exception.", e);                qs.notifySchedulerListenersError("Job ("                        + jec.getJobDetail().getKey()                        + " threw an exception.", se);                jobExEx = new JobExecutionException(se, false);            }            jec.setJobRunTime(endTime - startTime);            // 其他代码      } while (true);    } finally {      qs.removeInternalSchedulerListener(this);    }}QuartzJobBean.execte() & 继续往下走
public final void execute(JobExecutionContext context) throws JobExecutionException {        try {                BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);                MutablePropertyValues pvs = new MutablePropertyValues();                pvs.addPropertyValues(context.getScheduler().getContext());                pvs.addPropertyValues(context.getMergedJobDataMap());                bw.setPropertyValues(pvs, true);        }        catch (SchedulerException ex) {                throw new JobExecutionException(ex);        }        executeInternal(context);}MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {        try {                // 反射执行业务代码                context.setResult(this.methodInvoker.invoke());        }        catch (InvocationTargetException ex) {                if (ex.getTargetException() instanceof JobExecutionException) {                        // -> JobExecutionException, to be logged at info level by Quartz                        throw (JobExecutionException) ex.getTargetException();                }                else {                        // -> "unhandled exception", to be logged at error level by Quartz                        throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());                }        }        catch (Exception ex) {                // -> "unhandled exception", to be logged at error level by Quartz                throw new JobMethodInvocationFailedException(this.methodInvoker, ex);        }}五、综上总结


[*]quartz,即石英的意思,隐喻如石英钟般对时间的准确把握。
[*]源码分析是一个很快乐的过程,这个快乐是分析完才能获得的快乐。纵横交互的背后是面向对象的高度解耦,对线程精彩的使用,将任务执行做成筹划单,简直是一个超级棒的作品。
[*]对于quartz.properties,简单场景下,开辟者不用自定义设置,使用quartz默认设置即可,但在要求较高的使用场景中还是要自定义设置,比如通过org.quartz.threadPool.threadCount设置充足的线程数可提高多job场景下的运行性能。
[*]quartz 对任务处理高度解耦,job与trigger解藕,将任务本身和任务执行计谋解藕,这样可以方便实现N个任务和M个执行计谋自由组合。
[*]scheduler单独分离出来,相当于一个指挥官,可以从全局做调度,比如监听哪些trigger已经ready、分配线程等等。
[*]外部链接:
http://www.quartz-scheduler.org
quartz-2.1.x/configuration
页: [1]
查看完整版本: Spring定时任务Quartz执行全过程源码解读