On ThreadPoolExecutor
- NOT recommend to create via Executors static factory, instead, use
ThreadPoolExecutor
. Recommend to use newCacheThreadExecutor, defaults toThreadPoolExecutor
- Note that in general try NOT to create new threads explicitly. Let TP provide one.
- To handle exception in the worker
- try-catch inside the runnable
submit()
to execute,Future.get()
to handle exTPE.afterExecute()
- pass in
Thread.UncaughtExceptionHandler
- Constructor params
- corePoolSize
- long keepAliveTime
- When # of threads > cores, idle threads will wait this long before got terminated
- Defaults to 60s
- maximumPoolSzie, i.e., we can use it to set constant size TP.
- workQueue
- Capacity defaults to Integer.MAX_VALUE!
- Insertion is done by BlockingQueue.offer(), which will not throw exception or block.
- threadFactory
- RejectedExecutionHandler handler - when out of thread range or queue capacity
- The state of TPE,i.e., highest 3 bits of the atomic
ctl
value- RUNNING
- SHUTDOWN: shutdown(), no new task will be accepted, but will finish executing the jobs in the task queue
- STOP: shutdownNow(), no new task, not executing tasks in the q
- TIDYING: all tasks completed, will execute terminated()
- TERMINIATED: after termiated() is run
- submit() returns Future vs execute() returns nothing
Building pieces of the TPE
class Worker extends AbstractQueuedSynchronizer implements Runnable
{
Runnable firstTask;
final Thread thread;
volatile long completedTasks;
}
thread
is created in the worker’s ctor, by the thread factory passed in- Worker is a nested class of the TPE. In fact, TPE has a reference to
Worker
too
class FutureTask{
Callable callable;
private volatile int state;
private Object outcome; // non-volatile, protected by state reads/writes
private volatile Thread runner;
}
Future<?> submit(Runnable task) {
RunnableFuture<Void> tftask = new Futuretask<T>(runnable, null); //uses Executors.callable(runanble, null)
execute(tfask);
return ftask;
}
runner
is CASed as theThread.currentThread()
from null. This also acts as the mutex checkcallable
is turned fromrunnable
by the Executor. Callable returns result and may throw exceptions
TPE.execute()
void execute(Runnable command) {
int c = ctl.get();
if(wokerCountOf(c) < corePoolsize) {
if(addWorker(command, true))
return;
c = ctil.get();
}
if(isRunning (c) && workQueue.offer(command)){ //added to the blocking queue successfully
int recheck = ctl.get();
if(!isRunning (recheck) && remove(command))
reject(command);
else if (wokerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)){
reject(command);
}
}
TPE.execute()
is the interface to outside, i.e., directly called bysubmit()
- If TPE is running, and we can add the command to the task q. Note that we have to double check the state again because the offer operation is not an atomic step of
isRunning
and try removing it it. Similarly, the twoelse if
logics are quite similar because we essentialy performed the check twice. - If
isRunning
try adding the worker. Note thataddWorker
side checks the TPE state, so when the TPE is not running, the new task will not be run
TPE.runWorker(Worker w):
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
- This is called from worker and, by extension, worker’s thread since worker is an inner class
- Standard
lock()
and thenunlock()
insidefinally
getTask()
checks if it can poll a task withkeepAliveTime
, if it couldn’t work, it will return a null. This is how the non-core threads are terminated
Shutting down TPE
void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
checkShutdownAccess();
advanceRunState(SHUTDOWN);//CAS in a loop to set state. No actual interruption
interruptIdleWorkers();
onShutdown();
}finally{
mainLock.unlock();
}
tryTerminate();
}
List<Runnable> shutdownNow() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try{
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); //uses Thread.interrupt(), which just sets the mark, needs Thread.interrupted() inside that thread
tasks = drainQueue();
}finally{
mainLock.unlock();
}
tryTerminate();
return tasks;
}
threadPool.shutdown(); //have to wait for the termination explicitly
while(!threadPool.awaitTermination(60, TimeUnit.SECONDS)){
//busy waiting here, most likely you need to add the max retry count
}
FutureTask.run()
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
- CAS at the start uses current thread id as the optimistic lock. Hence, runner is set to null in the
finally