Tuesday, 15 August 2017

Thread Pool | A chapter to discuss

ThreadPool:-


Please refer to the Java docs for a detailed description of ThreadPool.

A thread pool creates a fixed-size group of worker threads. Whenever a Runnable task is submitted, an idle thread from the pool is assigned to run it; once the task completes, the thread goes back to the pool and waits for the next task.
A thread pool improves performance because existing threads are reused instead of a new thread being created for every task.

Different ways of creating a thread pool using the java.util.concurrent API:

/**
 * Demonstrates the three most common {@code Executors} factory methods and
 * shuts every pool down again once it is no longer needed.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Creates an Executor that uses a single worker thread operating off an unbounded queue.
    // (Note however that if this single thread terminates due to a failure during execution
    // prior to shutdown, a new one will take its place if needed to execute subsequent tasks.)
    ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

    // Creates a thread pool that reuses a fixed number of threads operating off a shared
    // unbounded queue.
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);

    // Creates a thread pool that creates new threads as needed, but will reuse previously
    // constructed threads when they are available.
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    // Always shut a pool down when you are done with it: once tasks have been submitted,
    // its worker threads otherwise stay alive and can keep the JVM from exiting.
    singleThreadPool.shutdown();
    fixedThreadPool.shutdown();
    cachedThreadPool.shutdown();
}



In the example below, a thread pool with a fixed size of 5 threads is created and used to run 10 Runnable tasks:

/**
 * Runs 10 short tasks on a fixed pool of 5 threads; each task prints the name of
 * the worker thread that executes it when it starts and when it completes.
 *
 * <p>The pool is shut down only after every task has been submitted. Calling
 * {@code shutdown()} inside the loop would make the executor reject every task
 * after the first one with a {@code RejectedExecutionException}.
 */
public void TestThreadPool() {
    ExecutorService executor = Executors.newFixedThreadPool(5);
    try {
        for (int i = 0; i < 10; i++) {
            Runnable worker = new Runnable() {
                @Override
                public void run() {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("Worker thread " + threadName + " execution started.");
                    System.out.println("Worker thread " + threadName + " execution completed.");
                }
            };
            executor.execute(worker);
        }
    } finally {
        // Orderly shutdown: already-submitted tasks still run, no new tasks are accepted.
        executor.shutdown();
    }
}
}



Custom ThreadPool implementation:

package com.test.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A minimal fixed-size thread pool: {@code size} worker threads repeatedly take
 * {@link Runnable} tasks from a shared unbounded FIFO queue and run them.
 *
 * <p>Shutdown is graceful. {@link #shutDown()} stops new submissions and appends one
 * stop sentinel per worker to the queue, so tasks that were already queued still run
 * before the workers terminate.
 *
 * <p>Workers never hold a lock while waiting for or running a task (the queue is
 * already thread-safe), so tasks run in parallel and {@link #execute(Runnable)}
 * cannot be blocked by an idle worker.
 */
public class ThreadPool {
    /** Sentinel task; a worker that dequeues this exact instance terminates. */
    private static final Runnable STOP_SIGNAL = new Runnable() {
        @Override
        public void run() {
            // Never executed: workers recognise this instance by identity and exit.
        }
    };

    private final BlockingQueue<Runnable> runnableQueue;
    private final WorkerThread[] workerThread;

    /** Makes "check state, then enqueue" in execute() atomic with respect to shutDown(). */
    private final Object stateLock = new Object();

    /** Guarded by {@link #stateLock}. */
    private boolean isShutDown = false;

    /**
     * Creates the pool and immediately starts all of its worker threads.
     *
     * @param size number of worker threads; must be at least 1
     * @throws IllegalArgumentException if {@code size} is less than 1
     */
    public ThreadPool(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("Pool size must be at least 1, got " + size);
        }
        runnableQueue = new LinkedBlockingQueue<>();
        workerThread = new WorkerThread[size];
        for (int threadCount = 0; threadCount < size; threadCount++) {
            workerThread[threadCount] = new WorkerThread();
            workerThread[threadCount].start();
        }
    }

    /**
     * Queues a task for execution by one of the worker threads.
     *
     * @param task the task to run; must not be {@code null}
     * @throws IllegalStateException if the pool has already been shut down
     * @throws NullPointerException if {@code task} is {@code null}
     * @throws Exception declared for backward compatibility with existing callers; also
     *         covers the {@code InterruptedException} declared by the queue's {@code put}
     */
    public void execute(Runnable task) throws Exception {
        synchronized (stateLock) {
            if (isShutDown) {
                throw new IllegalStateException("Invalid state exception");
            }
            // Enqueue under the lock so a concurrent shutDown() cannot slip its stop
            // sentinels in front of this task (which would leave the task stranded).
            runnableQueue.put(task);
        }
    }

    /**
     * Initiates a graceful shutdown: no new tasks are accepted, tasks already queued
     * are still executed, and then every worker terminates. Calling this method more
     * than once has no additional effect.
     */
    public void shutDown() {
        synchronized (stateLock) {
            if (isShutDown) {
                return;
            }
            isShutDown = true;
            // One sentinel per worker, queued behind all previously submitted tasks.
            for (int i = 0; i < workerThread.length; i++) {
                runnableQueue.add(STOP_SIGNAL);
            }
        }
    }

    /** Worker loop: take a task, run it, repeat until a stop sentinel or an interrupt arrives. */
    private class WorkerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = runnableQueue.take();
                    if (task == STOP_SIGNAL) {
                        break;
                    }
                    System.out.println(this.getName());
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // A failing task must not shrink the pool: report the failure the
                        // same way an uncaught exception would be reported, then carry on.
                        getUncaughtExceptionHandler().uncaughtException(this, e);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: treat it as a request to stop.
            }
            System.out.println(Thread.currentThread().getName() + " stopped");
        }
    }
}

package com.test.main;

/**
 * Demo driver for {@code ThreadPool}: submits five tasks, each of which sleeps for
 * one second and then prints a message, to a pool of two threads and finally calls
 * {@code shutDown()} on the pool.
 */
public class ThreadPoolTest {
    /**
     * Entry point of the demo.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new ThreadPool(2);
        for (int count = 0; count < 5; count++) {
            try {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(1000);
                            System.out.println("Task executed");
                        } catch (InterruptedException e) {
                            // Restore the interrupt status so the thread running this
                            // task can still observe that it was interrupted.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                    }
                });
            } catch (Exception e) {
                // execute() rejected the task; report it and try the next one.
                e.printStackTrace();
            }
        }
        threadPool.shutDown();
    }
}

1 comment: