Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean
Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean在正式分析前,可以看下quartz的默认设置,很多初始化动作都要从这里取得参数,同样你可以设置自己的设置文件。例如,当你的任务很多时,默认初始化的10个线程组不满足你的业务需求,就可以按需调整。
quart.properties# Default Properties file for use by StdSchedulerFactory# to create a Quartz Scheduler Instance, if a different# properties file is not explicitly specified.#org.quartz.scheduler.instanceName: DefaultQuartzSchedulerorg.quartz.scheduler.rmi.export: falseorg.quartz.scheduler.rmi.proxy: falseorg.quartz.scheduler.wrapJobExecutionInUserTransaction: falseorg.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPoolorg.quartz.threadPool.threadCount: 10org.quartz.threadPool.threadPriority: 5org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: trueorg.quartz.jobStore.misfireThreshold: 60000org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore1. 从一个简单案例开始
DemoTask.java & 定义一个等候被执行的任务public class DemoTask { private Logger logger = LoggerFactory.getLogger(DemoTask.class); public void execute() throws Exception{ logger.info("定时处理用户信息任务:0/5 * * * * ?"); }}
MyTask.java & 测试类,将设置在xml中的代码抽离出来public class MyTask { public static void main(String[] args) throws Exception { DemoTask demoTask = new DemoTask(); // 定义了;执行的内容 MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean(); methodInvokingJobDetailFactoryBean.setTargetObject(demoTask); methodInvokingJobDetailFactoryBean.setTargetMethod("execute"); methodInvokingJobDetailFactoryBean.setConcurrent(true); methodInvokingJobDetailFactoryBean.setName("demoTask"); methodInvokingJobDetailFactoryBean.afterPropertiesSet(); // 定义了;执行的筹划 CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject()); cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?"); cronTriggerFactoryBean.setName("demoTask"); cronTriggerFactoryBean.afterPropertiesSet(); // 实现了;执行的功能 SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject()); schedulerFactoryBean.setAutoStartup(true); schedulerFactoryBean.afterPropertiesSet(); schedulerFactoryBean.start(); // 暂停住 System.in.read(); }}假如一切顺遂,那么会有如下效果:
MethodInvokingJobDetailFactoryBean.afterPropertiesSet()public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException { prepare(); // Use specific name if given, else fall back to bean name. String name = (this.name != null ? this.name : this.beanName); // Consider the concurrent flag to choose between stateful and stateless job. Class jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class); // Build JobDetail instance. JobDetailImpl jdi = new JobDetailImpl(); jdi.setName(name); jdi.setGroup(this.group); jdi.setJobClass((Class) jobClass); jdi.setDurability(true); jdi.getJobDataMap().put("methodInvoker", this); this.jobDetail = jdi; postProcessJobDetail(this.jobDetail);}
CronTriggerFactoryBean.afterPropertiesSet()@Overridepublic void afterPropertiesSet() throws ParseException { // ... 校验属性信息 CronTriggerImpl cti = new CronTriggerImpl(); cti.setName(this.name); cti.setGroup(this.group); if (this.jobDetail != null) { cti.setJobKey(this.jobDetail.getKey()); } cti.setJobDataMap(this.jobDataMap); cti.setStartTime(this.startTime); cti.setCronExpression(this.cronExpression); cti.setTimeZone(this.timeZone); cti.setCalendarName(this.calendarName); cti.setPriority(this.priority); cti.setMisfireInstruction(this.misfireInstruction); cti.setDescription(this.description); this.cronTrigger = cti;}
protected void buildExpression(String expression) throws ParseException { expressionParsed = true; try { // ... 初始化 TreeSet xxx = new TreeSet(); int exprOn = SECOND; StringTokenizer exprsTok = new StringTokenizer(expression, " \t", false); while (exprsTok.hasMoreTokens() && exprOn instantiate()<li data-track="159">源码1323行: tp.initialize();</ul>CronExpression.java & 解析Cron表达式
// create the worker threads and start them Iterator workerThreads = createWorkerThreads(count).iterator(); while(workerThreads.hasNext()) { WorkerThread wt = workerThreads.next(); wt.start(); availWorkers.add(wt); }5. 启动定时任务SimpleThreadPool.initialize() & 这里的count是默认设置中的数量,可以更改
QuartzScheduler.start() & 启动public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } // QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); } // 唤醒线程 schedThread.togglePause(false); getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted();}
QuartzSchedulerThread.run() & 执行过程@Overridepublic void run() { int acquiresFailed = 0; // 只有调用了halt()方法,才会退出这个死循环 while (!halted.get()) { try { // 一、假如是暂停状态,则循环超时等候1000毫秒 // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting).. // 阻塞直到有空闲的线程可用并返回可用的数量 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { List triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { // 二、获取acquire状态的Trigger列表,也就是即将执行的任务 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers } catch(){//...} if (triggers != null && !triggers.isEmpty()) { // 三:获取List第一个Trigger的下次触发时候 long triggerTime = triggers.get(0).getNextFireTime().getTime(); // 四:获取任务触发聚集 List res = qsRsrcs.getJobStore().triggersFired(triggers); // 五:设置Triggers为'executing'状态 qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); // 六:创建JobRunShell qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); // 七:执行Job qsRsrcs.getThreadPool().runInThread(shell) continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows con continue; // while (!halted) } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } qs = null; qsRsrcs = null;}
JobRunShell.run() & 从上面WorkerThread.run(),调用到这里执行public void run() { qs.addInternalSchedulerListener(this); try { OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); JobDetail jobDetail = jec.getJobDetail(); do { // ... long startTime = System.currentTimeMillis(); long endTime = startTime; // execute the job try { log.debug("Calling execute on job " + jobDetail.getKey()); // 执行业务代码,也就是我们的task job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { endTime = System.currentTimeMillis(); jobExEx = jee; getLog().info("Job " + jobDetail.getKey() + " threw a JobExecutionException: ", jobExEx); } catch (Throwable e) { endTime = System.currentTimeMillis(); getLog().error("Job " + jobDetail.getKey() + " threw an unhandled Exception: ", e); SchedulerException se = new SchedulerException( "Job threw an unhandled exception.", e); qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se); jobExEx = new JobExecutionException(se, false); } jec.setJobRunTime(endTime - startTime); // 其他代码 } while (true); } finally { qs.removeInternalSchedulerListener(this); }}
QuartzJobBean.execte() & 继续往下走public final void execute(JobExecutionContext context) throws JobExecutionException { try { BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this); MutablePropertyValues pvs = new MutablePropertyValues(); pvs.addPropertyValues(context.getScheduler().getContext()); pvs.addPropertyValues(context.getMergedJobDataMap()); bw.setPropertyValues(pvs, true); } catch (SchedulerException ex) { throw new JobExecutionException(ex); } executeInternal(context);}
MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)protected void executeInternal(JobExecutionContext context) throws JobExecutionException { try { // 反射执行业务代码 context.setResult(this.methodInvoker.invoke()); } catch (InvocationTargetException ex) { if (ex.getTargetException() instanceof JobExecutionException) { // -> JobExecutionException, to be logged at info level by Quartz throw (JobExecutionException) ex.getTargetException(); } else { // -> "unhandled exception", to be logged at error level by Quartz throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException()); } } catch (Exception ex) { // -> "unhandled exception", to be logged at error level by Quartz throw new JobMethodInvocationFailedException(this.methodInvoker, ex); }}五、综上总结
欢迎光临 创意电子 (https://wxcydz.cc/) | Powered by Discuz! X3.4 |