A Guide to Executor Service in Java

A Guide to Executor Service in Java thumbnail
10K
By Dhiraj 13 June, 2020

In this article, we will discuss about ExecutorService framework provided inside java.util.concurrent.ExecutorService. We will discuss about its instantiation, submitting tasks to it and different ways to shutdown an executor service.

In Java, there are a couple of ways in which we can perform any given task asynchronously. A traditional approach would be to pass a Runnable instance to a thread instance and invoke the start() method on the thread instance. Now, once the scheduler picks this job, the code inside the run() method of Runnable instance is executed by this newly created thread instance.

The implementation would be similar to this. We are just printing the currently executing thread name.

public class ExecutorServiceDemo {

    public static void main(String[] args) {
        System.out.println(String.format("current thread - %s", Thread.currentThread().getName()));
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(String.format("Running in a separate thread - %s", Thread.currentThread().getName()));
            }
        });
        t1.start();
    }
}

With Java 8 lambda expression the above implementation would look like this.

public static void main(String[] args) {
        System.out.println(String.format("current thread - %s", Thread.currentThread().getName()));
        Thread t1 = new Thread(() ->
                System.out.println(String.format("Running in a separate thread - %s", Thread.currentThread().getName())));
        t1.start();
    }
Output
current thread - main
Running in a separate thread - Thread-0

Once this task is completed the thread is killed by the JVM. Now let us assume we have 100 such jobs to perform then it's not practical to just create a 100 thread instance and perform the job.

Each Java thread maps to one OS-level thread and it depends on the underlying CPU core about how many threads can be executed concurrently. Hence, there is a fixed number of threads that can run in an OS. Also, creating a new thread every time is a costlier process.

A good approach would be to create a fixed-size thread pool and submit these tasks and these fixed-size threads can execute these tasks one after the other.

This very execution mechanism can be achieved by ExecutorService. Hence, with ExecutorService it is possible to execute tasks concurrently in an asynchronous manner in the background.

ExecutorService Instantiation

We can use any one of the factory methods for executor service instantiation. The choice of these methods depends upon the requirement. Below are some of them:

ExecutorService executorService = Executors.newFixedThreadPool(4); //1
ExecutorService executorService = Executors.newSingleThreadExecutor();   //2
ExecutorService executorService = Executors.newCachedThreadPool();    //3
ExecutorService executorService = Executors.newScheduledThreadPool(4);    //4

1. With newFixedThreadPool a thread pool is created that reuses a fixed number of threads operating off a shared blocking queue(LinkedBlockingQueue). At any point, at most 4 threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks this method has an overloaded version that accepts ThreadFactory- the factory to use while creating new threads

2. newSingleThreadExecutor() creates an Executor that uses a single worker thread operating off an unbounded queue(LinkedBlockingQueue).

Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.

Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time.

3. newCachedThreadPool() creates a thread pool that creates new threads as needed but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.

Calls to execute will reuse previously constructed if available. If no existing thread is available, a new thread will be created and added to the pool(AsynchronousQueue).

Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources.

4. newScheduledThreadPool() creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

Now, the question arises that on what basis the size of thread pool is selected. For instance, for a CPU intensive job a maximum of thread that can run parallely is equal to the no of available CPU cores in an ideal situation where as

int cpuCores = Runtime.getRuntime().availableProcessors();

But for an I/O operation such as a network call we can have a large number of threads in a thread pool. Let us assume we made 4 parallel network calls with 4 threads in our thread pool. Once this network call is made the thread goes into waiting state till the response arrives and in this scenario, we can have other threads to execute remaining jobs. Hence, we can have a large number of threads in our thread pool.

Submitting Task to ExecutorService

We can submit or assign tasks to executor service using execute(), submit(), invokeAny() and invokeAll() methods.

The execute() method only accepts an instance of Runnable interface and throws a nullpointer exception for null command. The command may execute in a new thread, in a pooled thread, or in the calling thread, based on the Executor implementation.

void execute(Runnable command);

Let us assume below Runnable implementation.

