站内搜索: 请输入搜索关键词
当前页面: 图书首页 > Java Threads, Third Edition

10.4 Queues and Sizes - Java Threads, Third Edition

Previous Section  < Day Day Up >  Next Section

10.4 Queues and Sizes

The two fundamental things that affect a thread pool are its size and the queue used for the tasks. These are set in the constructor of the thread pool; the size can change dynamically while the queue must remain fixed. In addition to the constructor, these methods interact with the pool's size and queue:

package java.util.concurrent;

public class ThreadPoolExecutor implements ExecutionService {

    public boolean prestartCoreThread( );

    public int prestartAllCoreThreads( );

    public void setMaximumPoolSize(int maximumPoolSize);

    public int getMaximumPoolSize( );

    public void setCorePoolSize(int corePoolSize);

    public int getCorePoolSize( );

    public int getPoolSize( );

    public int getLargestPoolSize( );



    public int getActiveCount( );

    public BlockingQueue<Runnable> getQueue( );



    public long getTaskCount( );

    public long getCompletedTaskCount( );

}

The first set of methods deal with the thread pool's size, and the remaining methods deal with its queue.


Size

The size of the thread pool varies between a given minimum (or core) and maximum number of threads. In our example, we use the same parameter for both values, making the thread pool a constant size.

If you specify different numbers for the minimum and maximum number of threads, the thread pool dynamically alters the number of threads it uses to run its tasks. The current size (returned from the getPoolSize() method) falls between the core size and the maximum size.


Queue

The queue is the data structure used to hold tasks that are awaiting execution. The choice of queue affects how certain tasks are scheduled. In this case, we've used a linked blocking queue, which places the least constraints on how tasks are added to the queue. Once you've passed this queue to the thread pool, you should not call any methods on it directly. In particular, do not add items directly to the queue; add them through the execute() method of the thread pool. The getQueue() method returns the queue, but you should use that for debugging purposes only; don't execute methods directly on the queue or the internal workings of the thread pool become confused.

These parameters allow considerable flexibility in the way the thread pool operates. The basic principle is that the thread pool tries to keep its minimum number of threads active. If it gets too busy (where busy is a property of the particular queue that the thread pool uses), it adds threads until the maximum number of threads is reached, at which point it does not allow any more tasks to be queued.

