quartz调度器,elastic-job原理

一句话点题

quartz是通过线程等待方式实现的任务调度。

  1. 显示地通过Scheduler工厂创建Scheduler实例

ScheduledFactory有2个实现类,通常按照预定义的配置方式使用StdSchedulerFactory,这里使用Stard By模式进行调试和源码分析。

1
2
3
4
5
6
7
8
protected Scheduler createScheduler(String name, int threadPoolSize) throws SchedulerException {
Properties config = new Properties();
config.setProperty("org.quartz.scheduler.instanceName", name + "Scheduler");
config.setProperty("org.quartz.scheduler.instanceId", "AUTO");
config.setProperty("org.quartz.threadPool.threadCount", Integer.toString(threadPoolSize));
config.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
return new StdSchedulerFactory(config).getScheduler();
}

另一个DirectSchedulerFactor在集成式的工程中被使用到,通常需要对定时器有着明确的认识和定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void testPlugins() throws Exception {
final StringBuffer result = new StringBuffer();
SchedulerPlugin testPlugin = new SchedulerPlugin() {
public void initialize(String name, org.quartz.Scheduler scheduler, ClassLoadHelper classLoadHelper) throws org.quartz.SchedulerException {
result.append(name).append("|").append(scheduler.getSchedulerName());
};
public void start() {
result.append("|start");
};
public void shutdown() {
result.append("|shutdown");
};
};
ThreadPool threadPool = new SimpleThreadPool(1, 5);
threadPool.initialize();
DirectSchedulerFactory.getInstance().createScheduler(
"MyScheduler", "Instance1", threadPool,
new RAMJobStore(), Collections.singletonMap("TestPlugin", testPlugin),
null, -1, 0, 0, false, null);
Scheduler scheduler = DirectSchedulerFactory.getInstance().getScheduler("MyScheduler");
scheduler.start();
scheduler.shutdown();
}

拿到Scheduler实例StdScheduler后,看看Scheduer的工厂方法主要包括哪些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* <p>
* Returns a handle to the Scheduler produced by this factory.
* </p>
*
* <p>
* If one of the <code>initialize</code> methods has not be previously
* called, then the default (no-arg) <code>initialize()</code> method
* will be called by this method.
* </p>
*/
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
/**
* <p>
* Returns a handle to the default Scheduler, creating it if it does not
* yet exist.
* </p>
*
* @see #initialize()
*/
public static Scheduler getDefaultScheduler() throws SchedulerException {
StdSchedulerFactory fact = new StdSchedulerFactory();
return fact.getScheduler();
}

getScheduler中明确了一个注册器,注册器的作用和内存回收有关系,持有Scheduler的引用唯一且不被回收,并且允许lookups都在一个类加载空间中。这个类在所有的方法上都加了锁,来确保实例的唯一和线程安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* <p>
* Holds references to Scheduler instances - ensuring uniqueness, and
* preventing garbage collection, and allowing 'global' lookups - all within a
* ClassLoader space.
* </p>
*
* @author James House
*/
public class SchedulerRepository {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Data members.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
private HashMap<String, Scheduler> schedulers;
private static SchedulerRepository inst;
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Constructors.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
private SchedulerRepository() {
schedulers = new HashMap<String, Scheduler>();
}
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
public static synchronized SchedulerRepository getInstance() {
if (inst == null) {
inst = new SchedulerRepository();
}
return inst;
}
public synchronized void bind(Scheduler sched) throws SchedulerException {
if ((Scheduler) schedulers.get(sched.getSchedulerName()) != null) {
throw new SchedulerException("Scheduler with name '"
+ sched.getSchedulerName() + "' already exists.");
}
schedulers.put(sched.getSchedulerName(), sched);
}
public synchronized boolean remove(String schedName) {
return (schedulers.remove(schedName) != null);
}
public synchronized Scheduler lookup(String schedName) {
return schedulers.get(schedName);
}
public synchronized Collection<Scheduler> lookupAll() {
return java.util.Collections
.unmodifiableCollection(schedulers.values());
}
}

这里使用了锁加判断的形式实现的单例,而并没有使用常说的volatile+双重锁定单例。

这还没有完,接下来就是所有细节中的重点,重点中的细节,初始化过程,很长。

1
2
3
4
5
6
7
8
/**
* <p>
* Initialize the <code>{@link org.quartz.SchedulerFactory}</code> with
* the contents of the <code>Properties</code> file with the given
* name.
* </p>
*/
public void initialize(String filename) throws SchedulerException;

拿到最后的StdScheduler实例后,接下来立即使用start()方法启动,start方法会去调用真正的QuartzScheduler核心对象,所以接下来的重点便是QuartzScheduler的控制,但是这个对象属于StdScheduler成员变量,所以所有的外部调用都通过StdScheduler来实现,包括这里的start。

1
2
3
4
5
6
7
8
/**
* <p>
* Calls the equivalent method on the 'proxied' <code>QuartzScheduler</code>.
* </p>
*/
public void start() throws SchedulerException {
sched.start();
}

