Concurrent API


The purpose of the concurrent API is to give the programmer high-level building blocks for multithreading.
The java.util.concurrent.* packages provide various interfaces and classes for building high-performance applications.


Java Concurrent Collection and Classes



Atomic Variable

Atomic variables are built on the compare-and-swap (CAS) algorithm, a low-level CPU operation that needs no synchronization. CAS reads the value from memory and compares it with the expected value; if the two match, it replaces the value with the new one, otherwise the caller retries. Conceptually, when we call getAndIncrement(), it first gets the current value and then calls compareAndSet() in a loop until the update succeeds.
Code Snippet –

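int current;
do {
    current = get();                              // read the current value
} while (!compareAndSet(current, current + 1));  // retry if another thread changed it
return current;                                   // getAndIncrement() returns the old value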

See the Oracle documentation for details.
Java provides AtomicInteger, AtomicLong, AtomicBoolean and AtomicReference.
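The same compare-and-set retry loop can be used for updates other than incrementing. The following is a minimal sketch, not taken from the JDK sources: the class name AtomicMaxSketch and the helper updateMax are made up for illustration, and only AtomicInteger.get() and compareAndSet() are real API calls.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: raises a shared maximum without any lock.
class AtomicMaxSketch {

    // Atomically raises 'target' to 'candidate' if candidate is larger.
    // Returns the value held by 'target' once this call is done.
    static int updateMax(AtomicInteger target, int candidate) {
        int current;
        do {
            current = target.get();
            if (candidate <= current) {
                return current; // an equal or larger value is already stored
            }
            // compareAndSet fails if another thread changed the value
            // between get() and here; in that case we simply retry.
        } while (!target.compareAndSet(current, candidate));
        return candidate;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            final int value = i * 10;
            threads[i] = new Thread(() -> updateMax(max, value));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("max = " + max.get()); // prints "max = 70"
    }
}
```

Whatever order the threads run in, the largest candidate wins, because a failed compareAndSet only causes a retry against the fresh value.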

In the Java code below, 10 threads access the same AtomicInteger and call getAndIncrement(), which atomically increments the counter and returns the previous value.

package com.test.main;

import java.util.concurrent.atomic.AtomicInteger;

public class Test1 {
 private AtomicInteger counter = new AtomicInteger();
 private Thread[] task;
 private int size;

 Test1() {
      size = 10;
      task = new Thread[size];
 }

 public static void main(String[] args) throws Exception {
      new Test1().getCounter();
 }

 private void getCounter() {
      for (int count = 0; count < size; count++) {
            task[count] = new Thread(new Runnable() {
                  public void run() {
                        System.out.print(" "+counter.getAndIncrement());
                  }
            });
            task[count].start();
      }
 }
}
Output (the order varies between runs):
1 3 4 2 5 0 6 7 8 9

Here each thread gets a distinct value from the AtomicInteger without any synchronized block. For comparison, let's see a synchronized example where we increment a plain counter but take a lock before touching it.

public class Test1 {
      // Dedicated lock object. Locking on the Integer counter itself is unsafe:
      // ++counter replaces the boxed Integer, so threads would lock different objects.
      private static final Object lock = new Object();
      private static int counter = 0;
      private Thread[] task;
      private int size;

      Test1() {
            size = 10;
            task = new Thread[size];
      }

      public static void main(String[] args) throws Exception {
            new Test1().getCounter();
      }

      private void getCounter() {
            for (int count = 0; count < size; count++) {
                  task[count] = new Thread(new Runnable() {
                        public void run() {
                              try {
                                    synchronized (lock) {
                                          Thread.sleep(1000);
                                          ++counter;
                                          System.out.print(" " + counter);
                                    }
                              } catch (InterruptedException e) {
                                    e.printStackTrace();
                              }
                        }
                  });
                  task[count].start();
            }
      }
}
Output:
1 2 3 4 5 6 7 8 9 10

So AtomicInteger updates the value without any locking, and atomic operations are usually quicker than taking a lock.
The limitation of atomic variables is that they provide only a limited set of operations on a single variable; a compound action that spans several variables still needs a lock.


Lock Object

Lock objects work very much like the implicit locks used by synchronized code.
The concurrent API provides various explicit locks that support fine-grained lock control.
The different locks provided by the concurrent API are as follows.
1.      ReentrantLock
2.      ReadWriteLock
3.      StampedLock
Reentrant Lock: - ReentrantLock provides the behavior of a synchronized block with extended features and a set of methods for better control.
A synchronized block is also reentrant in behavior.
Let’s discuss reentrant behavior first. What is reentrant?
Reentrant means that if a thread holds the lock of a synchronized block, the same thread can enter another synchronized block guarded by the same lock without blocking.
Let’s understand ReentrantLock with an example.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test1 {
      private Integer counter = new Integer(0);
      private Thread[] task;
      private int size;
      private Lock lock;

      Test1() {
            size = 5;
            task = new Thread[size];
            lock = new ReentrantLock();
      }

      public static void main(String[] args) throws Exception {
            new Test1().getCounter();
      }

      private void getCounter() {
            for (int count = 0; count < size; count++) {
                  task[count] = new Thread(new Runnable() {
                  public void run() {
                        try {
                              Thread.sleep(1000);
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                              return;
                        }
                        lock.lock();
                        try {
                              System.out.print(" " + ++counter);
                        } finally {
                              // always release, even if the guarded code throws
                              lock.unlock();
                        }
                  }
                  });
                  task[count].start();
            }
      }
}
The biggest advantage of Lock objects over implicit locks is their ability to back out of an attempt to acquire a lock. The tryLock() method backs out if the lock is not available immediately or before a timeout expires (if specified).
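To make the back-out behavior concrete, here is a small sketch using the timed tryLock(long, TimeUnit) overload of ReentrantLock. The class TryLockSketch and the method incrementIfAvailable are hypothetical names chosen for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: gives up instead of blocking forever.
class TryLockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter;

    // Returns true if the counter was incremented, false if the lock
    // could not be obtained within the timeout (or the wait was interrupted).
    boolean incrementIfAvailable(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // back out: someone else holds the lock
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            counter++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TryLockSketch sketch = new TryLockSketch();
        boolean[] result = new boolean[1];

        sketch.lock.lock(); // main holds the lock while the other thread tries
        Thread other = new Thread(() -> result[0] = sketch.incrementIfAvailable(100));
        other.start();
        other.join();
        sketch.lock.unlock();

        System.out.println("while held: " + result[0]);                          // false
        System.out.println("after release: " + sketch.incrementIfAvailable(100)); // true
    }
}
```

With a synchronized block the second thread would have no way to give up; it would simply wait until the lock became free.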

ReadWriteLock - A read-write lock exposes two locks, readLock() and writeLock(), each with its own lock()/unlock() methods. Any number of threads can hold the read lock as long as no thread holds the write lock. A thread can get the write lock only when no other thread holds either the read lock or the write lock on that resource.
Let’s use ReadWriteLock in the above example:-

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test1 {
      private Integer counter = new Integer(0);
      private Thread[] task;
      private int size;
      private ReentrantReadWriteLock lock;

      Test1() {
            size = 5;
            task = new Thread[size];
            lock = new ReentrantReadWriteLock();
      }

      public static void main(String[] args) throws Exception {
            new Test1().getCounter();
      }

      private void getCounter() {
            for (int count = 0; count < size; count++) {
            task[count] = new Thread(new Runnable() {
                  public void run() {
                        try {
                              Thread.sleep(1000);
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                              return;
                        }
                        lock.writeLock().lock();
                        try {
                              System.out.print(" " + ++counter);
                        } finally {
                              // always release, even if the guarded code throws
                              lock.writeLock().unlock();
                        }
                  }
            });
            task[count].start();
            }
      }
}
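The example above only takes the write lock. As a complementary sketch, the class below (ReadWriteCacheSketch, a made-up name) guards reads with the shared read lock and updates with the exclusive write lock, which is the usual reason to pick a ReadWriteLock over a plain lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class: a tiny map guarded by a read-write lock.
class ReadWriteCacheSketch {
    private final Map<String, Integer> data = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    Integer get(String key) {
        lock.readLock().lock();   // shared: many readers may hold this at once
        try {
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    void put(String key, int value) {
        lock.writeLock().lock();  // exclusive: waits until readers and writers have left
        try {
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteCacheSketch cache = new ReadWriteCacheSketch();
        cache.put("counter", 1);

        Thread[] readers = new Thread[3];
        for (int i = 0; i < readers.length; i++) {
            readers[i] = new Thread(() -> System.out.println(
                    Thread.currentThread().getName() + " read " + cache.get("counter")));
            readers[i].start();
        }
        for (Thread reader : readers) {
            reader.join();
        }
    }
}
```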

StampedLock:-
Java 8 introduced StampedLock, which also supports optimistic locking. Each lock method returns a long value (a stamp); this stamp is used to release the lock or to check whether the lock is still valid.
Let’s use StampedLock in our example:-
package com.test.main;

import java.util.concurrent.locks.StampedLock;

public class Test1 {
private Integer counter = new Integer(0);
private Thread[] task;
private int size;
private StampedLock lock;

Test1() {
      size = 5;
      task = new Thread[size];
      lock = new StampedLock();
}

public static void main(String[] args) throws Exception {
      new Test1().getCounter();
}

private void getCounter() {
      for (int count = 0; count < size; count++) {
            task[count] = new Thread(new Runnable() {
                  public void run() {
                        try {
                              Thread.sleep(1000);
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                              return;
                        }
                        long stamped = lock.writeLock();
                        try {
                              System.out.println("Stamped value is:: "+stamped+" counter value is:: " + ++counter);
                        } finally {
                              // always release with the stamp returned by writeLock()
                              lock.unlockWrite(stamped);
                        }
                  }
      });
      task[count].start();
      }
 }
}
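The example above uses only the write lock, so it does not show the optimistic mode. The sketch below follows the usual tryOptimisticRead()/validate() pattern; the class OptimisticReadSketch and its fields are illustrative names, not part of the original example.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical example class: reads two fields optimistically.
class OptimisticReadSketch {
    private final StampedLock lock = new StampedLock();
    private int x;
    private int y;

    void move(int dx, int dy) {
        long stamp = lock.writeLock();
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    int sum() {
        long stamp = lock.tryOptimisticRead(); // no blocking, no lock held
        int currentX = x;
        int currentY = y;
        if (!lock.validate(stamp)) {
            // A writer got in between: fall back to a real read lock.
            stamp = lock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return currentX + currentY;
    }

    public static void main(String[] args) {
        OptimisticReadSketch point = new OptimisticReadSketch();
        point.move(3, 4);
        System.out.println("sum = " + point.sum()); // prints "sum = 7"
    }
}
```

The optimistic read costs almost nothing when writes are rare, and validate() tells the reader when the values it copied may be inconsistent.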


Semaphore –

A semaphore controls access to a shared resource through a counter of permits. If the counter is greater than zero, access is allowed and the counter is decremented; if it is zero, access is denied until another thread releases a permit.
Code Snippet – producer/consumer problem
import java.util.concurrent.Semaphore;
/**
 * Initially semProducer holds one permit (value 1 passed to the constructor).
 * semConsumer starts with zero permits, so the consumer is suspended until
 * the producer releases a permit.
 * @author Manoj
 */
public class SemaphoreImpl {
      private int num;
      private static Semaphore semProducer = new Semaphore(1);
      private static Semaphore semConsumer = new Semaphore(0);
     
      public static void main(String[] args) {
            SemaphoreImpl semaphoreImpl = new SemaphoreImpl();
            new Thread(new NumberConsumer(semaphoreImpl)).start();
            new Thread(new NumberGenerator(semaphoreImpl)).start();
           
      }
     
      public void get() {
            try {
                  semConsumer.acquire();
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
            System.out.println("Consumed value is " +num);
            semProducer.release();
      }
     
      public void put(int count) {
            try {
                  semProducer.acquire();
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
            num = count;
            System.out.println("Produced value is " +num);
            semConsumer.release();
      }
}

class NumberGenerator implements Runnable{
      private SemaphoreImpl semaphoreImpl;
      NumberGenerator(SemaphoreImpl semaphoreImpl){
            this.semaphoreImpl = semaphoreImpl;
      }
      public void run() {
            for (int index = 0 ; index < 10 ; index++) {
                  semaphoreImpl.put(index);
            }
      }
}

class NumberConsumer implements Runnable{
      private SemaphoreImpl semaphoreImpl;
      NumberConsumer(SemaphoreImpl semaphoreImpl){
            this.semaphoreImpl = semaphoreImpl;
      }
      public void run() {
            for (int index = 0 ; index < 10 ; index++) {
                  semaphoreImpl.get();
            }
      }
}

Here is a simple example of a semaphore guarding a critical section.
package com.mks.testone;

import java.util.concurrent.Semaphore;

public class SemRunnbale {

 public static void main(String[] args) {
      Semaphore sem = new Semaphore(1);
      Thread mt1 = new Thread(new SemaTest(sem));
      Thread mt2 = new Thread(new SemaTest(sem));
      mt1.start();
      mt2.start();
 }
}

class SemaTest implements Runnable {
 int count = 0;
 private Semaphore sem;

 SemaTest(Semaphore sem) {
      this.sem = sem;
 }

 @Override
 public void run() {
      System.out.println(Thread.currentThread() + " waiting for permit");
      try {
            sem.acquire();
            System.out.println(Thread.currentThread() + " gets a permit.");
            for (int i = 0; i < 5; i++) {
                  System.out.println(Thread.currentThread() + ": " + ++count);
                  Thread.sleep(10);
            }
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      System.out.println(Thread.currentThread() + " releases the permit.");
      sem.release();
 }
}
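Both examples above use a single permit. As a sketch of the counter with more than one permit, the class below (SemaphoreLimitSketch, a made-up name) lets at most a given number of threads into the guarded section at once and records the highest number it observed inside.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: limits how many threads work at the same time.
class SemaphoreLimitSketch {

    // Runs 'threads' workers guarded by a semaphore with 'permits' permits and
    // returns the highest number of workers seen inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was obtained, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // The printed value depends on scheduling but never exceeds 3.
        System.out.println("max concurrent workers: " + maxConcurrent(3, 10));
    }
}
```

Note that release() is called only after a successful acquire(); releasing without acquiring would silently add an extra permit.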



CountDownLatch –

CountDownLatch works on the latch principle: the waiting thread (here the main thread) blocks until the gate opens.
The latch is created with a count equal to the number of events. Each time an event happens the count is decremented, and when the count reaches zero the latch opens.
Code Snippet –
import java.util.concurrent.CountDownLatch;
/**
 * In this class a CountDownLatch is created with an event count of 5.
 * await() blocks until all 5 events have completed, then the latch opens.
 * @author Manoj
 */
public class CountDownLatchImpl {

      public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(5);
            new Thread(new EventCounter(countDownLatch, 5)).start();
            try {
                  countDownLatch.await();
            } catch (InterruptedException e) {
                  System.out.println(e.getMessage());
            }
      }
}

class EventCounter implements Runnable {

      private CountDownLatch countDownLatch;
      private int eventSize;

      EventCounter(CountDownLatch countDownLatch, int eventSize) {
            this.countDownLatch = countDownLatch;
            this.eventSize = eventSize;
      }

      public void run() {
            // Count down once per event; stopping one short would leave await() blocked forever.
            for (int eventCount = 0; eventCount < eventSize; eventCount++) {
                  System.out.println("Count Down " + eventCount);
                  countDownLatch.countDown();
            }
            System.out.println("Count down completed");
      }
}
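A latch is more often used to wait for several threads than for one. The sketch below assumes a hypothetical class LatchWorkersSketch in which each worker counts down once, and the caller waits with the timed await(long, TimeUnit) overload so that a missing countDown() cannot block it forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: one latch, several workers.
class LatchWorkersSketch {

    // Starts 'workers' threads and waits up to 'timeoutMillis' for all of them.
    // Returns the number of finished workers, or -1 if the latch did not open in time.
    static int runAndAwait(int workers, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // the "event"
                } finally {
                    done.countDown();            // always signal, even on failure
                }
            }).start();
        }

        try {
            boolean opened = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return opened ? completed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("workers finished: " + runAndAwait(5, 5000));
    }
}
```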


CyclicBarrier –

A CyclicBarrier makes threads wait until the specified number of threads has reached the barrier point.
A CyclicBarrier can be reused: once the waiting threads are released, the barrier resets for the next round.
Code snippet –

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * In the class below, CyclicBarrier suspends 3 threads until
 * all of them have reached the barrier point.
 * @author Manoj
 */
public class CyclicBarrierTest {

      public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new BarrierPoint());
            System.out.println("CyclicBarrier started");
            new Thread(new BarrierThread(cyclicBarrier, "A")).start();
            new Thread(new BarrierThread(cyclicBarrier, "B")).start();
            new Thread(new BarrierThread(cyclicBarrier, "C")).start();
      }
}

class BarrierThread implements Runnable {
      private CyclicBarrier cyclicBarrier;
      private String name;

      BarrierThread(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
      }

      public void run() {
            try {
                  System.out.println(name + " executed");
                  cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                  System.out.println(e.getMessage());
            }
      }
}

class BarrierPoint implements Runnable {
      public void run() {
            System.out.println("BarrierPoint reached");
      }
}
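The example above trips the barrier only once. To illustrate the reuse, the following sketch (class name BarrierReuseSketch is an assumption for this example) sends the same threads through the same barrier for several rounds and counts how often the barrier action runs.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: the same barrier is tripped once per round.
class BarrierReuseSketch {

    // Returns how many times the barrier action ran.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];

        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // blocks until all parties arrive, then resets
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + runRounds(3, 2) + " times"); // 2 times
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be reset after it reaches zero.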


Executor –

An executor manages a pool of threads, so we don’t need to create threads manually; the pooled threads are reused across tasks.
Code snippet-

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Creating executor using newSingleThreadExecutor
 * @author Manoj
 */
public class ExecutorTest {
      public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("Hello " + threadName);
            });
            // Without shutdown() the idle worker thread keeps the JVM alive.
            executor.shutdown();
      }
}
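To show the thread reuse, here is a sketch with a fixed-size pool that runs more tasks than it has threads and collects the names of the threads that did the work. PoolReuseSketch and workerNames are hypothetical names; the shutdown sequence (shutdown, awaitTermination, shutdownNow) is one common pattern, not the only one.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: many tasks, few threads.
class PoolReuseSketch {

    // Runs 'taskCount' tasks on a pool of 'poolSize' threads and returns
    // the names of the threads that actually executed them.
    static Set<String> workerNames(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }

        pool.shutdown(); // stop accepting tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(2, 10);
        // 10 tasks ran, but at most 2 distinct threads were used.
        System.out.println(names.size() + " thread(s) used: " + names);
    }
}
```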



Callable and Future –

The Callable interface represents a task that returns a value. This is useful for computations whose result is needed later, such as numerical calculations run in the background.
Future is a generic interface that represents the result of an asynchronous computation, such as the value returned by a Callable.

Code Snippet-
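import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * In the class below we create a newFixedThreadPool with pool size 1,
 * so only one thread is created. Two Callable instances are submitted
 * to it and their values are read from the returned Future instances.
 * @author Manoj
 *
 */
public class CallableTest {

  public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(1);
      int number = 10;
      Future<Integer> summation;
      Future<Long> factorial;
      summation = executorService.submit(new Summation(number));
      factorial = executorService.submit(new Factorial(number));
      try {
        // get() blocks until the corresponding task has finished
        System.out.println("Summation of numbers below "+number+" is "+summation.get());
        System.out.println("Factorial of "+number+" is "+factorial.get());
      } catch (InterruptedException | ExecutionException e) {
            System.out.println(e.getMessage());
      } finally {
            // release the pool thread so the JVM can exit
            executorService.shutdown();
      }
   }
}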

class Summation implements Callable<Integer> {
      private int sum;
      private int numLimit;
     
      Summation(int numLimit) {
            this.numLimit = numLimit;
      }
      public Integer call() throws Exception {
            for (int index = 0 ; index < numLimit ; index++) {
                  sum = sum + index;
            }
            return sum;
      }
}

class Factorial implements Callable<Long> {
      private long factorial = 1;
      private int number;
     
      Factorial(int numLimit) {
            this.number = numLimit;
      }
      public Long call() throws Exception {
            for (int index = 1 ; index <= number ; index++) {
                  factorial = factorial * index;
            }
            return factorial;
      }
}
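When there are many Callable tasks, submitting them one by one gets tedious. As a sketch, ExecutorService.invokeAll() accepts a whole collection and returns one Future per task in the same order; the class InvokeAllSketch and the method sumOfSquares below are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class: one Callable per number, results combined at the end.
class InvokeAllSketch {

    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long total = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Long> future : pool.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of squares 1..10 = " + sumOfSquares(10)); // 385
    }
}
```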


CompletableFuture –

CompletableFuture was introduced in Java 8. It implements the Future and CompletionStage interfaces; CompletionStage provides a large selection of methods that let you attach callbacks to be executed on completion.

Code Snippet-
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * Simple example of completable future
 * @author Manoj
 */
public class CompletableFutureTest {

   public static void main(String[] args) {
      CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> "Hello ! friends");
      try {
            System.out.println(future.get());
      } catch (InterruptedException | ExecutionException e) {
            System.out.println(e.getMessage());
      }
   }
}
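The example above only reads the result with get(). To illustrate the callback methods that CompletionStage adds, the sketch below chains thenApply() and thenCombine(); the class CompletableChainSketch and the method greet are names invented for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class: transforms and combines asynchronous results.
class CompletableChainSketch {

    static String greet(String name) {
        // thenApply transforms the result once the first stage completes.
        CompletableFuture<String> greeting = CompletableFuture
                .supplyAsync(() -> "Hello")
                .thenApply(text -> text + ", " + name);

        // A second, independent asynchronous computation.
        CompletableFuture<Integer> length = CompletableFuture.supplyAsync(name::length);

        // thenCombine runs when both stages are done; join() waits for the result
        // and, unlike get(), throws only unchecked exceptions.
        return greeting
                .thenCombine(length, (text, count) -> text + " (" + count + " letters)")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greet("friends")); // prints "Hello, friends (7 letters)"
    }
}
```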


