In this article, we will implement the scatter and gather pattern using multithreading in Java. With this pattern, we make multiple I/O requests (HTTP calls) in parallel, wait until the responses to all of them arrive, and then combine those responses for further processing.
To understand this better, let us assume that we want to develop an aggregator app that fetches the flight ticket prices of major airlines from a source to a destination, compares them and finds the cheapest fare.
There are multiple ways to achieve this, and we will discuss some of them: Thread.sleep(), Thread.join(), a CountDownLatch, and ExecutorService.invokeAll().
First, let us create a Runnable implementation whose run() method makes the HTTP request, and pass an instance of it to each of our threads.
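```java
import java.util.Hashtable;
import java.util.Random;

import lombok.SneakyThrows; // Lombok: rethrows checked exceptions such as InterruptedException

public class Task implements Runnable {
    final String url;
    final String name;
    final Hashtable<String, Integer> map; // thread-safe map shared by all tasks

    public Task(String url, String name, Hashtable<String, Integer> map) {
        this.url = url;
        this.name = name;
        this.map = map;
    }

    @SneakyThrows
    @Override
    public void run() {
        // HTTP call to get the price, simulated here with a 1-second delay
        Thread.sleep(1000);
        map.put(name, new Random().nextInt(1000));
    }
}
```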
Using Thread.sleep()
The sleep() method pauses the execution of the current thread for a given number of milliseconds. Hence, a very simple approach to our problem is to invoke all our APIs in parallel and pause the current thread for a few seconds, assuming that the responses will arrive within that time.
```java
@SneakyThrows
public static void main(String[] args) {
    Hashtable<String, Integer> map = new Hashtable<>();
    Thread t1 = new Thread(new Task("url1", "A", map));
    Thread t2 = new Thread(new Task("url2", "B", map));
    Thread t3 = new Thread(new Task("url3", "C", map));
    t1.start();
    t2.start();
    t3.start();

    // Wait a fixed 3 seconds, hoping every response has arrived by then
    Thread.sleep(3000);

    map.forEach((name, price) -> System.out.println(name + " " + price));
}
```
But this is a very inefficient solution, as it assumes that in the worst case every API responds within 3 seconds. Even if all the responses arrive within a few milliseconds, the main thread stays paused for the full 3 seconds, and any response that takes longer than 3 seconds is silently missing from the results.
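To see both problems concretely, here is a small self-contained sketch. It is not the article's Task class: it uses hypothetical per-airline delays in place of real HTTP calls, a made-up fixed price, and a ConcurrentHashMap as the shared map. Any airline slower than the sleep window is simply absent from the snapshot, while the fast ones still pay for the whole window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: delays and the fixed price are hypothetical stand-ins for real API calls.
class SleepWindowDemo {

    // Starts one thread per airline, sleeps for windowMillis, then returns a snapshot
    // of whichever prices arrived inside the window.
    static Map<String, Integer> gather(Map<String, Long> delaysMillis, long windowMillis) {
        Map<String, Integer> prices = new ConcurrentHashMap<>();
        for (Map.Entry<String, Long> airline : delaysMillis.entrySet()) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(airline.getValue());  // simulated HTTP latency
                    prices.put(airline.getKey(), 100); // hypothetical price
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true); // a late responder must not keep the JVM alive
            t.start();
        }
        try {
            Thread.sleep(windowMillis); // always waits the full window, however fast the APIs are
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new HashMap<>(prices); // snapshot: late prices are simply missing
    }

    public static void main(String[] args) {
        // "C" needs 1 second but the window is 300 ms, so its price never shows up.
        Map<String, Integer> result = gather(Map.of("A", 50L, "B", 50L, "C", 1000L), 300);
        System.out.println("Prices received: " + result);
    }
}
```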
Using Thread.join()
The join() method pauses the currently running thread until the thread on which join() is called finishes its execution. Hence, after starting all our threads to invoke the APIs in parallel, we can call join() on each of them, which pauses the main thread until t1, t2 and t3 have completed.
```java
@SneakyThrows
public static void main(String[] args) {
    Hashtable<String, Integer> map = new Hashtable<>();
    Thread t1 = new Thread(new Task("url1", "A", map));
    Thread t2 = new Thread(new Task("url2", "B", map));
    Thread t3 = new Thread(new Task("url3", "C", map));
    t1.start();
    t2.start();
    t3.start();

    // Block the main thread until each worker has finished
    t1.join();
    t2.join();
    t3.join();

    // All threads are done, so every price is in the map
    map.forEach((name, price) -> System.out.println(name + " " + price));
}
```
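But this approach still has drawbacks. join() without an argument waits indefinitely, so a single API that never responds blocks the main thread forever, and creating a new thread for every request does not scale to many APIs. To improve it, we can pass a timeout in milliseconds to join(long millis). Note that each timeout applies to one join() call in turn, so three join(2000) calls can wait up to 6 seconds in total.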
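One way to keep the total wait bounded is to compute a single deadline and give each join() only the time that remains. This is a minimal sketch under assumptions: the class name, the per-airline delays and the fixed price are hypothetical, and the threads simulate the HTTP call with sleep(). Note the guard before join(): per the Thread documentation, a timeout of 0 means wait forever, so the loop stops instead of calling join(0) once the deadline has passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Sketch: bound the overall wait of Thread.join() with one shared deadline.
class JoinWithDeadline {

    static Map<String, Integer> gather(Map<String, Long> delaysMillis, long timeoutMillis) {
        Map<String, Integer> prices = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (Map.Entry<String, Long> airline : delaysMillis.entrySet()) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(airline.getValue());  // simulated HTTP latency
                    prices.put(airline.getKey(), 100); // hypothetical price
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true); // an unfinished worker must not keep the JVM alive
            threads.add(t);
            t.start();
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (Thread t : threads) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // deadline reached; join(0) would wait forever
                }
                t.join(remainingMillis); // waits at most for the time left
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new HashMap<>(prices); // snapshot of the prices that arrived in time
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        Map<String, Integer> result = gather(Map.of("A", 100L, "B", 200L, "C", 5000L), 1000);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(result + " after about " + elapsedMs + " ms");
    }
}
```

The main thread waits roughly as long as the timeout at most, regardless of how many threads it joins.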
Using CountDownLatch
We can use a CountDownLatch for a more efficient implementation: we can still set a timeout, but the main thread is not paused for the whole timeout. If all the tasks complete early, the main thread resumes immediately. The timeout is an upper bound that we supply because one of the APIs may be unhealthy and take much longer to respond.
To use the CountDownLatch, we need to modify our Runnable implementation. The constructor now also takes the latch, and once the run() method has executed, it counts the latch down by 1.
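The behaviour described above comes from CountDownLatch.await(long, TimeUnit), which returns true as soon as the count reaches zero and false if the timeout elapses first. A minimal sketch with hypothetical delays standing in for API latency (the class and method names are illustrative, not from the article):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: await(timeout) returns early when the count hits zero, or false on timeout.
class LatchAwaitDemo {

    // Starts one daemon thread per delay; each counts the latch down when it finishes.
    static boolean awaitAll(long[] delaysMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(delaysMillis.length);
        for (long delay : delaysMillis) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(delay); // simulated HTTP latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean fast = awaitAll(new long[] {100, 150, 200}, 2000); // all finish well before 2 s
        long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("all responded: " + fast + " in about " + ms + " ms");
        boolean slow = awaitAll(new long[] {100, 5000}, 500);      // gives up after 500 ms
        System.out.println("all responded: " + slow);
    }
}
```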
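```java
import java.util.Hashtable;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import lombok.SneakyThrows;

public class RunnableTask implements Runnable {
    final String url;
    final String name;
    final Hashtable<String, Integer> map;
    final CountDownLatch latch;

    public RunnableTask(String url, String name, Hashtable<String, Integer> map, CountDownLatch latch) {
        this.url = url;
        this.name = name;
        this.map = map;
        this.latch = latch;
    }

    @SneakyThrows
    @Override
    public void run() {
        try {
            // HTTP call to get the price, simulated here with a 1-second delay
            Thread.sleep(1000);
            map.put(name, new Random().nextInt(1000));
        } finally {
            // Count down even if the call fails, so await() is not held up until the timeout
            latch.countDown();
        }
    }
}
```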
Now, let us create our tasks and submit them to an ExecutorService. As we have 3 tasks, we initialize the latch with a count of 3, and the main thread blocks until either the count reaches 0 or the 2-second timeout elapses.
```java
@SneakyThrows
public static void main(String[] args) {
    Hashtable<String, Integer> map = new Hashtable<>();
    CountDownLatch latch = new CountDownLatch(3);
    RunnableTask task1 = new RunnableTask("url1", "A", map, latch);
    RunnableTask task2 = new RunnableTask("url2", "B", map, latch);
    RunnableTask task3 = new RunnableTask("url3", "C", map, latch);

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    executorService.submit(task1);
    executorService.submit(task2);
    executorService.submit(task3);

    // Returns true as soon as the count reaches 0, or false after 2 seconds
    boolean allResponded = latch.await(2, TimeUnit.SECONDS);
    executorService.shutdown();

    if (!allResponded) {
        System.out.println("Some airlines did not respond within 2 seconds");
    }
    map.forEach((name, price) -> System.out.println(name + " " + price));
}
```
Using ExecutorService.invokeAll()
We can also rely on the ExecutorService framework itself, which also addresses our scalability problem: a fixed pool of threads is reused instead of a new thread being created for every request. You can read my previous article for a detailed explanation of ExecutorService.
We will initialize our ExecutorService with a fixed thread pool of size 10 and submit all our tasks to it.
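The scalability point can be checked directly: a fixed pool runs any number of submitted tasks on at most its configured number of threads. This sketch (class and method names are illustrative) submits more simulated requests than there are threads and records which threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: a fixed pool reuses a bounded set of threads for many tasks.
class FixedPoolReuseDemo {

    // Submits taskCount short tasks to a pool of poolSize threads and returns
    // the distinct names of the threads that executed them.
    static Set<String> threadNamesUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(20); // simulated HTTP latency
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already-submitted ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadNamesUsed(10, 30);
        System.out.println(names.size() + " pooled threads handled 30 requests");
    }
}
```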
invokeAll() executes the given tasks and, when all of them have completed, returns a list of Futures holding their status and results. An overload also accepts a timeout, the maximum time to wait; any task that has not completed when the timeout expires is cancelled.
As invokeAll() accepts a collection of Callable tasks, we need a Callable implementation rather than a Runnable.
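The cancellation behaviour of the timed invokeAll() is worth seeing in isolation, because calling get() on a cancelled Future throws a CancellationException. In this sketch the two lambdas are hypothetical airlines, one fast and one far slower than the timeout; the Futures come back in the same order as the tasks.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch: invokeAll(tasks, timeout, unit) cancels tasks still running at the deadline.
class InvokeAllTimeoutDemo {

    static List<Future<Integer>> run(long timeoutMillis) {
        List<Callable<Integer>> tasks = List.of(
                () -> { Thread.sleep(50); return 120; },   // hypothetical fast airline
                () -> { Thread.sleep(5000); return 90; }); // hypothetical slow airline
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            // Returns when all tasks finish or the timeout expires, whichever comes first
            return pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Future<Integer>> futures = run(500);
        for (int i = 0; i < futures.size(); i++) {
            Future<Integer> f = futures.get(i);
            // Check isCancelled() first: get() on a cancelled Future throws CancellationException
            System.out.println("task " + i + ": " + (f.isCancelled() ? "cancelled" : f.get()));
        }
    }
}
```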
```java
import java.util.Random;
import java.util.concurrent.Callable;

public class CallableTask implements Callable<Integer> {
    final String url;
    final String name;

    public CallableTask(String url, String name) {
        this.url = url;
        this.name = name;
    }

    @Override
    public Integer call() throws InterruptedException {
        // HTTP call to get the price, simulated here with a 1-second delay
        Thread.sleep(1000);
        return new Random().nextInt(1000);
    }
}
```
Now, let us submit our tasks to the ExecutorService. Here, 2 seconds is the maximum time to wait.
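```java
public static void main(String[] args) throws InterruptedException {
    Callable<Integer> task4 = new CallableTask("url1", "A");
    Callable<Integer> task5 = new CallableTask("url2", "B");
    Callable<Integer> task6 = new CallableTask("url3", "C");
    List<Callable<Integer>> tasks = new ArrayList<>();
    tasks.add(task4);
    tasks.add(task5);
    tasks.add(task6);

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    // Waits at most 2 seconds; tasks still running at that point are cancelled
    List<Future<Integer>> results = executorService.invokeAll(tasks, 2, TimeUnit.SECONDS);
    results.forEach(f -> {
        try {
            System.out.println(f.get());
        } catch (CancellationException e) {
            System.out.println("Timed out"); // this task did not finish within 2 seconds
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdown();
}
```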
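The aggregator from the introduction still needs its final gather step: compare the prices and pick the cheapest fare. A possible sketch, assuming each airline's quote is a Callable<Integer> keyed by a hypothetical airline name, relies on invokeAll() returning Futures in task order to pair each result with its airline, and skips airlines that timed out or failed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Sketch of the gather step: keep the prices that arrived in time and pick the cheapest.
class CheapestFare {

    static Optional<Map.Entry<String, Integer>> cheapest(Map<String, Callable<Integer>> quotes,
                                                         long timeoutMillis) {
        List<String> names = new ArrayList<>();
        List<Callable<Integer>> tasks = new ArrayList<>();
        quotes.forEach((name, task) -> { // one pass, so indices line up
            names.add(name);
            tasks.add(task);
        });
        Map<String, Integer> prices = new HashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll returns Futures in task order, so results.get(i) belongs to names.get(i)
            List<Future<Integer>> results = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (int i = 0; i < results.size(); i++) {
                try {
                    prices.put(names.get(i), results.get(i).get());
                } catch (CancellationException | ExecutionException e) {
                    // timed out or failed: leave this airline out of the comparison
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return prices.entrySet().stream().min(Map.Entry.comparingByValue());
    }

    public static void main(String[] args) {
        Map<String, Callable<Integer>> quotes = new LinkedHashMap<>();
        quotes.put("A", () -> { Thread.sleep(100); return 420; });  // hypothetical fares
        quotes.put("B", () -> { Thread.sleep(100); return 310; });
        quotes.put("C", () -> { Thread.sleep(5000); return 150; }); // slower than the timeout
        cheapest(quotes, 2000).ifPresentOrElse(
                e -> System.out.println("Cheapest: " + e.getKey() + " at " + e.getValue()),
                () -> System.out.println("No airline responded in time"));
    }
}
```

With these made-up fares, C would be cheapest but misses the 2-second timeout, so the comparison falls to A and B.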
Conclusion
In this article, we implemented the scatter and gather pattern using multithreading in Java. With this pattern, we made multiple I/O requests (HTTP calls) in parallel, waited until the responses to all of them arrived, and then combined those responses for further processing.