1. Overview

In this tutorial, we're going to look at how we can submit tasks in batch using the ExecutorService implementations.

2. Sample Application

Let's start with the sample application.

We'll work with the SleepAndReturn task:

public class SleepAndReturn implements Callable<String> {

    private final int millis;

    public SleepAndReturn(int millis) {
        this.millis = millis;
    }

    @Override
    public String call() throws Exception {
        TimeUnit.MILLISECONDS.sleep(millis);

        return "Done at " + millis;
    }
}

SleepAndReturn sleeps for the given amount of time and then returns a String.

3. Submit Tasks with invokeAll

Firstly, we'll use the invokeAll method of ExecutorService. When we provide a list of Callable tasks, invokeAll runs them all and returns a list of Futures when all complete:

public class SubmitTasksInBatch {

    private final int threadCount = 5;

    public void batchWithInvokeAll(List<Callable<String>> tasks) throws InterruptedException {
        final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

        final List<Future<String>> futures = threadPool.invokeAll(tasks);

        for (Future<String> future : futures) {
            try {
                final String result = future.get();
                System.out.println(result);
            } catch (ExecutionException e) {
                System.out.println("Error occurred.");
            }
        }

        threadPool.shutdown();
    }
}

Here, we have the batchWithInvokeAll method that accepts a list of Callable tasks. Firstly, we're creating a thread pool to run the tasks. Then we're invoking the invokeAll method passing the given tasks. Note that the return value is a list of Futures whose order is the same as the submission order. We're then accessing the actual String result by invoking Future.get.

Next, let's look at the caller:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchWithInvokeAll(tasks);
}

Here, we're creating three SleepAndReturn tasks.

A sample run outputs:

Done at 900
Done at 700
Done at 300

Note that the output has the same order as the submission although the first submitted task - 900 - completes last.

4. Submit Tasks with Timed invokeAll

Now we'll use invokeAll with a timeout value. When the given time passes, invokeAll cancels the remaining tasks and returns the results as a list of Futures:

public void batchWithInvokeAllWithTimeout(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    final List<Future<String>> futures = threadPool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);

    for (Future<String> future : futures) {
        try {
            final String result = future.get();
            System.out.println(result);
        } catch (CancellationException e) {
            System.out.println("Cancelled.");
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }

    threadPool.shutdown();
}

In this method, we're calling invokeAll with the timeout value of 500 milliseconds. If any tasks don't complete after the given time, the ExecutorService instance cancels them. Be aware that when a task is canceled, Future.get throws a CancellationException. Since it is a runtime exception, the compiler won't require you to handle it. But since it is a timed invocation, we're catching the exception.

A sample run prints:

Cancelled.
Cancelled.
Done at 300

As we can see, the thread pool cancels the first two tasks.

5. Submit Tasks Manually

Next, we'll implement a basic task submission method similar to invokeAll. Given a list of Callable tasks, we'll submit these tasks one by one to a thread pool. When all tasks complete, we'll return a list of Futures:

public void submitInBatchManually(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    final List<Future<String>> futures = new ArrayList<>();
    for (Callable<String> task : tasks) {
        futures.add(threadPool.submit(task));
    }

    for (Future<String> future : futures) {
        try {
            final String result = future.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }

    threadPool.shutdown();
}

This example is very similar to the previous one. Instead of invokeAll, we're calling the submit method to run our tasks. After getting the result by invoking Future.get, we're printing the result. Alternatively, we can collect the Future values in a list similar to invokeAll. The whole operation has the effect of waiting for all tasks to complete.

When we call this method:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.submitInBatchManually(tasks);
}

It prints:

Done at 900
Done at 700
Done at 300

The results are again printed in the submission order, not in the completion order.

6. Submit Tasks with CompletionService

Until now, when we submitted tasks, we waited for the completion of all tasks, in other words for the completion of the longest-running task. Assume that, the first submitted task completes in ten seconds, and the second one completes in three seconds. Although the result of the second task is ready, we can't access it until the first task finishes. To remedy this problem, we'll use the CompletionService class.

CompletionService allows us to submit tasks similar to a thread pool, but in addition to that we can get the task results as soon as they're ready:

public void batchWithCompletionService(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(threadPool);

    for (Callable<String> task : tasks) {
        completionService.submit(task);
    }

    for (int i = 0; i < tasks.size(); i++) {
        final Future<String> future = completionService.take();
        try {
            final String result = future.get();

            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }

    threadPool.shutdown();
}

In this example, after creating a thread pool we're initializing an instance of ExecutorCompletionService. When we submit tasks to the CompletionService instance, it delegates the execution to the wrapped thread pool. To acquire the results, we're calling the CompletionService take method. This method blocks until a task completes. As long as we know the number of submitted and completed tasks, it is easy to work with a CompletionService. 

Although we've created a thread pool exclusively for the CompletionService, we can also use an existing thread pool. This way, for some set of tasks, we can get the results in the order they complete. And for the others, we can have the default behavior.

Next, we'll call our method:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchWithCompletionService(tasks);
}

It prints:

Done at 300
Done at 700
Done at 900

Unlike the previous examples, the output shows the completion order.

7. Submit Tasks with invokeAny

Lastly, we'll submit multiple tasks and get the result of the first one that completes successfully. For this purpose, we'll use the invokeAny method:

public void batchwithInvokeAny(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    try {
        final String result = threadPool.invokeAny(tasks);

        System.out.println(result);
    } catch (ExecutionException e) {
        System.out.println("No tasks successfully completed.");
    }

    threadPool.shutdown();
}

In this example, we're submitting the given tasks to the thread pool calling invokeAny. And it returns the result, not a Future - if there is any successfully completed task.

When we provide some tasks:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchwithInvokeAny(tasks);
}

The output shows:

Done at 300

8. Submit Tasks with Timed invokeAny

Similar to invokeAll, invokeAny also has a timed variant. If any task can't complete in the given time, invokeAny throws a TimeoutException.

public void batchWithInvokeAnyWithTimeout(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    try {
        final String result = threadPool.invokeAny(tasks, 200, TimeUnit.MILLISECONDS);

        System.out.println(result);
    } catch (TimeoutException e) {
        System.out.println("No successful result until timeout.");
    } catch (ExecutionException e) {
        System.out.println("No tasks successfully completed.");
    }

    threadPool.shutdown();
}

Here, we're defining the timeout as 200 milliseconds.

Given the same tasks as the previous example, a sample run prints:

No successful result until timeout.

9. Summary

In this tutorial, we've looked at how we can submit multiple tasks to an ExecutorService instance. First, we've looked at the invokeAll method to get all results in the submission order. Then we've created a custom method similar to invokeAll. We learned that CompletionService allows us to get the task results as they become ready. We then investigated invokeAny to acquire the first result among similar tasks.

Lastly, check out the source code for all examples in this tutorial over on Github.