10.4 Queues and Sizes
The two fundamental
things that affect a thread pool are its size and the queue used for
the tasks. These are set in the constructor of the thread pool; the
size can change dynamically while the queue must remain fixed. In
addition to the constructor, these methods interact with the
pool's size and queue:
package java.util.concurrent;
public class ThreadPoolExecutor implements ExecutorService {
public boolean prestartCoreThread( );
public int prestartAllCoreThreads( );
public void setMaximumPoolSize(int maximumPoolSize);
public int getMaximumPoolSize( );
public void setCorePoolSize(int corePoolSize);
public int getCorePoolSize( );
public int getPoolSize( );
public int getLargestPoolSize( );
public int getActiveCount( );
public BlockingQueue<Runnable> getQueue( );
public long getTaskCount( );
public long getCompletedTaskCount( );
}
The first set of methods deals with the thread pool's
size, and the remaining methods deal with its queue.
- Size
The size of the thread pool varies between a given minimum (or core)
and maximum number of threads. In our example, we use the same
parameter for both values, making the thread pool a constant size.
If you specify different numbers for the minimum and maximum number
of threads, the thread pool dynamically alters the number of threads
it uses to run its tasks. The current size (returned from the
getPoolSize() method) falls between the core size
and the maximum size.
- Queue
The queue is the data structure used to hold tasks that are awaiting
execution. The choice of queue affects how certain tasks are
scheduled. In this case, we've used a linked
blocking queue, which places the fewest constraints on how tasks are
added to the queue. Once you've passed this queue to
the thread pool, you should not call any methods on it directly. In
particular, do not add items directly to the queue; add them through
the execute() method of the thread pool. The
getQueue() method returns the queue, but you
should use that for debugging purposes only; don't
execute methods directly on the queue or the internal workings of the
thread pool become confused.
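As a rough illustration of the size methods listed above, the following sketch builds a pool with different core and maximum sizes and reads the sizes back before and after prestarting the core threads. The class name PoolSizeDemo and the sizes 2 and 4 are assumptions chosen for the example; the 50000L timeout mirrors the value used in this chapter's example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the chapter's example code.
class PoolSizeDemo {
    // Returns {size before prestart, threads started, size after, core, max}.
    static int[] inspect() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                            // core (minimum) and maximum sizes
                50000L, TimeUnit.MILLISECONDS,   // idle timeout for threads above the core size
                new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();              // no threads exist yet
            int started = pool.prestartAllCoreThreads();  // create the core threads now
            int after = pool.getPoolSize();
            return new int[] { before, started, after,
                               pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = inspect();
        System.out.println("before=" + r[0] + " started=" + r[1]
                + " after=" + r[2] + " core=" + r[3] + " max=" + r[4]);
    }
}
```

The pool reports a size of 0 until either a task arrives or the core threads are prestarted, which is the lazy creation behavior described in the steps that follow.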
These parameters allow considerable flexibility in the way the thread
pool operates. The basic principle is that the thread pool tries to
keep its minimum number of threads active. If it gets too busy (where
busy is a property of the particular queue that the thread pool
uses), it adds threads until the maximum number of threads is
reached, at which point it does not allow any more tasks to be
queued.
There are some nuances in this, particularly in how the queue
interacts with the number of threads. Let's take it
step by step:
1. The thread pool is constructed with M core threads and N maximum
   threads. At this point, no threads are actually created (though you
   can have the pool create the M core threads by calling its
   prestartAllCoreThreads() method, or preallocate one core thread by
   calling its prestartCoreThread() method).
2. A task enters the pool (via the thread pool's execute() method).
   Now one of five things happens:
   - If the pool has created fewer than M threads, it starts a new
     thread and runs the new task immediately. Even if some of the
     existing threads are idle, a new thread is created in the pool's
     attempt to reach M threads.
   - If the pool has between M and N threads and one of those threads
     is idle, the task is run by an idle thread.
   - If the pool has between M and N threads and all the threads are
     busy, the thread pool examines the existing work queue. If the
     task can be placed on the work queue without blocking, it's put
     on the queue and no new thread is started.
   - If the pool has between M and N threads, all threads are busy,
     and the task cannot be added to the queue without blocking, the
     pool starts a new thread and runs the task on that thread.
   - If the pool has N threads and all threads are busy, the pool
     attempts to place the new task on the queue. If the queue has
     reached its maximum size, this attempt fails and the task is
     rejected. Otherwise, the task is accepted and run when a thread
     becomes idle (and all previously queued tasks have run).
3. A task completes execution. The thread running the task then runs
   the next task on the queue. If no tasks are on the queue, one of
   two things happens:
   - If the pool has more than M threads, the thread waits for a new
     task to be queued. If a new task is queued within the timeout
     period, the thread runs it. If not, the thread exits, reducing
     the total number of threads in the pool. The timeout period is a
     parameter used to construct the thread pool; in our example, we
     specified 50 seconds (50000L time units of
     TimeUnit.MILLISECONDS). Note that if the specified timeout is 0,
     a thread above the core size exits as soon as it finds the queue
     empty.
   - If the pool has M or fewer threads, the thread blocks
     indefinitely waiting for a new task to be queued, and runs the
     new task when it becomes available. Core threads are subject to
     the timeout only if allowCoreThreadTimeOut(true) has been called
     on the pool, which requires a nonzero timeout.
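The steps above can be traced with a small experiment. This is a sketch under assumed values (M = 1, N = 2, and a queue bounded at 1); the class name PoolGrowthDemo and the blocking task are invented for illustration. Every task blocks on a latch so that no thread ever becomes idle while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes are assumptions for the example.
class PoolGrowthDemo {
    // Submits four tasks that all block, recording the pool state after each.
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // M = 1 core thread, N = 2 maximum threads, queue bound of 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    // getQueue() is used here only to observe the pool.
                    out.append("task ").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    out.append("task ").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
    }
}
```

With these values, the first task starts the core thread, the second is queued, the third forces the pool to start a second thread because the queue is full, and the fourth is rejected.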
What are the implications of all this? It means that the choices of
pool size and queue are important to getting the behavior you want.
For a queue, you have three choices:
- A SynchronousQueue, which effectively has a size of 0. In this
  case, a task can never wait in the queue: it is either handed
  directly to an idle thread or the attempt to queue it fails. The
  implication is that tasks are either run immediately (because the
  pool has an idle thread or is below its maximum size and, therefore,
  creates a new thread) or are rejected immediately. Note that you can
  prevent rejection of a task if you specify an unlimited maximum
  number of threads, but this defeats the throttling benefit of using
  a thread pool in the first place.
- An unbounded queue, such as a LinkedBlockingQueue with an unlimited
  capacity. In this case, adding a task to the queue always succeeds,
  which means that the thread pool never creates more than M threads
  and never rejects a task.
- A bounded queue, such as a LinkedBlockingQueue with a fixed capacity
  or an ArrayBlockingQueue. Let's suppose that the queue has a bound
  of P. As tasks are added to the pool, it creates threads until it
  reaches M threads. At that point, it starts queueing tasks until the
  number of waiting tasks reaches P. As more tasks are added, the pool
  starts adding threads until it reaches N threads. If we reach a
  state where N threads are active and P tasks are queued, additional
  tasks are rejected.
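To compare the first two queue choices, the sketch below submits the same four blocking tasks to a pool with M = 1 and N = 2, once with a SynchronousQueue and once with an unbounded LinkedBlockingQueue. The class QueueChoiceDemo and its submit() helper are hypothetical names introduced for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class comparing queue types.
class QueueChoiceDemo {
    // Submits 'tasks' blocking tasks; returns {threads, queued, rejected}.
    static int[] submit(BlockingQueue<Runnable> queue, int core, int max, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 50000L, TimeUnit.MILLISECONDS, queue);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();     // keep the thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = submit(new SynchronousQueue<Runnable>(), 1, 2, 4);
        int[] u = submit(new LinkedBlockingQueue<Runnable>(), 1, 2, 4);
        System.out.println("synchronous: threads=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
        System.out.println("unbounded:   threads=" + u[0] + " queued=" + u[1] + " rejected=" + u[2]);
    }
}
```

With the SynchronousQueue the pool grows to its maximum of two threads and rejects the remaining two tasks; with the unbounded queue it stays at one thread and queues the other three.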
In our example, we used a LinkedBlockingQueue with
an unbounded capacity and a fixed pool size. This is perhaps the most
common configuration of thread pools: it allows tasks to wait for an
available thread, and a fixed number of threads is easier to monitor
than a variable number of threads. A good alternative to this is to
use a bounded queue with a fixed number of threads. In this model, if
tasks start to arrive faster than they can be processed, they queue.
Unlike the unbounded case, however, at some point the queue threshold
is reached and your program must take appropriate action: if
it's a server, it can reject future requests from
clients, telling them that it's too busy right now
and they should try again later.
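One way to sketch the bounded-queue alternative is a small wrapper that turns a rejection into a "too busy" answer for the client. The class BusyServerDemo and its tryHandle() method are hypothetical; a real server would decide for itself how to report the busy condition.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: fixed number of threads plus a bounded queue.
class BusyServerDemo {
    private final ThreadPoolExecutor pool;

    BusyServerDemo(int threads, int queueCapacity) {
        pool = new ThreadPoolExecutor(
                threads, threads,                // fixed size: core == maximum
                50000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
    }

    // Returns true if the request was accepted, false if the server is too busy.
    boolean tryHandle(Runnable request) {
        try {
            pool.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            return false;                        // caller should tell the client to retry later
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    // Submits three slow requests to a server with one thread and one queue slot.
    static boolean[] demo() {
        CountDownLatch gate = new CountDownLatch(1);
        Runnable slow = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        BusyServerDemo server = new BusyServerDemo(1, 1);
        try {
            return new boolean[] {
                server.tryHandle(slow), server.tryHandle(slow), server.tryHandle(slow)
            };
        } finally {
            gate.countDown();
            server.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

In this sketch the first request runs, the second waits in the queue, and the third is turned away because both the thread and the queue slot are occupied.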
If you use a thread pool, there is no magic formula that you can use
to determine its optimal size and queuing strategy. When the
operations are strictly CPU-bound, use only as many threads as there
are CPUs. For more complex operations, choosing a thread pool size is
a matter of testing different values to see which gives you the best
program performance.
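For the strictly CPU-bound case, a pool sized to the number of processors might be built as follows. This is only a starting point under the rule of thumb above; the class name CpuBoundPool is an assumption, and Runtime.availableProcessors() reports the processors visible to the virtual machine, which may differ from the physical CPU count.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory for a pool sized to the available processors.
class CpuBoundPool {
    static ThreadPoolExecutor create() {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Fixed-size pool: one thread per processor, unbounded queue.
        return new ThreadPoolExecutor(
                cpus, cpus, 50000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        System.out.println("pool size: " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```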
10.4.1 Rejected Tasks
Depending on the type of queue you use in the thread pool, a task may be
rejected by the execute() method. Tasks are rejected if the queue is
full and the pool has already reached its maximum number of threads,
or if the shutdown() method has been called on the thread pool.
When a task is rejected, the thread pool calls the rejected execution
handler associated with the thread pool. These APIs deal with the
rejected execution handler:
package java.util.concurrent;
public interface RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
package java.util.concurrent;
public class ThreadPoolExecutor implements ExecutorService {
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
public RejectedExecutionHandler getRejectedExecutionHandler( );
public static class AbortPolicy implements RejectedExecutionHandler;
public static class CallerRunsPolicy implements RejectedExecutionHandler;
public static class DiscardPolicy implements RejectedExecutionHandler;
public static class DiscardOldestPolicy implements RejectedExecutionHandler;
}
There is one rejected execution handler for the entire pool; it
applies to all potential tasks. You can write your own rejected
execution handler, or you can use one of four predefined handlers. By
choosing a predefined rejected execution handler, or by creating
your own handler, your program can take appropriate action when
a task is rejected.
Here are the predefined handlers:
- AbortPolicy
This handler does not allow the new task to be scheduled when the
queue is full (or the pool has been shut down); in that case, the
execute() method throws a
RejectedExecutionException.
That exception is a runtime exception, so when using this policy,
it's up to the program to catch the exception.
Otherwise, the exception is propagated up the stack.
This is the default policy for rejected tasks.
- CallerRunsPolicy
This handler executes the new task independently of the thread pool
if the queue is full. That is, rather than queuing the task and
executing it in another thread, the task is immediately executed by
calling its run() method, and the
execute() method does not return until the task
has completed. If the task is rejected because the pool has been shut
down, the task is silently discarded.
- DiscardPolicy
This handler silently discards the task. No exception is thrown.
- DiscardOldestPolicy
This handler silently discards the oldest task in the queue and then
queues the new task. When used with a
LinkedBlockingQueue or
ArrayBlockingQueue, the oldest task is the one
that is first in line to execute when a thread becomes idle. When
used with a SynchronousQueue, there are never
waiting tasks and so the execute() method
silently discards the submitted task.
If the pool has been shut down, the task is silently discarded.
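As an illustration of one of the predefined handlers, the sketch below installs a CallerRunsPolicy on a pool with one thread and a queue bounded at 1, fills both, and then checks which thread ran the overflow task. The class name CallerRunsDemo and the pool sizes are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of the predefined CallerRunsPolicy handler.
class CallerRunsDemo {
    // Returns true if the overflow task ran on the submitting thread.
    static boolean overflowRanInCaller() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<Thread> ranOn = new AtomicReference<Thread>();
        try {
            pool.execute(blocker);               // occupies the only thread
            pool.execute(blocker);               // fills the queue
            // Rejected, so the handler calls run() before execute() returns.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller: " + overflowRanInCaller());
    }
}
```

Because the submitting thread is busy running the task itself, it cannot submit more work in the meantime, which gives this policy a simple throttling effect.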
To create your own rejected task handler, create a class that
implements the RejectedExecutionHandler interface. Your handler (just
like a predefined handler) can then be set using the
setRejectedExecutionHandler() method of the thread pool executor.
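A minimal custom handler might look like the following sketch, which simply counts rejected tasks instead of throwing an exception. The class CountingRejectionHandler, its getRejectedCount() method, and the demo() helper are hypothetical names; a real handler would more likely log the rejection or hand the task to some fallback.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler that counts rejections rather than throwing.
class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();              // the task itself is dropped
    }

    int getRejectedCount() {
        return rejected.get();
    }

    // Submits four blocking tasks to a pool with one thread and one queue slot.
    static int demo() {
        CountDownLatch release = new CountDownLatch(1);
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        pool.setRejectedExecutionHandler(handler);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return handler.getRejectedCount();
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + demo());
    }
}
```

In this setup one task runs, one waits in the queue, and the remaining two reach the handler.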