Section 8.5.  Parallelizing Recursive Algorithms - Java Concurrency in Practice


8.5. Parallelizing Recursive Algorithms

The page rendering examples in Section 6.3 went through a series of refinements in search of exploitable parallelism. The first attempt was entirely sequential; the second used two threads but still performed all the image downloads sequentially; the final version treated each image download as a separate task to achieve greater parallelism. Loops whose bodies contain nontrivial computation or perform potentially blocking I/O are frequently good candidates for parallelization, as long as the iterations are independent.

If we have a loop whose iterations are independent and we don't need to wait for all of them to complete before proceeding, we can use an Executor to transform a sequential loop into a parallel one, as shown in processSequentially and processInParallel in Listing 8.10.

Listing 8.10. Transforming Sequential Execution into Parallel Execution.

void processSequentially(List<Element> elements) {
    for (Element e : elements)
        process(e);
}

void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
        exec.execute(new Runnable() {
            public void run() { process(e); }
        });
}

A call to processInParallel returns more quickly than a call to processSequentially because it returns as soon as all the tasks are queued to the Executor, rather than waiting for them all to complete. If you want to submit a set of tasks and wait for them all to complete, you can use ExecutorService.invokeAll; to retrieve the results as they become available, you can use a CompletionService, as in the Renderer example from Section 6.3.
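As a rough sketch of the invokeAll variant: the class name, the process stand-in (which simply squares its argument), and the pool size below are assumptions made for illustration, and lambdas require Java 8 or later. invokeAll blocks until every task has completed and returns the futures in the same order as the task list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Hypothetical stand-in for process(e): squares its argument.
    static int process(int element) {
        return element * element;
    }

    // Submits one task per element and blocks until all of them have completed.
    static List<Integer> processAndWait(ExecutorService exec, List<Integer> elements)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (final Integer e : elements)
            tasks.add(() -> process(e));

        List<Integer> results = new ArrayList<Integer>();
        // invokeAll returns the futures in the same order as the task list
        for (Future<Integer> f : exec.invokeAll(tasks))
            results.add(f.get());
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4); // pool size is arbitrary
        try {
            System.out.println(processAndWait(exec, Arrays.asList(1, 2, 3, 4)));
            // prints [1, 4, 9, 16]
        } finally {
            exec.shutdown();
        }
    }
}
```

Because every Future has already completed when invokeAll returns, the calls to get do not block; they can still throw ExecutionException if a task failed.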

Sequential loop iterations are suitable for parallelization when each iteration is independent of the others and the work done in each iteration of the loop body is significant enough to offset the cost of managing a new task.


Loop parallelization can also be applied to some recursive designs; there are often sequential loops within the recursive algorithm that can be parallelized in the same manner as Listing 8.10. The easier case is when each iteration does not require the results of the recursive iterations it invokes. For example, sequentialRecursive in Listing 8.11 does a depth-first traversal of a tree, performing a calculation on each node and placing the result in a collection. The transformed version, parallelRecursive, also does a depth-first traversal, but instead of computing the result as each node is visited, it submits a task to compute the node result.

Listing 8.11. Transforming Sequential Tail-recursion into Parallelized Recursion.

public <T> void sequentialRecursive(List<Node<T>> nodes,
                                   Collection<T> results) {
    for (Node<T> n : nodes) {
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

public <T> void parallelRecursive(final Executor exec,
                                 List<Node<T>> nodes,
                                 final Collection<T> results) {
    for (final Node<T> n : nodes) {
        exec.execute(new Runnable() {
            public void run() {
                results.add(n.compute());
            }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}

When parallelRecursive returns, each node in the tree has been visited (the traversal is still sequential: only the calls to compute are executed in parallel) and the computation for each node has been queued to the Executor. Callers of parallelRecursive can wait for all the results by creating an Executor specific to the traversal and using shutdown and awaitTermination, as shown in Listing 8.12.

Listing 8.12. Waiting for Results to be Calculated in Parallel.

public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
        throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    parallelRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
}
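The following usage sketch exercises Listings 8.11 and 8.12 on a small tree. The Node class here is a hypothetical stand-in (the listings assume only that a node offers compute and getChildren), and sampleTree is likewise invented for illustration; parallelRecursive and getParallelResults are repeated from the listings only so the example compiles on its own. Results arrive in no particular order, so the example sorts them before printing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelTraversalSketch {
    // Hypothetical node type: compute() just returns a stored value.
    static class Node<T> {
        private final T value;
        private final List<Node<T>> children;
        Node(T value, List<Node<T>> children) {
            this.value = value;
            this.children = children;
        }
        T compute() { return value; }
        List<Node<T>> getChildren() { return children; }
    }

    // As in Listing 8.11: the traversal is sequential, the computations are queued.
    static <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                public void run() { results.add(n.compute()); }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    // As in Listing 8.12: a traversal-specific executor lets us wait for completion.
    static <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    // Builds the tree 1 -> (2 -> (4), 3).
    static List<Node<Integer>> sampleTree() {
        Node<Integer> four = new Node<Integer>(4, Collections.<Node<Integer>>emptyList());
        Node<Integer> two = new Node<Integer>(2, Collections.singletonList(four));
        Node<Integer> three = new Node<Integer>(3, Collections.<Node<Integer>>emptyList());
        Node<Integer> one = new Node<Integer>(1, Arrays.asList(two, three));
        return Collections.singletonList(one);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> sorted = new ArrayList<Integer>(getParallelResults(sampleTree()));
        Collections.sort(sorted); // completion order is not deterministic
        System.out.println(sorted); // prints [1, 2, 3, 4]
    }
}
```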

8.5.1. Example: A Puzzle Framework

An appealing application of this technique is solving puzzles that involve finding a sequence of transformations from some initial state to reach a goal state, such as the familiar "sliding block puzzles",[7] "Hi-Q", "Instant Insanity", and other solitaire puzzles.

[7] See http://www.puzzleworld.org/SlidingBlockPuzzles.

We define a "puzzle" as a combination of an initial position, a goal position, and a set of rules that determine valid moves. The rule set has two parts: computing the list of legal moves from a given position and computing the result of applying a move to a position. Puzzle in Listing 8.13 shows our puzzle abstraction; the type parameters P and M represent the classes for a position and a move. From this interface, we can write a simple sequential solver that searches the puzzle space until a solution is found or the puzzle space is exhausted.

Listing 8.13. Abstraction for Puzzles Like the "Sliding Blocks Puzzle".

public interface Puzzle<P, M> {
    P initialPosition();
    boolean isGoal(P position);
    Set<M> legalMoves(P position);
    P move(P position, M move);
}
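To make the abstraction concrete, here is a toy puzzle written against this interface. Everything in it is hypothetical and chosen only for brevity: positions are integers, the two moves either double the current number or add one, and the goal is to reach a target number from 1 without overshooting it. The interface is copied from Listing 8.13 so the sketch compiles on its own.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class CountingPuzzleSketch {
    // Copy of the interface from Listing 8.13.
    interface Puzzle<P, M> {
        P initialPosition();
        boolean isGoal(P position);
        Set<M> legalMoves(P position);
        P move(P position, M move);
    }

    // Hypothetical move type: double the current number or add one to it.
    enum Step { DOUBLE, INCREMENT }

    // Hypothetical puzzle: start at 1 and reach `goal` without overshooting it.
    static class CountingPuzzle implements Puzzle<Integer, Step> {
        private final int goal;

        CountingPuzzle(int goal) { this.goal = goal; }

        public Integer initialPosition() { return 1; }

        public boolean isGoal(Integer position) { return position == goal; }

        public Set<Step> legalMoves(Integer position) {
            // LinkedHashSet keeps a predictable iteration order for the moves
            Set<Step> moves = new LinkedHashSet<Step>();
            for (Step s : Step.values())
                if (move(position, s) <= goal)
                    moves.add(s);
            return moves;
        }

        public Integer move(Integer position, Step step) {
            // Positions are immutable Integers, so a move yields a new position
            return step == Step.DOUBLE ? position * 2 : position + 1;
        }
    }

    public static void main(String[] args) {
        CountingPuzzle puzzle = new CountingPuzzle(10);
        System.out.println(puzzle.legalMoves(4)); // prints [DOUBLE, INCREMENT]
        System.out.println(puzzle.legalMoves(8)); // prints [INCREMENT]
    }
}
```

Using an immutable position type matters for the solvers that follow: positions are stored in a set of seen positions and, in the concurrent solver, shared between tasks.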

Node in Listing 8.14 represents a position that has been reached through some series of moves, holding a reference to the move that created the position and the previous Node. Following the links back from a Node lets us reconstruct the sequence of moves that led to the current position.

SequentialPuzzleSolver in Listing 8.15 shows a sequential solver for the puzzle framework that performs a depth-first search of the puzzle space. It terminates when it finds a solution (which is not necessarily the shortest solution).

Rewriting the solver to exploit concurrency would allow us to compute next moves and evaluate the goal condition in parallel, since the process of evaluating one move is mostly independent of evaluating other moves. (We say "mostly" because tasks share some mutable state, such as the set of seen positions.) If multiple processors are available, this could reduce the time it takes to find a solution.

ConcurrentPuzzleSolver in Listing 8.16 uses an inner SolverTask class that extends Node and implements Runnable. Most of the work is done in run: evaluating the set of possible next positions, pruning positions already searched, evaluating whether success has yet been achieved (by this task or by some other task), and submitting unsearched positions to an Executor.

To avoid infinite loops, the sequential version maintained a Set of previously searched positions; ConcurrentPuzzleSolver uses a ConcurrentHashMap for this purpose. This provides thread safety and avoids the race condition inherent in conditionally updating a shared collection by using putIfAbsent to atomically add a position only if it was not previously known. ConcurrentPuzzleSolver uses the internal work queue of the thread pool instead of the call stack to hold the state of the search.
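The effect of putIfAbsent can be seen in isolation with a small sketch. The class and method names here are hypothetical, and String stands in for the position type; several threads race to record the same position, and exactly one of them is told that it was the first.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SeenSetSketch {
    private final ConcurrentMap<String, Boolean> seen =
            new ConcurrentHashMap<String, Boolean>();

    // Returns true only for the single caller that first records the position.
    // A separate containsKey-then-put sequence would let two threads both "win".
    boolean markSeen(String position) {
        return seen.putIfAbsent(position, Boolean.TRUE) == null;
    }

    // Hypothetical driver: `threads` tasks race for one position; returns the winner count.
    static int raceForPosition(int threads) throws InterruptedException {
        final SeenSetSketch s = new SeenSetSketch();
        final AtomicInteger winners = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        // One thread per task, so every task can wait at the starting gate
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        start.await();
                        if (s.markSeen("position-A"))
                            winners.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        start.countDown(); // release all tasks at once
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
        return winners.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(raceForPosition(8)); // prints 1
    }
}
```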

Listing 8.14. Link Node for the Puzzle Solver Framework.

@Immutable
static class Node<P, M> {
    final P pos;
    final M move;
    final Node<P, M> prev;

    Node(P pos, M move, Node<P, M> prev) {...}

    List<M> asMoveList() {
        List<M> solution = new LinkedList<M>();
        for (Node<P, M> n = this; n.move != null; n = n.prev)
            solution.add(0, n.move);
        return solution;
    }
}

The concurrent approach also trades one form of limitation for another that might be more suitable to the problem domain. The sequential version performs a depth-first search, so the search is bounded by the available stack size. The concurrent version performs a breadth-first search and is therefore free of the stack size restriction (but can still run out of memory if the set of positions to be searched or already searched exceeds the available memory).

In order to stop searching when we find a solution, we need a way to determine whether any thread has found a solution yet. If we want to accept the first solution found, we also need to update the solution only if no other task has already found one. These requirements describe a sort of latch (see Section 5.5.1) and in particular, a result-bearing latch. We could easily build a blocking result-bearing latch using the techniques in Chapter 14, but it is often easier and less error-prone to use existing library classes rather than low-level language mechanisms. ValueLatch in Listing 8.17 uses a CountDownLatch to provide the needed latching behavior, and uses locking to ensure that the solution is set only once.

Each task first consults the solution latch and stops if a solution has already been found. The main thread needs to wait until a solution is found; getValue in ValueLatch blocks until some thread has set the value. ValueLatch provides a way to hold a value such that only the first call actually sets the value, callers can test whether it has been set, and callers can block waiting for it to be set. On the first call to setValue, the solution is updated and the CountDownLatch is decremented, releasing the main solver thread from getValue.

The first thread to find a solution also shuts down the Executor, to prevent new tasks from being accepted. To avoid having to deal with RejectedExecutionException, the rejected execution handler should be set to discard submitted tasks. Then, all unfinished tasks eventually run to completion and any subsequent attempts to execute new tasks fail silently, allowing the executor to terminate. (If the tasks took longer to run, we might want to interrupt them instead of letting them finish.)
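One way to configure such an executor is sketched below, assuming a fixed-size ThreadPoolExecutor with an unbounded queue; the class name, factory method, and pool size are hypothetical. ThreadPoolExecutor.DiscardPolicy silently drops rejected tasks, so a submission made after shutdown neither runs nor throws.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DiscardingExecutorSketch {
    // Hypothetical factory: a fixed-size pool that drops rejected tasks silently.
    static ThreadPoolExecutor newDiscardingPool(int nThreads) {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return exec;
    }

    // Returns true if a task submitted after shutdown was dropped without running.
    static boolean lateTaskIsDiscarded() throws InterruptedException {
        ThreadPoolExecutor exec = newDiscardingPool(2);
        final AtomicBoolean ran = new AtomicBoolean(false);
        exec.shutdown();
        // With the default AbortPolicy this call would throw RejectedExecutionException
        exec.execute(new Runnable() {
            public void run() { ran.set(true); }
        });
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return !ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lateTaskIsDiscarded()); // prints true
    }
}
```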

Listing 8.15. Sequential Puzzle Solver.

public class SequentialPuzzleSolver<P, M> {
    private final Puzzle<P, M> puzzle;
    private final Set<P> seen = new HashSet<P>();

    public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
    }

    public List<M> solve() {
        P pos = puzzle.initialPosition();
        return search(new Node<P, M>(pos, null, null));
    }

    private List<M> search(Node<P, M> node) {
        if (!seen.contains(node.pos)) {
            seen.add(node.pos);
            if (puzzle.isGoal(node.pos))
                return node.asMoveList();
            for (M move : puzzle.legalMoves(node.pos)) {
                P pos = puzzle.move(node.pos, move);
                Node<P, M> child = new Node<P, M>(pos, move, node);
                List<M> result = search(child);
                if (result != null)
                    return result;
            }
        }
        return null;
    }

    static class Node<P, M> {  /*  Listing 8.14  */  }
}

Listing 8.16. Concurrent Version of Puzzle Solver.

public class ConcurrentPuzzleSolver<P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    final ValueLatch<Node<P, M>> solution
            = new ValueLatch<Node<P, M>>();
    ...
    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // block until solution found
            Node<P, M> solnNode = solution.getValue();
            return (solnNode == null) ? null : solnNode.asMoveList();
        } finally {
            exec.shutdown();
        }
    }

    protected Runnable newTask(P p, M m, Node<P,M> n) {
        return new SolverTask(p, m, n);
    }

    class SolverTask extends Node<P, M> implements Runnable {
        ...
        public void run() {
            if (solution.isSet()
                    || seen.putIfAbsent(pos, true) != null)
                return; // already solved or seen this position
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(
                        newTask(puzzle.move(pos, m), m, this));
        }
    }
}

Listing 8.17. Result-bearing Latch Used by ConcurrentPuzzleSolver.

@ThreadSafe
public class ValueLatch<T> {
    @GuardedBy("this") private T value = null;
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet() {
        return (done.getCount() == 0);
    }

    public synchronized void setValue(T newValue) {
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    public T getValue() throws InterruptedException {
        done.await();
        synchronized (this) {
            return value;
        }
    }
}

ConcurrentPuzzleSolver does not deal well with the case where there is no solution: if all possible moves and positions have been evaluated and no solution has been found, solve waits forever in the call to getValue. The sequential version terminated when it had exhausted the search space, but getting concurrent programs to terminate can sometimes be more difficult. One possible solution is to keep a count of active solver tasks and set the solution to null when the count drops to zero, as in Listing 8.18.

Finding the solution may also take longer than we are willing to wait; there are several additional termination conditions we could impose on the solver. One is a time limit; this is easily done by implementing a timed getValue in ValueLatch (which would use the timed version of await), and shutting down the Executor and declaring failure if getValue times out. Another is some sort of puzzle-specific metric such as searching only up to a certain number of positions. Or we can provide a cancellation mechanism and let the client make its own decision about when to stop searching.
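A timed getValue might look like the following sketch, which adapts ValueLatch from Listing 8.17. The class name TimedValueLatch and the choice to signal a timeout with TimeoutException are assumptions; throwing rather than returning null keeps a timeout distinguishable from a null value that was deliberately set to mean "no solution".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedValueLatch<T> {
    private T value = null; // guarded by this
    private final CountDownLatch done = new CountDownLatch(1);

    public boolean isSet() {
        return done.getCount() == 0;
    }

    // Only the first call sets the value; later calls are ignored.
    public synchronized void setValue(T newValue) {
        if (!isSet()) {
            value = newValue;
            done.countDown();
        }
    }

    // Waits up to the given timeout for a value; throws if none has been set by then.
    public T getValue(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!done.await(timeout, unit)) // await returns false when the wait times out
            throw new TimeoutException("no value set within the timeout");
        synchronized (this) {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        TimedValueLatch<String> latch = new TimedValueLatch<String>();
        try {
            latch.getValue(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out"); // nothing has been set yet
        }
        latch.setValue("solved");
        System.out.println(latch.getValue(50, TimeUnit.MILLISECONDS)); // prints solved
    }
}
```

A solver using this variant would catch TimeoutException in solve, shut down the Executor, and report failure to its caller.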

Listing 8.18. Solver that Recognizes when No Solution Exists.

public class PuzzleSolver<P,M> extends ConcurrentPuzzleSolver<P,M> {
    ...
    private final AtomicInteger taskCount = new AtomicInteger(0);

    protected Runnable newTask(P p, M m, Node<P,M> n) {
        return new CountingSolverTask(p, m, n);
    }

    class CountingSolverTask extends SolverTask {
        CountingSolverTask(P pos, M move, Node<P, M> prev) {
            super(pos, move, prev);
            taskCount.incrementAndGet();
        }
        public void run() {
            try {
                super.run();
            } finally {
                if (taskCount.decrementAndGet() == 0)
                    solution.setValue(null);
            }
        }
    }
}