There are some nuances in this, particularly in how the queue interacts with the number of threads. Let's take it step by step:

  1. The thread pool is constructed with M core threads and N maximum threads. At this point, no threads are actually created (though you can specify that the pool create the M core threads by calling the thread pool's prestartAllCoreThreads() method or that it preallocate one core thread by calling the prestartCoreThread() method).

  2. A task enters the pool (via the thread pool's execute() method). Now one of five things happens:

    • If the pool has created fewer than M threads, it starts a new thread and runs the new task immediately. Even if some of the existing threads are idle, a new thread is created in the pool's attempt to reach M threads.

    • If the pool has between M and N threads and one of those threads is idle, the task is run by an idle thread.

    • If the pool has between M and N threads and all the threads are busy, the thread pool examines the existing work queue. If the task can be placed on the work queue without blocking, it's put on the queue and no new thread is started.

    • If the pool has between M and N threads, all threads are busy, and the task cannot be added to the queue without blocking, the pool starts a new thread and runs the task on that thread.

    • If the pool has N threads and all threads are busy, the pool attempts to place the new task on the queue. If the queue has reached its maximum size, this attempt fails and the task is rejected. Otherwise, the task is accepted and run when a thread becomes idle (and all previously queued tasks have run).

  3. A task completes execution. The thread running the task then runs the next task on the queue. If no tasks are on the queue, one of two things happens:

    • If the pool has more than M threads, the thread waits for a new task to be queued. If a new task is queued within the timeout period, the thread runs it. If not, the thread exits, reducing the total number of threads in the pool. The timeout period is a parameter used to construct the thread pool; in our example, we specified 50 seconds (50000L time units of TimeUnit.MILLISECONDS). Note that if the specified timeout is 0, the thread always exits, regardless of the requested minimum thread pool size.

    • If the pool has M or fewer threads, the thread blocks indefinitely waiting for a new task to be queued (unless the timeout was 0, in which case it exits). It runs the new task when available.

    What are the implications of all this? It means that the choice of pool size and queue are important to getting the behavior you want. For a queue, you have three choices:

    • A SynchronousQueue, which effectively has a size of 0. In this case, whenever the pool tries to queue a task, it fails. The implication of this is tasks are either run immediately (because the pool has an idle thread or is below its threshold and, therefore, creates a new thread) or are rejected immediately. Note that you can prevent rejection of a task if you specify an unlimited maximum number of threads, but this prevents the throttling benefit of using a thread pool in the first place.

    • An unbounded queue, such as a LinkedBlockingQueue with an unlimited capacity. In this case, adding a task to the queue always succeeds, which means that the thread pool never creates more than M threads and never rejects a task.

    • A bounded queue, such as a LinkedBlockingQueue with a fixed capacity or an ArrayBlockingQueue. Let's suppose that the queue has a bounds of P. As tasks are added to the pool, it creates threads until it reaches M threads. At that point, it starts queueing tasks until the number of waiting tasks reaches P. As more tasks are added, the pool starts adding threads until it reaches N threads. If we reach a state where N threads are active and P tasks are queued, additional tasks are rejected.

In our example, we used a LinkedBlockingQueue with an unbounded capacity and a fixed pool size. This is perhaps the most common configuration of thread pools: it allows tasks to wait for an available thread, and a fixed number of threads is easier to monitor than a variable number of threads. A good alternative to this is to use a bounded queue with a fixed number of threads. In this model, if tasks start to arrive faster than they can be processed, they queue. Unlike the unbounded case, however, at some point the queue threshold is reached and your program must take appropriate action: if it's a server, it can reject future requests from clients, telling them that it's too busy right now and they should try again later.

If you use a thread pool, there is no magic formula that you can use to determine its optimal size and queuing strategy. When the operations are strictly CPU-bound, use only as many threads as there are CPUs. For more complex operations, choosing a thread pool size is a matter of testing different values to see which gives you the best program performance.

10.4.1 Rejected Tasks

Depending on the type of queue you use in the thread pool, a task may be rejected by the execute() method. Tasks are rejected if the queue is full or if the shutdown( ) method has been called on the thread pool.

When a task is rejected, the thread pool calls the rejected execution handler associated with the thread pool. These APIs deal with the rejected execution handler:

package java.util.concurrent;

public interface RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

}



package java.util.concurrent;

public class ThreadPoolExecutor implements ExecutorService {

    public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

    public RejectedExecutionHandler getRejectedExecutionHandler( );

    public static class AbortPolicy implements RejectedExecutionHandler;

    public static class CallerRunsPolicy implements RejectedExecutionHandler;

    public static class DiscardPolicy implements RejectedExecutionHandler;

    public static class DiscardOldestPolicy implements RejectedExecutionHandler;

}

There is one rejected execution handler for the entire pool; it applies to all potential tasks. You can write your own rejected execution handler, or you can use one of four predefined handlers. By choosing a predefined rejected execution handler梠r by creating your own handler梱our program can take appropriate action when a task is rejected.

Here are the predefined handlers:


AbortPolicy

This handler does not allow the new task to be scheduled when the queue is full (or the pool has been shut down); in that case, the execute() method throws a RejectedExecutionException. That exception is a runtime exception, so when using this policy, it's up to the program to catch the exception. Otherwise, the exception is propagated up the stack.

This is the default policy for rejected tasks.


CallerRunsPolicy

This handler executes the new task independently of the thread pool if the queue is full. That is, rather than queuing the task and executing it in another thread, the task is immediately executed by calling its run() method, and the execute() method does not return until the task has completed. If the task is rejected because the pool has been shut down, the task is silently discarded.


DiscardPolicy

This handler silently discards the task. No exception is thrown.


DiscardOldestPolicy

This handler silently discards the oldest task in the queue and then queues the new task. When used with a LinkedBlockingQueue or ArrayBlockingQueue, the oldest task is the one that is first in line to execute when a thread becomes idle. When used with a SynchronousQueue, there are never waiting tasks and so the execute() method silently discards the submitted task.

If the pool has been shut down, the task is silently discarded.

To create your own rejected task handler, create a class that implements the RejectedExecutionHandler interface. Your handler (just like a predefined handler) can then be set using the setRejectedExecutionHandler() method of the thread pool executor.

    Previous Section  < Day Day Up >  Next Section