public class RunnableDemo implements Runnable {

    @lombok.SneakyThrows
    @Override
    public void run() {
        System.out.println("Inside run method of RunnableDemo");
        Thread.sleep(2000);
    }
}

Let us execute this with ExecutorService.

public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(new RunnableDemo());
    }

The submit() method submits a value-returning task for execution and returns a Future representing the pending results of the task. The Future's get method will return the task's result upon successful completion.

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Below is our Callable interface implementation.

public class CallableDemo implements Callable {

    @Override
    public String call() throws Exception {
        System.out.println("Inside run method of CallableDemo");
        return "done";
    }
}

    @SneakyThrows
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future result1 = executorService.submit(new CallableDemo());
        result1.get();
        Future result = executorService.submit(new RunnableDemo());
        Future result3 = executorService.submit(new RunnableDemo());
    }

A call to get() method would immediately block for waiting for the task to be completed.

invokeAny() executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do. Upon normal or exceptional return, tasks that have not completed are cancelled.

The results of this method are undefined if the given collection is modified while this operation is in progress.

Let us create one more Callable implementation and submit both the implementations.

public class CallableDemo1 implements Callable {

    @Override
    public String call() throws Exception {
        System.out.println("Inside run method of CallableDemo1");
        return "done1";
    }
}

    @SneakyThrows
    public static void main(String[] args) {
	ExecutorService executorService = Executors.newFixedThreadPool(10);
        ArrayList<Callable<String>> list = new ArrayList<>();
        list.add(new CallableDemo());
        list.add(new CallableDemo1());
        String result = executorService.invokeAny(list);
        System.out.println(result);

After running above statements, either of the task is executed randomly and the remaining task is cancelled. Hence, after running a couple of times either done or done1 will be printed.

invokeAll() executes the given tasks, returning a list of Futures holding their status and results when all complete.

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

public static void main(String[] args) {
       
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ArrayList<Callable<String>> list = new ArrayList<>();
        list.add(new CallableDemo());
        list.add(new CallableDemo1());
        List<Future<String>> results = executorService.invokeAll(list);
        results.forEach(f -> {
            try {
                System.out.println(f.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }

ExecutorService Shutdown

This method is called to initiate the process of terminating the thread and this method invocation is very much required to shut down the executor service because the executor service does not shut down itself even if all the task has completed execution. Executor service stays alive and wait for other tasks to arrive in the queue and the active executor service will cause the JVM to keep running.

With the invocation of this method, no new tasks will be accepted and an orderly shutdown process is initiated in which previously submitted tasks are executed and the executor service shuts down only after all the running threads have completed executing their tasks.

Whereas shutdownNow() attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

This method does not wait for actively executing tasks to terminate. We use awaitTermination to do that.

List shutdownNow();

awaitTermination blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);

ScheduledExecutorService

This is one of the most useful executor service that helps to creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.

Below snippet runs the task after a delay of 2 seconds.

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new CallableDemo(), 2, TimeUnit.SECONDS);

Similarly, below snippet repeatedly runs the task after every 2 seconds. It accepts an instance of Runnable.

service.scheduleAtFixedRate(new RunnableDemo(), 1, 2, TimeUnit.SECONDS);

Below snippet repeatedly runs the task after 2 seconds after previous task completes.

service.scheduleWithFixedDelay(new RunnableDemo(), 1, 2, TimeUnit.SECONDS);

Conclusion

In this article, we discussed about ExecutorService framework, its instantiation, submitting tasks to it and different ways to shutdown an executor service.

Share

If You Appreciate This, You Can Consider:

We are thankful for your never ending support.

About The Author

author-image
A technology savvy professional with an exceptional capacity to analyze, solve problems and multi-task. Technical expertise in highly scalable distributed systems, self-healing systems, and service-oriented architecture. Technical Skills: Java/J2EE, Spring, Hibernate, Reactive Programming, Microservices, Hystrix, Rest APIs, Java 8, Kafka, Kibana, Elasticsearch, etc.

Further Reading on Core Java