从QuartzScheduler的初始化中可以得到一些重要的成员变量以及初始化的构造方法中对这些成员变量的操作;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* <p>
* Create a <code>QuartzScheduler</code> with the given configuration
* properties.
* </p>
*
* @see QuartzSchedulerResources
*/
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
throws SchedulerException {
this.resources = resources;
if (resources.getJobStore() instanceof JobListener) {
addInternalJobListener((JobListener)resources.getJobStore());
}
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}
jobMgr = new ExecutingJobsManager();
addInternalJobListener(jobMgr);
errLogger = new ErrorLogger();
addInternalSchedulerListener(errLogger);
signaler = new SchedulerSignalerImpl(this, this.schedThread);
getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}

可以看出QuartzScheduler负责管理整个过程,但是这个过程的调用交给了stdScheduler作为代理。

  1. 初始化,配置、调度器
  2. 启动,执行调度器
  3. 停止,停止调度器

接下来便是作为调度器核心的类:QuartzSchedulerThread,几个重要的方法如下:

  1. 初始化 -> 构造
  2. 被执行 -> run
  3. 被调用 -> toggle

分别看一下这3个阶段做了哪些操作:

初始化:

1
2
3
4
5
6
7
8
9
//初始化
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//执行run方法
schedThreadExecutor.execute(this.schedThread);
if (idleWaitTime > 0) {
this.schedThread.setIdleWaitTime(idleWaitTime);
}

然后再看一下线程类的内部明细,包括成员变量及初始化的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Data members.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
private QuartzScheduler qs;
private QuartzSchedulerResources qsRsrcs;
private final Object sigLock = new Object();
private boolean signaled;
private long signaledNextFireTime;
private boolean paused;
private AtomicBoolean halted;
private Random random = new Random(System.currentTimeMillis());
// When the scheduler finds there is no current trigger to fire, how long
// it should wait until checking again...
private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
private int idleWaitVariablness = 7 * 1000;
private final Logger log = LoggerFactory.getLogger(getClass());
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Constructors.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* Construct a new <code>QuartzSchedulerThread</code> for the given
* <code>QuartzScheduler</code> as a non-daemon <code>Thread</code>
* with normal priority.
* </p>
*/
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
}
/**
* <p>
* Construct a new <code>QuartzSchedulerThread</code> for the given
* <code>QuartzScheduler</code> as a <code>Thread</code> with the given
* attributes.
* </p>
*/
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
this.qs = qs;
this.qsRsrcs = qsRsrcs;
this.setDaemon(setDaemon);
if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
this.setPriority(threadPrio);
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}

//启动执行toggle方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
//在这里调用
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}

接下来便是整个调度器的核心算法了,首先是run动作,让线程启动起来,然后是QuartzScheduler调用start方法执行的togglePause方法改变线程循环条件。

便于阅读,我们先从togglePause看起

1
2
3
4
5
6
7
8
9
10
11
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}

根据这个条件的改变,run方法执行的时候的循环条件如果在满足的情况下,则run方法内部的代码应该已经处于执行状态了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
*/
@Override
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}

QuartzSchedulerThread负责整个调度器的调度线程,默认每30s一次获取下一个30s内要执行的任务到treeMap,通过循环去拿到需要执行的任务,然后委派到执行线程池去执行。

这个概念和nio的概念有点相似。

调度器通过调度线程完成领导的管理工作,最后的执行肯定是通过一个线程池的形式去执行对应的任务了,然后再看看具体的执行是如何管理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}

runInTrhead交给对应的线程池进行的执行动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}

这里SimpleThreadPool就是核心的任务执行线程池的执行过程。

JobRunShell就是线程池需要执行的线程对象,这个线程是经过任务包装过的,所以分别从初始化和run方法两个层面去看这个线程是如何和对应的Job进行关联的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void initialize(QuartzScheduler sched)
throws SchedulerException {
this.qs = sched;
Job job = null;
JobDetail jobDetail = firedTriggerBundle.getJobDetail();
try {
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
} catch (SchedulerException se) {
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
} catch (Throwable ncdfe) { // such as NoClassDefFoundError
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "' - ", ncdfe);
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
}
this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}

对应的执行方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't begin execution.", se);
break;
}
// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
try {
CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
// QTZ-205
// Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
break;
}
long startTime = System.currentTimeMillis();
long endTime = startTime;
// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}
// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// update job/trigger or re-execute job
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
continue;
}
try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
continue;
}
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}

抛开线程之间互相竞争的那部分功能而言,单个任务的完整执行过结束了。如果在基于数据库存储的情况下,这个过程会更加复杂点,负载在于所有的任务都需要和数据库交互。

调度器的幂等性决定了任务的可重复执行的特性,否则在实时任务中可能需要事务的支持,或者需要在业务上支撑。

elastic-job

light-task-scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (start.compareAndSet(false, true)) {
loadSize = appContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_PRELOADER_SIZE, 300);
factor = appContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_PRELOADER_FACTOR, 0.2);
long interval = appContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_PRELOADER_SIGNAL_CHECK_INTERVAL, 100);
scheduledFuture = LOAD_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
doLoad();
}
}, interval, interval, TimeUnit.MILLISECONDS);
NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
scheduledFuture.cancel(true);
LOAD_EXECUTOR_SERVICE.shutdown();
start.set(false);
}
});
}