
ScheduledThreadPool Implementation

When it comes to delayed or timed-out jobs in Java, there are several utility classes to choose from, such as java.util.Timer or Future.

Java 5.0 introduced a more versatile utility class in the java.util.concurrent package, ScheduledThreadPoolExecutor, to replace the original Timer class. It can run a given task after a given delay or periodically at a given rate, and, as its Javadoc puts it:

This class is preferable to Timer when multiple worker threads are needed
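As a quick illustration of "a given delay or rate", here is a minimal sketch that uses only the public ScheduledExecutorService API. The class name BasicUsageDemo and the delays are arbitrary choices for the example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class BasicUsageDemo {
    public static String run() {
        // Executors.newScheduledThreadPool() returns a ScheduledThreadPoolExecutor.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        try {
            // One-shot: run once after a 50 ms delay and produce a value.
            ScheduledFuture<String> once =
                executor.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);

            // Periodic: run every 10 ms until cancelled; wait for three ticks.
            CountDownLatch ticks = new CountDownLatch(3);
            ScheduledFuture<?> periodic =
                executor.scheduleAtFixedRate(ticks::countDown, 0, 10, TimeUnit.MILLISECONDS);

            boolean ticked = ticks.await(1, TimeUnit.SECONDS);
            periodic.cancel(false);
            return once.get() + ", ticked=" + ticked;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

run() returns "done, ticked=true": the value of the one-shot task, plus confirmation that the fixed-rate task fired at least three times before it was cancelled.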

Today, we focus on how this executor is implemented.

Static View

As a subclass of ThreadPoolExecutor, it is composed of two main parts:

  • Job queue
  • Thread pool
    • core size
    • max size
    • keep-alive time
    • thread factory
    • rejection handler
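The parts listed above can be read back from a live executor through getters inherited from ThreadPoolExecutor. A small sketch (the class name PoolPartsDemo is hypothetical); the keep-alive value is printed rather than assumed, because the default may differ between JDK versions:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolPartsDemo {
    // Builds an executor with the one-argument constructor and reports each part.
    public static String describe() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        try {
            return "queue=" + executor.getQueue().getClass().getSimpleName()          // job queue
                + ", core=" + executor.getCorePoolSize()                               // size
                + ", max=" + executor.getMaximumPoolSize()                             // max size
                + ", keepAliveMs=" + executor.getKeepAliveTime(TimeUnit.MILLISECONDS)  // keep-alive time
                + ", factory=" + executor.getThreadFactory().getClass().getSimpleName() // thread factory
                + ", handler=" + executor.getRejectedExecutionHandler().getClass().getSimpleName(); // rejection handler
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```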

So we will dive into the implementation of those parts in the following sections.

Job Queue

The job queue of ScheduledThreadPoolExecutor is a nested class called DelayedWorkQueue. It is a priority queue backed by an array-based binary heap ordered by trigger time, so the most urgent job is always at the top of the heap. A worker thread therefore only has to look at the top of the heap and run that job once its time is up, without worrying about the other jobs: none of them can be due earlier.

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

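  private static final int INITIAL_CAPACITY = 16;

  // Array-backed binary min-heap: queue[0] is always the task that is due first.
  private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

  // ... lock, size, leader, condition "available" and all methods omitted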

}
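DelayedWorkQueue is package-private, so it cannot be used directly. java.util.concurrent.DelayQueue is a public class built on the same idea (a priority heap ordered by remaining delay, with a blocking take()), so this sketch uses it as a stand-in to show that the earliest job always comes out first. The Job class and its names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayOrderDemo {
    // Hypothetical job type: due at a fixed point on the System.nanoTime() clock.
    static final class Job implements Delayed {
        final String name;
        final long triggerNanos;

        Job(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Earlier trigger time sorts first, i.e. ends up at the top of the heap.
            long diff = (other instanceof Job)
                ? triggerNanos - ((Job) other).triggerNanos
                : getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            return Long.compare(diff, 0);
        }
    }

    // Adds jobs out of order and records the order in which take() releases them.
    public static List<String> drainInOrder() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("c", 60));
        queue.add(new Job("a", 20));
        queue.add(new Job("b", 40));
        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().name); // blocks until the head's delay has elapsed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder()); // [a, b, c]
    }
}
```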

Furthermore, every element in the heap (a ScheduledFutureTask) stores its own index in the heap. This speeds up removal, e.g. on cancellation: the element no longer has to be found by an O(n) linear scan, so removing it costs only the O(log n) sift.

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

  /**
   * Index into delay queue, to support faster cancellation.
   */
  int heapIndex; 
}
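To see why the stored index helps, here is a simplified, hypothetical indexed min-heap in the spirit of DelayedWorkQueue (not a copy of it: no locking, and plain long trigger times instead of RunnableScheduledFuture). Every move inside the array updates the moved task's heapIndex, so remove() can jump straight to the slot and only pays for the O(log n) sift:

```java
import java.util.Arrays;

public class IndexedHeapDemo {
    // Hypothetical task that remembers its own slot, like ScheduledFutureTask.heapIndex.
    static final class Task {
        final String name;
        final long time;     // trigger time; smaller means more urgent
        int heapIndex = -1;  // -1 means "not in the heap"

        Task(String name, long time) { this.name = name; this.time = time; }
    }

    // Minimal array-backed min-heap that keeps every task's heapIndex up to date.
    static final class IndexedHeap {
        private Task[] heap = new Task[16];
        private int size;

        void add(Task t) {
            if (size == heap.length) heap = Arrays.copyOf(heap, size * 2);
            siftUp(size++, t);
        }

        Task poll() {
            if (size == 0) return null;
            Task top = heap[0];
            Task last = heap[--size];
            heap[size] = null;
            if (size > 0) siftDown(0, last);
            top.heapIndex = -1;
            return top;
        }

        // O(log n): heapIndex locates the task directly instead of an O(n) scan.
        boolean remove(Task t) {
            int i = t.heapIndex;
            if (i < 0 || i >= size || heap[i] != t) return false;
            t.heapIndex = -1;
            Task last = heap[--size];
            heap[size] = null;
            if (i != size) { // fill the hole with the former last element
                siftDown(i, last);
                if (heap[i] == last) siftUp(i, last);
            }
            return true;
        }

        private void siftUp(int k, Task t) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Task p = heap[parent];
                if (t.time >= p.time) break;
                heap[k] = p;
                p.heapIndex = k;
                k = parent;
            }
            heap[k] = t;
            t.heapIndex = k;
        }

        private void siftDown(int k, Task t) {
            int half = size >>> 1;
            while (k < half) {
                int child = 2 * k + 1;
                int right = child + 1;
                if (right < size && heap[right].time < heap[child].time) child = right;
                Task c = heap[child];
                if (t.time <= c.time) break;
                heap[k] = c;
                c.heapIndex = k;
                k = child;
            }
            heap[k] = t;
            t.heapIndex = k;
        }
    }

    public static String demo() {
        IndexedHeap h = new IndexedHeap();
        Task a = new Task("a", 30), b = new Task("b", 10), c = new Task("c", 20), d = new Task("d", 40);
        h.add(a); h.add(b); h.add(c); h.add(d);
        h.remove(c); // cancel c using its stored index
        StringBuilder order = new StringBuilder();
        for (Task t = h.poll(); t != null; t = h.poll()) order.append(t.name);
        return order.toString(); // "bad"
    }
}
```

demo() removes c before polling, so the remaining tasks come out in trigger-time order as "bad".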

Thread Pool

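The thread pool can be customized like any other ThreadPoolExecutor: you may pass your own core pool size, thread factory and rejection handler. The constructor fixes the rest, though. The maximum pool size is Integer.MAX_VALUE, but because the queue is unbounded, the pool in effect runs as a fixed-size pool of corePoolSize threads, and changing the maximum pool size has no useful effect.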

public ScheduledThreadPoolExecutor(int corePoolSize,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
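A minimal sketch of plugging in the two customizable parts through the three-argument constructor above. The "scheduler-N" naming scheme and the counting rejection handler are illustrative assumptions, not anything the executor requires:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPoolDemo {
    public static String run() {
        AtomicInteger threadCount = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();

        // Thread factory: name worker threads "scheduler-N" and make them daemons.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "scheduler-" + threadCount.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        // Rejection handler: count rejected tasks instead of throwing.
        RejectedExecutionHandler handler = (r, pool) -> rejected.incrementAndGet();

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, factory, handler);
        try {
            ScheduledFuture<String> name =
                executor.schedule(() -> Thread.currentThread().getName(), 10, TimeUnit.MILLISECONDS);
            String workerName = name.get();
            executor.shutdown();
            executor.schedule(() -> { }, 10, TimeUnit.MILLISECONDS); // after shutdown: sent to the handler
            return workerName + ", rejected=" + rejected.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

run() returns "scheduler-1, rejected=1": the single core worker came from our factory, and the task submitted after shutdown went to our handler.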

Dynamic View

The static view shows how this executor is composed, but not how it works, i.e. when and how a job is taken from the queue and handled. This section focuses on that.

Submit Job

The first step is to submit our job to the executor:

executor.submit(task); // or execute(): both delegate to schedule() with a zero delay
=>
ScheduledThreadPoolExecutor#schedule()
=>
ScheduledThreadPoolExecutor#delayedExecute()
=>
Queue#add() && ThreadPoolExecutor#ensurePrestart()
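This path can be observed from the outside. The sketch below uses a hypothetical subclass that overrides decorateTask(), a protected hook that schedule() calls once per task, so counting its calls shows that execute() and submit() are routed through schedule() too. It also shows that queuing a task with a one-hour delay already starts a worker thread, which is the ensurePrestart() step:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitPathDemo {
    // Hypothetical subclass that counts how many tasks pass through schedule().
    static final class CountingExecutor extends ScheduledThreadPoolExecutor {
        final AtomicInteger scheduled = new AtomicInteger();

        CountingExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(
                Runnable runnable, RunnableScheduledFuture<V> task) {
            scheduled.incrementAndGet();
            return task;
        }

        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(
                Callable<V> callable, RunnableScheduledFuture<V> task) {
            scheduled.incrementAndGet();
            return task;
        }
    }

    public static String run() {
        CountingExecutor executor = new CountingExecutor(2);
        try {
            int before = executor.getPoolSize();             // no worker started yet
            executor.schedule(() -> { }, 1, TimeUnit.HOURS);  // queued; ensurePrestart() adds a worker
            int afterSchedule = executor.getPoolSize();
            executor.execute(() -> { });                      // delegates to schedule(..., 0, NANOSECONDS)
            executor.submit(() -> { });                       // likewise
            return "before=" + before + ", afterSchedule=" + afterSchedule
                + ", scheduled=" + executor.scheduled.get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

run() returns "before=0, afterSchedule=1, scheduled=3".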

Prestart

In the prestart phase, if fewer than corePoolSize threads are running, the thread pool adds a worker to the pool and starts its thread. The worker gets no task of its own; its jobs will be taken from the queue. The started thread calls the worker's run(), which tries to get a real job from the queue.

ThreadPoolExecutor#addWorker()
=>
new Worker()
Thread#start()
=>
ThreadPoolExecutor#Worker#run()
=>
ThreadPoolExecutor#runWorker()
=>
ThreadPoolExecutor#getTask()
=>
Queue#take() // or Queue#poll(keepAliveTime, unit) if this worker may time out
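The chain above boils down to a simple loop. Here is a heavily simplified, hypothetical MiniPool that mirrors its shape (enqueue, then make sure a worker exists; the worker loops on getTask() until it returns null). It is not ThreadPoolExecutor's code: it has a single worker and no run states, and it uses a timed poll so an idle worker exits:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MiniPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final long keepAliveMillis;
    private Thread worker; // at most one worker in this sketch; guarded by "this"

    public MiniPool(long keepAliveMillis) { this.keepAliveMillis = keepAliveMillis; }

    // Mirrors "Queue#add() && ensurePrestart()": enqueue first, then make sure a worker runs.
    public synchronized void execute(Runnable task) {
        queue.add(task);
        ensurePrestart();
    }

    // Mirrors addWorker(null, ...): the new worker has no task of its own.
    private synchronized void ensurePrestart() {
        if (worker == null) {
            worker = new Thread(this::runWorker, "mini-worker");
            worker.start();
        }
    }

    // Mirrors runWorker(): keep asking getTask() until it returns null.
    private void runWorker() {
        Runnable task;
        while ((task = getTask()) != null) {
            task.run();
        }
        synchronized (this) {
            worker = null;
            if (!queue.isEmpty()) ensurePrestart(); // a task slipped in after the last poll
        }
    }

    // Mirrors getTask(): a timed poll, so an idle worker eventually exits.
    private Runnable getTask() {
        try {
            return queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return null;
        }
    }

    public static List<Integer> demo() {
        MiniPool pool = new MiniPool(50);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            int n = i;
            pool.execute(() -> { seen.add(n); done.countDown(); });
        }
        try {
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```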

Take from Queue

The take() operation blocks if the queue is empty or if the first job is not yet due.

DelayedWorkQueue#take()
=>
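public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await(); // queue empty: wait for an offer()
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first); // head is due: remove and return it
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await(); // follower: wait indefinitely
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; // become the leader
                    try {
                        available.awaitNanos(delay); // wait only until the head is due
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // No leader but work remains: wake a follower to take over.
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}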

Here we can see a variable called leader, which implements the Leader-Follower pattern. As the comment in the source code says:

This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.

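Only the leader waits for the given delay; the other worker threads wait indefinitely. When the leader returns from take(), and no other thread has become leader in the meantime, it signals another waiting thread, which tries to become the new leader and waits for the first element again. Likewise, when a newly added job becomes the head of the queue, offer() clears leader and signals a waiting thread, because the new head may be due earlier than the old one.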
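The pattern can be tried in isolation with a small, self-contained sketch. LeaderFollowerQueue is a hypothetical class that follows the same take()/offer() logic on top of a java.util.PriorityQueue, with plain values and millisecond delays instead of RunnableScheduledFuture:

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LeaderFollowerQueue<T> {
    private static final class Entry<T> {
        final long time; // trigger time on the System.nanoTime() clock
        final T value;
        Entry(long time, T value) { this.time = time; this.value = value; }
    }

    private final PriorityQueue<Entry<T>> heap =
        new PriorityQueue<>((x, y) -> Long.compare(x.time - y.time, 0));
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private Thread leader; // the only thread doing a timed wait; guarded by lock

    public void offer(T value, long delayMillis) {
        Entry<T> e = new Entry<>(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis), value);
        lock.lock();
        try {
            heap.add(e);
            if (heap.peek() == e) { // new head may be due sooner than the leader expects
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Entry<T> first = heap.peek();
                if (first == null) { available.await(); continue; } // empty: wait for offer()
                long delay = first.time - System.nanoTime();
                if (delay <= 0) return heap.poll().value;              // head is due
                first = null; // don't retain ref while waiting
                if (leader != null) {
                    available.await();                                 // follower: wait indefinitely
                } else {
                    Thread me = Thread.currentThread();
                    leader = me;                                       // leader: timed wait only
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == me) leader = null;
                    }
                }
            }
        } finally {
            if (leader == null && heap.peek() != null) available.signal(); // hand leadership on
            lock.unlock();
        }
    }

    // Two consumer threads share four delayed items.
    public static List<String> demo() {
        LeaderFollowerQueue<String> q = new LeaderFollowerQueue<>();
        List<String> taken = new CopyOnWriteArrayList<>();
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    for (int k = 0; k < 2; k++) taken.add(q.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].setDaemon(true);
            consumers[i].start();
        }
        q.offer("late", 60);
        q.offer("early", 20);
        q.offer("mid", 40);
        q.offer("now", 0);
        try {
            for (Thread t : consumers) t.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }
}
```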

End of Story

After waiting in the queue, the worker thread gets the job and calls the task's run(), i.e. runs our scheduled job. If the job is periodic, the executor computes its next run time and puts it back into the queue.

ScheduledFutureTask#run()
=>
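  boolean periodic = isPeriodic();
  if (!canRunInCurrentRunState(periodic))
      cancel(false);                        // shut down and policy forbids running
  else if (!periodic)
      ScheduledFutureTask.super.run();      // one-shot task: plain FutureTask#run()
  else if (ScheduledFutureTask.super.runAndReset()) { // false if the run threw or was cancelled
      setNextRunTime();                     // compute the next trigger time
      reExecutePeriodic(outerTask);         // put the task back into the queue
  }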
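The re-queuing step, and what happens when a periodic run throws (runAndReset() returns false, so the task is not put back), can be observed with a small sketch. The class name PeriodicDemo and the deliberate exception on the third run are assumptions for illustration:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicDemo {
    public static String run() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
            if (runs.incrementAndGet() == 3) {
                throw new IllegalStateException("boom on run 3"); // stops the periodic task
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
        String outcome;
        try {
            future.get(); // a periodic future only completes on cancellation or failure
            outcome = "completed";
        } catch (ExecutionException e) {
            outcome = "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcome = "interrupted";
        } finally {
            executor.shutdownNow();
        }
        return outcome + ", runs=" + runs.get();
    }
}
```

run() returns "failed: boom on run 3, runs=3": the first two runs were re-queued, the third threw, and no fourth run happened.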

