8.3 The Producer/Consumer Pattern - Java Threads, Third Edition

8.3 The Producer/Consumer Pattern

One of the more common patterns in threaded programming is the producer/consumer pattern. The idea is to process data asynchronously by partitioning requests among different groups of threads. The producer is a thread (or group of threads) that generates requests (or data) to be processed. The consumer is a thread (or group of threads) that takes those requests (or data) and acts upon them. This pattern provides a clean separation that allows for better thread design and makes development and debugging easier. This pattern is shown in Figure 8-1.

Figure 8-1. The producer/consumer pattern

The producer/consumer pattern is common for threaded programs because it is easy to make threadsafe. We just need to provide a safe way to pass data from the producer to the consumer. Data needs to be synchronized only during the small period of time when it is being passed between producer and consumer. We can use simple synchronization since the acts of inserting and removing from the collection are single operations. Therefore, any threadsafe vector, list, or queue can be used.

The queue-based collection classes added to J2SE 5.0 were specifically designed for this model. The queue data type is perfect to use for this pattern since it has the simple semantics of adding and removing a single element (with an optional ordering of the requests). Furthermore, blocking queues provide thread-control functionality: this allows you to focus on the functionality of your program while the queue takes care of thread and space management issues. Of course, if you need control over such issues, you can use a nonblocking queue and use your own explicit synchronization and notification.
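
If you do take the do-it-yourself route, the general shape might look like the following. This is a minimal sketch rather than code from the book's examples: the HandRolledQueue class and its methods are hypothetical names, and it assumes an unbounded queue in which a plain ArrayDeque is guarded by synchronized methods with wait() and notifyAll().

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration: a hand-off queue built from a collection that
// is not threadsafe, plus explicit synchronization and notification.
class HandRolledQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    // Called by producers: add an element and wake any waiting consumers.
    public synchronized void put(T item) {
        items.add(item);
        notifyAll();
    }

    // Called by consumers: wait until an element is available, then remove it.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {   // re-check after every wakeup
            wait();
        }
        return items.remove();
    }

    public synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        HandRolledQueue<Integer> queue = new HandRolledQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                queue.put(i);
            }
        });
        producer.start();
        for (int i = 0; i < 3; i++) {
            System.out.println("Took " + queue.take());
        }
        producer.join();
    }
}
```

Because this sketch never limits the size of the queue, only the consumer ever waits; a producer that runs ahead simply keeps adding elements.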

Here's a simple producer that uses a blocking queue:

package javathreads.examples.ch08.example6;

import java.util.concurrent.*;

public class FibonacciProducer implements Runnable {
    private Thread thr;
    private BlockingQueue<Integer> queue;

    public FibonacciProducer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }

    public void run() {
        try {
            for (int x = 0; ; x++) {
                Thread.sleep(1000);
                // put() blocks while the queue is full
                queue.put(x);
                System.out.println("Produced request " + x);
            }
        } catch (InterruptedException ex) {
            // Interrupted: restore the interrupt flag and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}

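The producer is implemented to run in a separate thread; it uses the queue to store requests to be processed. We're using a blocking queue because we want the queue to handle the case where the producer gets too far ahead of the consumer. When that happens and the queue fills up, the put() method blocks the producer (so that it does not produce any more requests until the consumer catches up). Note that this works only if the queue is bounded: an ArrayBlockingQueue always has a fixed capacity, while a LinkedBlockingQueue created without a capacity argument is effectively unbounded and never makes the producer wait.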
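
To see how a full queue holds a producer back, here is a small sketch. It is not part of the book's examples: the BoundedQueueDemo class and its fillUntilFull() method are hypothetical names, and the sketch assumes a capacity of three. It uses the timed offer() method instead of put() so that a single thread can observe the full state without blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a bounded queue refusing further elements.
class BoundedQueueDemo {
    // Returns how many elements the queue accepted before it was full.
    static int fillUntilFull(BlockingQueue<Integer> queue) {
        int accepted = 0;
        try {
            // The timed offer() waits up to 50 ms for space and then gives
            // up, returning false; put() would wait indefinitely instead.
            while (queue.offer(accepted, 50, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        System.out.println("Accepted before full: " + fillUntilFull(queue));
        System.out.println("Remaining capacity: " + queue.remainingCapacity());
    }
}
```

With nothing consuming from the queue, the fourth offer times out, so the program reports three accepted elements and a remaining capacity of zero. A producer calling put() at that point would wait until a consumer removed an element.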

Here's the consumer:

package javathreads.examples.ch08.example6;

import java.util.concurrent.*;

public class FibonacciConsumer implements Runnable {
    private Fibonacci fib = new Fibonacci();
    private Thread thr;
    private BlockingQueue<Integer> queue;

    public FibonacciConsumer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }

    public void run() {
        int request, result;
        try {
            while (true) {
                // take() blocks while the queue is empty
                request = queue.take();
                result = fib.calculateWithCache(request);
                System.out.println(
                        "Calculated result of " + result + " from " + request);
            }
        } catch (InterruptedException ex) {
            // Interrupted: restore the interrupt flag and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}

The consumer also runs in its own thread. It blocks until a request is in the queue, at which point it calculates a Fibonacci number based on the request. The actual calculation is performed by the Fibonacci class available in the online examples (along with a testing program).
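
The Fibonacci class itself is not reproduced in this section. If you want to compile the consumer without downloading the examples, a stand-in along the following lines should do. This is only a guess at a workable shape, not the class from the online examples: the calculateWithCache() name comes from the consumer code, but the calculate() method, the choice to cache only the results of whole requests, and the use of a ConcurrentHashMap are all assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Fibonacci class used by the consumer.
class Fibonacci {
    // Results of earlier requests, keyed by the requested index.
    private final Map<Integer, Integer> cache = new ConcurrentHashMap<>();

    // Naive recursion; the running time grows exponentially with n.
    // Assumes n >= 0; the int result overflows for n greater than 46.
    public int calculate(int n) {
        if (n <= 1) {
            return n;
        }
        return calculate(n - 1) + calculate(n - 2);
    }

    // Returns a cached answer if this exact request was seen before;
    // otherwise computes it recursively and remembers the result.
    public int calculateWithCache(int n) {
        Integer cached = cache.get(n);
        if (cached != null) {
            return cached;
        }
        int result = calculate(n);
        cache.put(n, result);
        return result;
    }

    public static void main(String[] args) {
        Fibonacci fib = new Fibonacci();
        System.out.println("fib(10) = " + fib.calculateWithCache(10));
    }
}
```

Because the producer never repeats a request, the cache in this stand-in does not save the consumer any work; each new request still pays for the full recursion.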

Notice that the producer and consumer threads are decoupled: the producer never directly calls the consumer (and vice versa). This allows us to interchange different producers without affecting the consumer. It also allows us to have multiple producers serviced by a single consumer, or multiple consumers servicing a single producer. More generally, we can vary the number of either based on performance needs or user requirements.
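
As an illustration of that flexibility, the following sketch runs several producers against a single consumer that shares one queue. It is a simplified example under stated assumptions, not code from the book: the FanInDemo class and its run() method are hypothetical, each producer puts a fixed number of requests on the queue (each request is just the value 1), and the consumer stops once it has taken the expected total.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: many producers, one consumer, one shared queue.
class FanInDemo {
    // Starts the given number of producer threads, consumes every request
    // on the calling thread, and returns the sum of the consumed values.
    static int run(int producers, int perProducer) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < perProducer; i++) {
                        queue.put(1);   // blocks while the queue is full
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        int total = 0;
        try {
            for (int i = 0; i < producers * perProducer; i++) {
                total += queue.take();  // blocks while the queue is empty
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + run(3, 100) + " requests");
    }
}
```

The consumer loop is the same whether one producer or three are running; only the argument to run() changes. The small queue capacity means the producers spend part of their time blocked, which is the queue doing the coordination on their behalf.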

The queue has also hidden all of the interesting thread code. When the queue is full, the producer blocks: it waits on a condition variable. Later, when the consumer takes an element from the queue, it notifies the waiting producer. A similar situation arises when the consumer calls the take() method on an empty queue. You could write all the condition variable code to handle this, but it's far easier to allow the queue to do it for you.
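
For a sense of what the queue is hiding, here is a rough sketch of a bounded buffer written with explicit condition variables. It is not the implementation used by the J2SE 5.0 queue classes: the BoundedBuffer class is a hypothetical name, and the sketch assumes one ReentrantLock with two Condition objects, one for "not full" and one for "not empty".

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the condition variable code a blocking
// queue saves you from writing.
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: wait while the buffer is full, then add the element.
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.add(item);
            notEmpty.signal();      // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: wait while the buffer is empty, then remove an element.
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal();       // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("Took " + buffer.take());
        }
        producer.join();
    }
}
```

Both wait loops re-check their condition after waking, and the lock is always released in a finally block. Getting details like these right in every program is the work that a blocking queue takes off your hands.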

We chose to calculate a Fibonacci number in our test program because we used a recursive algorithm that takes an increasingly long time to compute. It's interesting to watch how the producer and consumer interact in this case. In the beginning, the consumer is blocked a lot of the time because it can calculate the Fibonacci number in less than one second (the time period between requests from the producer). Later, the producer spends most of its time blocked because it has overwhelmed the consumer and filled the queue.

If you have a multiprocessor machine, you can run the example with multiple consumer threads, but eventually the result is the same: the calculations take too long for the consumers to keep up.
