When it comes to running jobs with a timeout or a delay in Java, there are many utility classes to choose from, such as java.util.Timer or Future.
Java 5.0 introduced a more versatile utility class in the java.util.concurrent package, ScheduledThreadPoolExecutor, to replace the original Timer class. It can execute a given task after a given delay or at a given rate and, as its Javadoc puts it:
This class is preferable to Timer when multiple worker threads are needed
Today, we focus on how this executor is implemented.
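Before looking inside, it may help to recall what the executor does from the caller's side. The following is a minimal usage sketch, not taken from the original article: the class name SchedulerUsageDemo, the delays, and the latch counts are all illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SchedulerUsageDemo {

    /** Schedules one delayed task and one periodic task; returns true if both ran in time. */
    static boolean runDemo() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        CountDownLatch oneShot = new CountDownLatch(1);
        CountDownLatch periodic = new CountDownLatch(3);
        Runnable oneShotTask = oneShot::countDown;
        Runnable periodicTask = periodic::countDown;
        try {
            // Run once, 50 ms from now.
            executor.schedule(oneShotTask, 50, TimeUnit.MILLISECONDS);
            // Run every 20 ms, starting immediately.
            ScheduledFuture<?> ticker =
                    executor.scheduleAtFixedRate(periodicTask, 0, 20, TimeUnit.MILLISECONDS);

            boolean ok = oneShot.await(5, TimeUnit.SECONDS)
                    && periodic.await(5, TimeUnit.SECONDS);
            ticker.cancel(false); // a periodic task keeps running until it is cancelled
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("both tasks ran: " + runDemo());
    }
}
```

The rest of the article is about what happens between the schedule() call and the moment the task actually runs.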
Static View
As a subclass of ThreadPoolExecutor, it is composed of two main parts:
- Job queue
- Thread pool
  - core size
  - max size
  - keep-alive time
  - thread factory
  - rejection handler
We will dive into the implementation of each of these parts in the following sections.
Job Queue
The job queue of ScheduledThreadPoolExecutor is a class called DelayedWorkQueue. This queue is a priority queue backed by a binary heap, which keeps the most urgent job at the top of the heap. A worker thread therefore only ever looks at the top of the heap and runs that job once its time is up, without worrying about the other jobs.
static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    // ... other fields and methods omitted
}
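The "only look at the top of the heap" idea can be sketched with a plain java.util.PriorityQueue. This is an illustration under stated assumptions, not the JDK code: the Job class, its triggerAtMillis field, and the pollIfDue helper are hypothetical names, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.PriorityQueue;

final class UrgentFirstDemo {

    /** Hypothetical job: a name plus the time (in ms) at which it becomes due. */
    static final class Job implements Comparable<Job> {
        final String name;
        final long triggerAtMillis;

        Job(String name, long triggerAtMillis) {
            this.name = name;
            this.triggerAtMillis = triggerAtMillis;
        }

        @Override
        public int compareTo(Job other) {
            // The earliest trigger time sorts first, so it ends up at the top of the heap.
            return Long.compare(triggerAtMillis, other.triggerAtMillis);
        }
    }

    /** Returns the name of the head job if it is due at time 'now', otherwise null. */
    static String pollIfDue(PriorityQueue<Job> heap, long now) {
        Job head = heap.peek(); // only the top of the heap is ever inspected
        if (head == null || head.triggerAtMillis > now) {
            return null;
        }
        return heap.poll().name;
    }

    public static void main(String[] args) {
        PriorityQueue<Job> heap = new PriorityQueue<>();
        heap.add(new Job("c", 300));
        heap.add(new Job("a", 100));
        heap.add(new Job("b", 200));
        System.out.println(pollIfDue(heap, 50));
        System.out.println(pollIfDue(heap, 150));
    }
}
```

If the head is not due yet, no other job can be due either, which is why the rest of the heap never needs to be scanned.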
Furthermore, every element in the heap (a ScheduledFutureTask) stores its own index in the heap array. This removes the linear search that would otherwise be needed to locate a task before removing it, so cancellation drops from O(n) to O(log n).
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    /**
     * Index into delay queue, to support faster cancellation.
     */
    int heapIndex;
    // ... other fields and methods omitted
}
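To see why a stored index makes removal cheap, here is a simplified indexed min-heap. It is a sketch of the technique only, not the JDK implementation: IndexedHeapDemo, Node, and the ArrayList backing store are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class IndexedHeapDemo {

    /** Hypothetical heap element that remembers its own position, like heapIndex above. */
    static final class Node {
        final long time;
        int heapIndex = -1; // -1 means "not in the heap"

        Node(long time) {
            this.time = time;
        }
    }

    private final List<Node> heap = new ArrayList<>();

    void add(Node n) {
        heap.add(n);
        siftUp(heap.size() - 1, n);
    }

    Node peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    /** Removes an arbitrary node in O(log n); heapIndex replaces the O(n) search. */
    boolean remove(Node n) {
        int i = n.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != n) {
            return false;
        }
        n.heapIndex = -1;
        Node last = heap.remove(heap.size() - 1);
        if (last != n) {
            // Fill the hole with the last element, then restore the heap order.
            siftDown(i, last);
            if (heap.get(i) == last) {
                siftUp(i, last);
            }
        }
        return true;
    }

    private void siftUp(int k, Node n) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Node p = heap.get(parent);
            if (n.time >= p.time) {
                break;
            }
            set(k, p);
            k = parent;
        }
        set(k, n);
    }

    private void siftDown(int k, Node n) {
        int half = heap.size() >>> 1;
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < heap.size() && heap.get(right).time < heap.get(child).time) {
                child = right;
            }
            Node c = heap.get(child);
            if (n.time <= c.time) {
                break;
            }
            set(k, c);
            k = child;
        }
        set(k, n);
    }

    /** Every move inside the array also updates the element's stored index. */
    private void set(int k, Node n) {
        heap.set(k, n);
        n.heapIndex = k;
    }
}
```

The price of the fast removal is that every move inside the heap has to keep the stored index up to date, which is what the set() helper does here.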
Thread Pool
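The thread pool can be configured like any other thread pool and has no special requirements. Note, however, that because the work queue is unbounded, the pool never grows beyond corePoolSize, so the maximum pool size (fixed at Integer.MAX_VALUE below) has no practical effect.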
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
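As an illustration of this constructor, the sketch below builds a scheduler with a custom thread factory and rejection handler and then checks how many threads it actually creates. The class name PoolConfigDemo, the "scheduler-" thread names, and the one-hour delay are assumptions chosen for the example.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolConfigDemo {

    /** Builds a scheduler whose worker threads are named daemon threads. */
    static ScheduledThreadPoolExecutor newScheduler(int coreSize) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory namedDaemonThreads = runnable -> {
            Thread t = new Thread(runnable, "scheduler-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return new ScheduledThreadPoolExecutor(
                coreSize, namedDaemonThreads, new ThreadPoolExecutor.AbortPolicy());
    }

    /** Schedules 'tasks' far-future jobs and reports how many worker threads exist afterwards. */
    static int poolSizeAfterScheduling(int coreSize, int tasks) {
        ScheduledThreadPoolExecutor executor = newScheduler(coreSize);
        Runnable noop = () -> { };
        try {
            for (int i = 0; i < tasks; i++) {
                executor.schedule(noop, 1, TimeUnit.HOURS);
            }
            return executor.getPoolSize();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size: " + poolSizeAfterScheduling(2, 5));
    }
}
```

With a core size of 2, scheduling five jobs still leaves only two worker threads; the remaining jobs simply wait in the queue.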
Dynamic View
The static view shows how this executor is composed, but not how it works, i.e. when and how a job is taken from the queue and handled. This section focuses on that.
Submit Job
The first step is to submit our job to executor:
executor.submit(xxx); // or execute()
=>
ScheduledThreadPoolExecutor#schedule()
=>
Queue#add() && ThreadPoolExecutor#ensurePrestart()
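One way to observe that submit() is routed through schedule() is to look at the Future it returns. The sketch below relies on behaviour seen in the OpenJDK implementation (submit() scheduling the task with a zero delay) and assumes decorateTask() is not overridden; SubmitDemo is a hypothetical name.

```java
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmitDemo {

    /** Returns true if submit() handed back a scheduled future with no remaining delay. */
    static boolean submitReturnsScheduledFuture() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        Runnable noop = () -> { };
        try {
            Future<?> future = executor.submit(noop);
            // The task went through schedule(), so it is a ScheduledFuture whose delay has elapsed.
            return future instanceof ScheduledFuture
                    && ((ScheduledFuture<?>) future).getDelay(TimeUnit.NANOSECONDS) <= 0;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsScheduledFuture());
    }
}
```

In other words, an immediate job is just a delayed job whose delay happens to be zero, so it takes the same path through the queue.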
Prestart
In the pre-start phase, the thread pool adds a worker to the pool and starts its thread. The worker is created without an initial task, because jobs are always taken from the queue. The started thread calls the worker's run(), which tries to get a real job from the queue.
ThreadPoolExecutor#addWorker()
=>
new Worker()
Thread#start()
=>
ThreadPoolExecutor#Worker#run()
=>
ThreadPoolExecutor#runWorker()
=>
ThreadPoolExecutor#getTask()
=>
Queue#take() // or Queue#poll()
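The effect of the pre-start step can be observed from outside: the first schedule() call creates a worker thread even though the job is not due for a long time. This is a small sketch with hypothetical names (PrestartDemo, observe); the one-hour delay is arbitrary and only keeps the job in the queue.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PrestartDemo {

    /** Returns {pool size before scheduling, pool size after scheduling, queued jobs}. */
    static int[] observe() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        Runnable noop = () -> { };
        try {
            int before = executor.getPoolSize();        // no worker has been started yet
            executor.schedule(noop, 1, TimeUnit.HOURS); // queue the job, then pre-start a worker
            int after = executor.getPoolSize();         // the worker now waits on the queue
            int queued = executor.getQueue().size();    // the job itself is still queued
            return new int[] {before, after, queued};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " -> " + r[1] + " workers, " + r[2] + " queued");
    }
}
```

The worker exists but holds no job of its own; it is parked inside the queue's take() until the head job becomes due.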
Take from Queue
The take() operation may block if the queue is empty or if the job at the head of the queue is not due yet.
DelayedWorkQueue#take()
=>
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
    available.await();
else {
    long delay = first.getDelay(NANOSECONDS);
    if (delay <= 0)
        return finishPoll(first);
    first = null; // don't retain ref while waiting
    if (leader != null)
        available.await();
    else {
        Thread thisThread = Thread.currentThread();
        leader = thisThread;
        try {
            available.awaitNanos(delay);
        } finally {
            if (leader == thisThread)
                leader = null;
        }
    }
}
Here we can see a variable called leader, which implements a variant of the Leader-Follower pattern.
As the documentation in the source code says:
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
Only the leader waits for the given delay; the other worker threads wait indefinitely. When the leader returns from take(), it must signal another thread so that it can become the new leader and wait for the first element in turn.
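The pattern can be reproduced in a small stand-alone queue. The following is a simplified sketch modelled on the excerpt above, not the JDK class itself: LeaderFollowerQueue, Item, put(), and drainNames() are hypothetical names, and a java.util.PriorityQueue stands in for the hand-written heap.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class LeaderFollowerQueue {

    /** Hypothetical delayed item: becomes available once its trigger time has passed. */
    static final class Item implements Comparable<Item> {
        final String name;
        final long triggerNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public int compareTo(Item other) {
            long diff = triggerNanos - other.triggerNanos; // difference is safe for nanoTime values
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final PriorityQueue<Item> heap = new PriorityQueue<>();
    private Thread leader; // the one thread doing a timed wait for the head item

    void put(Item item) {
        lock.lock();
        try {
            heap.add(item);
            if (heap.peek() == item) { // new head: the current leader waits for the wrong item
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    Item take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Item first = heap.peek();
                if (first == null) {
                    available.await();                // nothing queued at all
                } else {
                    long delay = first.triggerNanos - System.nanoTime();
                    if (delay <= 0) {
                        return heap.poll();
                    }
                    if (leader != null) {
                        available.await();            // follower: wait until signalled
                    } else {
                        Thread self = Thread.currentThread();
                        leader = self;
                        try {
                            available.awaitNanos(delay); // leader: wait only as long as needed
                        } finally {
                            if (leader == self) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            if (leader == null && heap.peek() != null) {
                available.signal();                   // let a follower take over as leader
            }
            lock.unlock();
        }
    }

    /** Small single-threaded demo: items come out in trigger order, not insertion order. */
    static String drainNames() {
        LeaderFollowerQueue queue = new LeaderFollowerQueue();
        queue.put(new Item("a", 40));
        queue.put(new Item("b", 10));
        try {
            return queue.take().name + "," + queue.take().name;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }
}
```

With several threads blocked in take(), only one of them holds a timer at any moment, so a queue with many idle workers does not cause a burst of wake-ups each time a delay expires.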
End of Story
After waiting on the queue, the worker thread gets the job and calls the task's run(), i.e. our scheduled job. If the job is periodic and completes normally, the scheduler computes its next run time and adds it back to the queue.
ScheduledFutureTask#run()
=>
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
    cancel(false);
else if (!periodic)
    ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
    setNextRunTime();
    reExecutePeriodic(outerTask);
}
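The re-queueing condition has a visible consequence: a periodic job is only put back on the queue when runAndReset() succeeds, so a run that throws ends the series. The sketch below demonstrates this from the outside; PeriodicDemo, the 10 ms period, and failing on the third run are assumptions chosen for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicDemo {

    /** Runs a periodic task that throws on its third run; returns how many times it ran. */
    static int runsBeforeFailure() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            if (runs.incrementAndGet() == 3) {
                throw new IllegalStateException("simulated failure on run 3");
            }
        };
        try {
            ScheduledFuture<?> future =
                    executor.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            try {
                // get() on a periodic task only returns once the task fails or is cancelled.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // The third run threw, so the task was not added back to the queue.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (TimeoutException e) {
                throw new IllegalStateException("task did not fail in time", e);
            }
            return runs.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runsBeforeFailure());
    }
}
```

A periodic job that must survive failures therefore needs to catch its own exceptions inside run().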