The purpose of concurrent API to provide the programmer high level
capability in multithreading
java.util.concurrent.* package provide various interfaces and
classes to achieve the goal of high performing application
Java
Concurrent Collection and Classes
Atomic
Variable
Atomic variables work on compare and swap algorithm (low level CPU
operation, no synchronization needed). It read the value from a memory and
compare with the expected value if values matches then it replace the value
with new one, So when we call getAndIncrement() method it first get the current
value than call compareAndSet() method
Code Snippet –
int current;
do {
current = get();
} while(!compareAndSet(current,
current + 1));
Visit oracle docs for
detail:-
Java provides
AtomicInteger, AtomicLong, AtomicBoolean and AtomicReference
Let’s see below java code
10 threads are trying to accessing the AtomicInteger and calling getAndIncrement()
methods, which return the incremented value.
package
com.test.main;
import
java.util.concurrent.atomic.AtomicInteger;
public class Test1 {
private AtomicInteger
counter = new
AtomicInteger();
private Thread[] task;
private int size;
Test1() {
size = 10;
task = new Thread[size];
}
public static void
main(String[] args) throws
Exception {
new Test1().getCounter();
}
private void
getCounter() {
for (int count = 0; count < size; count++) {
task[count] = new Thread(new
Runnable() {
public void run() {
System.out.print("
"+counter.getAndIncrement());
}
});
task[count].start();
}
}
}
O\P
1 3 4 2 5 0 6 7 8 9
Here you can see the each
thread get the updated value of atomic integer without any synchronization
block, let’s see example of synchronization where we use and Integer and
increment the value but we put a lock before on counter.
public class Test1 {
private static Integer
counter = new
Integer(0);
private Thread[] task;
private int size;
Test1() {
size = 10;
task = new Thread[size];
}
public static void
main(String[] args) throws
Exception {
new Test1().getCounter();
}
private void
getCounter() {
for (int count = 0; count < size; count++) {
task[count] = new Thread(new
Runnable() {
public void run() {
try {
synchronized (counter) {
Thread.sleep(1000);
++counter;
System.out.print("
"+counter);
}
} catch (InterruptedException
e) {
e.printStackTrace();
}
}
});
task[count].start();
}
}
}
O\P
1 2 3 4 5 6 7 8 9 10
So AtmicInteger integer
get the value without any locking. So atomic operation are relative quick
compare to lock.
The limitation of atomic variable is they only provide
limited set of operation, we can’t perform complex operations.
Lock
Object
Lock objects work very much like the implicit locks used by
synchronized code.
Concurrent API various explicit locks fine grained lock control
support.
Different lock provided by concurrent API are as follows.
1.
ReentrantLock
2.
ReadWriteLock
3.
StampedLock
Reentrant Lock:
- Reentrant lock provide the behavior of synchronized block with
extended features and set of methods for better control.
Synchronized block is reentrant in behavior
Let’s discuss
Reentrant behavior first, what is reentrant?
Reentrant means if a thread get lock on a synchronized block,
thread can enter another synchronized block with same lock.
Let’s understand ReentrantLock with example.
import
java.util.concurrent.locks.Lock;
import
java.util.concurrent.locks.ReentrantLock;
public class Test1 {
private Integer counter = new
Integer(0);
private Thread[] task;
private int size;
private Lock lock;
Test1() {
size = 5;
task = new Thread[size];
lock = new
ReentrantLock();
}
public static void
main(String[] args) throws
Exception {
new Test1().getCounter();
}
private void
getCounter() {
for (int count = 0; count < size; count++) {
task[count] = new Thread(new
Runnable() {
public void run() {
try {
Thread.sleep(1000);
lock.lock();
System.out.print("
" + ++counter);
lock.unlock();
} catch
(InterruptedException e) {
e.printStackTrace();
}
}
});
task[count].start();
}
}
}
The biggest advantage of Lock objects over implicit locks is their ability to back out of
an attempt to acquire a lock. The tryLock() method backs out if the lock is not available immediately
or before a timeout expires (if specified).
ReadWriteLock - read write lock have
lockRead()/unlockRead() and lockWrite()/unlockWrite() method. All thread get
read access unless there is a thread with write access. A thread ca get write
access if there is no thread with read access as well as write access on that
resource.
Let’s use ReadWriteLock in
above example:-
import
java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test1 {
private Integer counter = new
Integer(0);
private Thread[] task;
private int size;
private ReentrantReadWriteLock lock;
Test1() {
size = 5;
task = new Thread[size];
lock = new
ReentrantReadWriteLock();
}
public static void
main(String[] args) throws
Exception {
new Test1().getCounter();
}
private void
getCounter() {
for (int count = 0; count < size; count++) {
task[count] = new Thread(new
Runnable() {
public void run() {
try {
Thread.sleep(1000);
lock.writeLock().lock();
System.out.print("
" + ++counter);
lock.writeLock().unlock();
} catch (InterruptedException
e) {
e.printStackTrace();
}
}
});
task[count].start();
}
}
}
StampedLock:-
Java 8 introduced stamped
lock which support optimistic locking, the benefit of the stamped lock is that
it returns a long value, this value can be used to release a lock or to check
whether lock is still valid or not.
Let’s use stamped lock in our
example:-
package
com.test.main;
import
java.util.concurrent.locks.StampedLock;
public class Test1 {
private Integer
counter = new
Integer(0);
private
Thread[] task;
private int size;
private
StampedLock lock;
Test1() {
size = 5;
task = new Thread[size];
lock = new
StampedLock();
}
public static void
main(String[] args) throws
Exception {
new Test1().getCounter();
}
private void
getCounter() {
for (int count = 0; count < size; count++) {
task[count] = new Thread(new
Runnable() {
public void run() {
try {
Thread.sleep(1000);
long stamped = lock.writeLock();
System.out.println("Stamped
value is:: "+stamped+" counter value is::
" + ++counter);
lock.unlockWrite(stamped);
} catch
(InterruptedException e) {
e.printStackTrace();
}
}
});
task[count].start();
}
}
}
Semaphore
–
Semaphore provide access control to a shared resource through the
use of a counter. If counter is greater than zero then access is allowed, if it
is zero than access id denied.
Code Snippet – producer/consumer problem
import
java.util.concurrent.Semaphore;
/**
* Initially semProducer gets permit via
passing value 1 in constructor
* semaConsumer will suspended until
semProducer releases the permit.
* @author Manoj
*/
public class
SemaphoreImpl {
private int num;
private static
Semaphore semProducer = new
Semaphore(1);
private static
Semaphore semConsumer = new
Semaphore(0);
public static void
main(String[] args) {
SemaphoreImpl semaphoreImpl = new
SemaphoreImpl();
new Thread(new
NumberConsumer(semaphoreImpl)).start();;
new Thread(new
NumberGenerator(semaphoreImpl)).start();;
}
public void get() {
try {
semConsumer.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumed
value is " +num);
semProducer.release();
}
public void put(int count) {
try {
semProducer.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
num = count;
System.out.println("Produced
value is " +num);
semConsumer.release();
}
}
class
NumberGenerator implements Runnable{
private SemaphoreImpl semaphoreImpl;
NumberGenerator(SemaphoreImpl semaphoreImpl){
this.semaphoreImpl = semaphoreImpl;
}
public void run() {
for (int index = 0 ; index < 10
; index++) {
semaphoreImpl.put(index);
}
}
}
class
NumberConsumer implements Runnable{
private SemaphoreImpl semaphoreImpl;
NumberConsumer(SemaphoreImpl semaphoreImpl){
this.semaphoreImpl = semaphoreImpl;
}
public void run() {
for (int index = 0 ; index < 10
; index++) {
semaphoreImpl.get();
}
}
}
Here we are printing
simple example of semaphore.
import java.util.concurrent.Semaphore;
package com.mks.testone;
import java.util.concurrent.Semaphore;
public class SemRunnbale {
public
static void main(String[] args) {
Semaphore
sem = new Semaphore(1);
Thread
mt1 = new Thread(new SemaTest(sem));
Thread
mt2 = new Thread(new SemaTest(sem));
mt1.start();
mt2.start();
}
}
class SemaTest implements Runnable {
int
count = 0;
private
Semaphore sem;
SemaTest(Semaphore
sem) {
this.sem = sem;
}
@Override
public
void run() {
System.out.println(Thread.currentThread()
+ "waiting for permit");
try
{
sem.acquire();
System.out.println(Thread.currentThread()
+ " gets a permit.");
for
(int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread()
+ ": " + ++count);
Thread.sleep(10);
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread()
+ " releases the permit.");
sem.release();
}
}
CountDownLatch
–
Countdownlatch works on latch principle main thread will wait
until gate open.
Initially count is created with a count of no of events, each time
an events happens, the count is decremented when the count reached to zero, the
latch opens.
Code Snippet –
import
java.util.concurrent.CountDownLatch;
/**
* In this class countDownLatch is created with
event count 5
* latch will wait until all 5 events complete
than latch will open
* @author Manoj
*/
public class
CountDownLatchImpl {
public static void
main(String[] args) {
CountDownLatch countDownLatch = new
CountDownLatch(5);
new Thread(new
EventCounter(countDownLatch, 5)).start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
class EventCounter
implements Runnable {
private CountDownLatch countDownLatch;
private int eventSize;
EventCounter(CountDownLatch countDownLatch, int eventSize) {
this.countDownLatch = countDownLatch;
this.eventSize = eventSize;
}
public void run() {
for (int eventCount = 0; eventCount < eventSize-1; eventCount++) {
System.out.println("Count
Down " + eventCount);
countDownLatch.countDown();
}
System.out.println("Count
down completed");
}
}
CyclicBarrier
–
Cyclic barrier will wait
until specified no of threads has reached the barrier point.
CyclicBarrier can be reused
because it releases the waiting threads each time.
Code snippet –
import
java.util.concurrent.BrokenBarrierException;
import
java.util.concurrent.CyclicBarrier;
/**
* Below java class CyclicBarrier will suspend
3 threads untill
* all of them reached the barrier point
* @author Manoj
*/
public class
CyclicBarrierTest {
public static void
main(String[] args) {
CyclicBarrier cyclicBarrier = new
CyclicBarrier(3, new BarrierPoint());
System.out.println("CyclicBarrier
started");
new Thread(new
BarrierThread(cyclicBarrier, "A")).start();
new Thread(new
BarrierThread(cyclicBarrier, "B")).start();
new Thread(new
BarrierThread(cyclicBarrier, "C")).start();
}
}
class
BarrierThread implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
BarrierThread(CyclicBarrier cyclicBarrier, String
name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
public void run() {
try {
System.out.println(name + "
executed");
cyclicBarrier.await();
} catch (InterruptedException |
BrokenBarrierException e) {
System.out.println(e.getMessage());
}
}
}
class
BarrierPoint implements Runnable {
public void run() {
System.out.println("BarrierPoint
reached");
}
}
Executor
–
The concept of executor to manage pool of threads. So we don’t
need to create thread manually, all threads will be reused.
Code snippet-
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
/**
* Creating executor using
newSingleThreadExecutor
* @author Manoj
*/
public class
ExecutorTest {
public static void
main(String[] args) {
ExecutorService executor =
Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName =
Thread.currentThread().getName();
System.out.println("Hello
" + threadName);
});
}
}
Callable
and Future –
Callable interface represents a thread that return a value. This
is a powerful mechanism because with the help of callable we can perform many
type of numerical computation.
Future is a generic interface that represent the value return by
callable.
Code
Snippet-
/**
* In below class we are creating
newFixedThreadPool with pool size 1
* So only one thread will be created and
submitted the two callble
* instance and get there value in future
instance.
* @author Manoj
*
*/
public class
CallableTest {
public static void
main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
int number = 10;
Future<Integer> sumation;
Future<Long> factorial;
sumation = executorService.submit(new
Summation(10));
factorial = executorService.submit(new
Factorial(10));
try {
System.out.println("Summation
of "+number+" number is "+sumation.get());
System.out.println("Factorial
of "+number+" is "+factorial.get());
} catch (InterruptedException |
ExecutionException e) {
System.out.println(e.getMessage());
}
}
}
class
Summation implements Callable<Integer> {
private int sum;
private int numLimit;
Summation(int numLimit) {
this.numLimit = numLimit;
}
public Integer call() throws
Exception {
for (int index = 0 ; index < numLimit ; index++) {
sum = sum + index;
}
return sum;
}
}
class
Factorial implements Callable<Long> {
private long factorial = 1;
private int number;
Factorial(int numLimit) {
this.number = numLimit;
}
public Long call() throws
Exception {
for (int index = 1 ; index <= number ; index++) {
factorial = factorial * index;
}
return factorial;
}
}
CompletableFuture
–
CompletableFuture
introduced in JAVA8. It implements Future and CompletionStage Interface where
CompletionStage Interface provide vast selection of method that let you attach
callback which executed on completion.
Code Snippet-
import
java.util.concurrent.CompletableFuture;
import
java.util.concurrent.ExecutionException;
/**
* Simple example of completable future
* @author Manoj
*/
public class
CompletableFutureTest {
public static void
main(String[] args) {
CompletableFuture<String> future
= CompletableFuture.supplyAsync(()
-> "Hello ! friends");
try {
System.out.println(future.get());
} catch (InterruptedException |
ExecutionException e) {
System.out.println(e.getMessage());
}
}
}
Nice information, very usefull thanks
ReplyDeletewrite more senario based example with complex logic
ReplyDelete