bb.util
Class ThreadPoolExecutor2

java.lang.Object
  extended by java.util.concurrent.AbstractExecutorService
      extended by java.util.concurrent.ThreadPoolExecutor
          extended by bb.util.ThreadPoolExecutor2
All Implemented Interfaces:
Executor, ExecutorService

public class ThreadPoolExecutor2
extends ThreadPoolExecutor

Extension of ThreadPoolExecutor that makes it more convenient to construct instances suited for a specific concurrent scenario.

The relevant scenario is when some primary thread(s) simply get data and submit it to a thread pool which will do all the remaining data processing.

Typically, there is just a single primary thread generating data because there is usually only a single source of data (e.g. hard drive or network that is read, or computational tasks that are generated). Furthermore, that single data generating thread usually does not fully utilize even 1 CPU (e.g. because disk and network bandwidth rarely approaches the data processing capabilities of modern CPUs).

In contrast, there are usually multiple threads in the thread pool, one per CPU, so that they can concurrently execute data processing tasks, which are often time consuming.

The usual motivations for having the primary thread(s) submit the actual data processing work to a thread pool all apply here. First, it allows the primary thread(s) to do minimal work, which allows them to remain responsive (e.g. for reading new incoming data). This is especially useful when spikes in the data rate occur. Second, the potential concurrency offered by the pool may enable optimal use of all CPUs, which can greatly increase data throughput.

This class adds no new methods to ThreadPoolExecutor. Instead, its public API consists solely of its constructors, which simplify the creation of ThreadPoolExecutors. The javadocs for the fundamental constructor (and the methods it links to) explain why it supports the above concurrent scenario.
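
The scenario described above can be sketched as follows. This is only an illustration, not this class's source: it uses a plain ThreadPoolExecutor so that it compiles without bb.util, and the class name PrimaryThreadSketch, the method sumViaPool, and the trivial summing task are hypothetical stand-ins for real data processing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class PrimaryThreadSketch {

    /** The calling (primary) thread submits the values 1..n as tasks; the pool adds them up. */
    static long sumViaPool(int n) {
        // Assumption: one pool thread per CPU and a backlog of 3 tasks per thread.
        int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            poolSize, poolSize,
            Long.MAX_VALUE, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(poolSize * 3),
            new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicLong total = new AtomicLong();
        try {
            for (int i = 1; i <= n; i++) {
                final int value = i;
                // The primary thread only hands the work over; if the queue is full,
                // CallerRunsPolicy makes it run this one task itself.
                pool.execute(() -> total.addAndGet(value));
            }
        } finally {
            pool.shutdown(); // already queued tasks still run after shutdown
        }

        boolean done;
        try {
            done = pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (!done) throw new IllegalStateException("pool did not finish in time");
        return total.get();
    }
}
```

Because every task either runs in the pool or on the submitting thread, no task is dropped, so the result equals n * (n + 1) / 2.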

Author:
Brent Boyer

Nested Class Summary
 
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
 
Field Summary
static int maxBackupPerPoolThread_default
          Default value for the maxBackupPerPoolThread param that can be passed to the fundamental constructor.
static int numberCpusReserved_default
          Default value for the numberCpusReserved param that can be passed to the fundamental constructor.
 
Constructor Summary
ThreadPoolExecutor2()
          Simply calls this(maxBackupPerPoolThread_default).
ThreadPoolExecutor2(int maxBackupPerPoolThread)
          Simply calls this(numberCpusReserved_default, maxBackupPerPoolThread).
ThreadPoolExecutor2(int numberCpusReserved, int maxBackupPerPoolThread)
          First calls super using these params:
            1. corePoolSize == maximumPoolSize == poolSize(numberCpusReserved) (so the pool always has a constant number of threads that equals the number of free CPUs, or 1 if none are free)
            2. keepAliveTime == Long.MAX_VALUE (so timeout is practically infinite)
            3. unit == TimeUnit.SECONDS
            4. workQueue == new ArrayBlockingQueue with a fixed capacity determined by a call to queueSize(numberCpusReserved, maxBackupPerPoolThread) (so tasks will only pile up to a specified limit before being rejected by the queue)
            5. handler == new ThreadPoolExecutor.CallerRunsPolicy (so tasks rejected by a full queue will be executed by the calling thread)
          Then, a call is made to the pool's prestartAllCoreThreads method.
 
Method Summary
private static int numberCpus()
          Returns the total number of CPUs that are available to this JVM.
private static int poolSize(int numberCpusReserved)
          Returns Math.max( numberCpus - numberCpusReserved, 1 ).
private static int queueSize(int numberCpusReserved, int maxBackupPerPoolThread)
          Returns Math.max( poolSize(numberCpusReserved) * maxBackupPerPoolThread, 1 ).
 
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, beforeExecute, execute, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, remove, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, shutdownNow, terminated
 
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

numberCpusReserved_default

public static final int numberCpusReserved_default
Default value for the numberCpusReserved param that can be passed to the fundamental constructor. This value is 0, which is appropriate for a single primary data-reading thread that does not fully use even one CPU.

See Also:
Constant Field Values

maxBackupPerPoolThread_default

public static final int maxBackupPerPoolThread_default
Default value for the maxBackupPerPoolThread param that can be passed to the fundamental constructor. This value is 3, which allows each thread in the pool to have, on average, at most 3 tasks piled up for it before the calling thread will start executing tasks.
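
The hand-off to the calling thread can be observed with a deliberately tiny pool. The sketch below is an illustration only: CallerRunsSketch and its methods are hypothetical names, and the pool of 1 thread with a queue capacity of 1 is chosen just to force an overflow on the third task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsSketch {

    /** Returns true if the task rejected by the full queue ran on the submitting thread. */
    static boolean overflowRunsOnCaller() {
        // 1 pool thread and a queue capacity of 1, so the third task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1,
            Long.MAX_VALUE, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> overflowThread = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only pool thread
            pool.execute(() -> { });                   // fills the queue
            // Queue full and pool at maximum size: the handler runs this on the caller.
            pool.execute(() -> overflowThread.set(Thread.currentThread()));
        } finally {
            release.countDown(); // let the blocked pool thread finish
            pool.shutdown();
        }
        return overflowThread.get() == Thread.currentThread();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

While the calling thread is busy with the overflow task it cannot submit more work, which is what throttles a producer that outruns the pool.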

See Also:
Constant Field Values

Constructor Detail

ThreadPoolExecutor2

public ThreadPoolExecutor2()
Simply calls this(maxBackupPerPoolThread_default).


ThreadPoolExecutor2

public ThreadPoolExecutor2(int maxBackupPerPoolThread)
                    throws IllegalArgumentException
Simply calls this(numberCpusReserved_default, maxBackupPerPoolThread).

Throws:
IllegalArgumentException - if maxBackupPerPoolThread < 0

ThreadPoolExecutor2

public ThreadPoolExecutor2(int numberCpusReserved,
                           int maxBackupPerPoolThread)
                    throws IllegalArgumentException
First calls super using these params:
  1. corePoolSize == maximumPoolSize == poolSize(numberCpusReserved) (so the pool always has a constant number of threads that equals the number of free CPUs, or 1 if none are free)
  2. keepAliveTime == Long.MAX_VALUE (so timeout is practically infinite)
  3. unit == TimeUnit.SECONDS
  4. workQueue == new ArrayBlockingQueue with a fixed capacity determined by a call to queueSize(numberCpusReserved, maxBackupPerPoolThread) (so tasks will only pile up to a specified limit before being rejected by the queue)
  5. handler == new ThreadPoolExecutor.CallerRunsPolicy (so tasks rejected by a full queue will be executed by the calling thread)
Then, a call is made to the pool's prestartAllCoreThreads method.
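
The steps above can be sketched with a plain ThreadPoolExecutor. This is a reconstruction from the description, not this class's source: EquivalentPoolSketch and newPool are hypothetical names, and reading the CPU count from Runtime.availableProcessors() is an assumption about how numberCpus is implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class EquivalentPoolSketch {

    /** Builds a plain ThreadPoolExecutor from the five super params listed above. */
    static ThreadPoolExecutor newPool(int numberCpusReserved, int maxBackupPerPoolThread) {
        if (numberCpusReserved < 0) {
            throw new IllegalArgumentException("numberCpusReserved = " + numberCpusReserved + " < 0");
        }
        if (maxBackupPerPoolThread < 0) {
            throw new IllegalArgumentException("maxBackupPerPoolThread = " + maxBackupPerPoolThread + " < 0");
        }

        // Assumption: the CPU count comes from Runtime.availableProcessors().
        int numberCpus = Runtime.getRuntime().availableProcessors();
        int poolSize = Math.max(numberCpus - numberCpusReserved, 1);
        int queueSize = Math.max(poolSize * maxBackupPerPoolThread, 1);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            poolSize,                                    // 1. corePoolSize
            poolSize,                                    //    == maximumPoolSize
            Long.MAX_VALUE,                              // 2. keepAliveTime
            TimeUnit.SECONDS,                            // 3. unit
            new ArrayBlockingQueue<Runnable>(queueSize), // 4. bounded workQueue
            new ThreadPoolExecutor.CallerRunsPolicy());  // 5. handler
        pool.prestartAllCoreThreads(); // start every pool thread before the first task arrives
        return pool;
    }
}
```

A caller of such a method still has to call shutdown when done, since the prestarted threads otherwise never exit.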

Throws:
IllegalArgumentException - if numberCpusReserved < 0 or maxBackupPerPoolThread < 0

Method Detail

poolSize

private static int poolSize(int numberCpusReserved)
                     throws IllegalArgumentException
Returns Math.max( numberCpus - numberCpusReserved, 1 ) (i.e. numberCpus - numberCpusReserved if numberCpus > numberCpusReserved, or 1 if numberCpus <= numberCpusReserved).

Reasoning: numberCpus - numberCpusReserved is the number of CPUs freely and fully available for the pool. Assuming that each pool thread should have its own free CPU, that difference is also the optimal pool size. The pool, of course, has to have at least 1 thread, which is why 1 is the lower bound of the result.

Note that numberCpusReserved can be used to reserve CPUs for arbitrary reasons, but typically it is done to reserve 1 or more CPUs for the primary data reading thread(s) (see class javadocs).
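
A few worked values may make the rule concrete. In this sketch the CPU count is passed in as a parameter (unlike the real method, which takes only numberCpusReserved) so that the results do not depend on the machine; PoolSizeSketch is a hypothetical name.

```java
final class PoolSizeSketch {

    /** Same rule as poolSize, but with the CPU count supplied explicitly. */
    static int poolSize(int numberCpus, int numberCpusReserved) {
        if (numberCpusReserved < 0) {
            throw new IllegalArgumentException("numberCpusReserved = " + numberCpusReserved + " < 0");
        }
        return Math.max(numberCpus - numberCpusReserved, 1);
    }

    public static void main(String[] args) {
        System.out.println(poolSize(8, 0)); // 8: nothing reserved, one thread per CPU
        System.out.println(poolSize(8, 2)); // 6: two CPUs left for the primary thread(s)
        System.out.println(poolSize(2, 4)); // 1: more reserved than present, lower bound applies
    }
}
```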

Throws:
IllegalArgumentException - if numberCpusReserved < 0

queueSize

private static int queueSize(int numberCpusReserved,
                             int maxBackupPerPoolThread)
                      throws IllegalArgumentException
Returns Math.max( poolSize(numberCpusReserved) * maxBackupPerPoolThread, 1 ).

Reasoning: each pool thread should have, on average, no more than maxBackupPerPoolThread tasks waiting in the queue, hence poolSize * maxBackupPerPoolThread is the normal queue size. That product is 0 when maxBackupPerPoolThread is 0, but ArrayBlockingQueue requires a capacity of at least 1, which is why 1 is the lower bound of the result.
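
The lower bound matters only in the zero-backup case, as the following sketch shows. It takes the pool size directly as a parameter (the real method derives it from numberCpusReserved) to keep the values machine independent; QueueSizeSketch is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;

final class QueueSizeSketch {

    /** Same rule as queueSize, but with the pool size supplied explicitly. */
    static int queueSize(int poolSize, int maxBackupPerPoolThread) {
        if (maxBackupPerPoolThread < 0) {
            throw new IllegalArgumentException("maxBackupPerPoolThread = " + maxBackupPerPoolThread + " < 0");
        }
        return Math.max(poolSize * maxBackupPerPoolThread, 1);
    }

    public static void main(String[] args) {
        System.out.println(queueSize(8, 3)); // 24: 8 threads with up to 3 waiting tasks each
        System.out.println(queueSize(4, 0)); // 1: the product is 0, so the lower bound applies

        // A capacity of 0 would make this constructor throw IllegalArgumentException.
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(queueSize(4, 0));
        System.out.println(queue.remainingCapacity()); // 1
    }
}
```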

Throws:
IllegalArgumentException - if numberCpusReserved < 0 or maxBackupPerPoolThread < 0

numberCpus

private static int numberCpus()
Returns the total number of CPUs that are available to this JVM.