Hello
Friends, let’s discuss about producer and consumer problem.
Producer and
Consumer problem is basically best use case of synchronization.
High level
analysis of producer and consumer problem is that we have to create producer
and consumer on a shared resource that can be anything like synchronized
object, concurrent object like blocking queue, semaphore etc.
Different
solution of Producer and Consumer problem.
Using BlockingQueue,
create producer and consumer classes and share blocking queue between both
package com.test.main;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.test.model.Consumer;
import com.test.model.Producer;
/**
* @author Manoj
*/
public class ProducerConsumerMain {
public static void main(String[] args) {
BlockingQueue<Integer>
sharedQueue = new
LinkedBlockingQueue<Integer>();
Thread prod = new Thread(new Producer(sharedQueue));
Thread cons = new Thread(new Consumer(sharedQueue));
prod.start();
cons.start();
}
}
package com.test.model;
import java.util.concurrent.BlockingQueue;
/**
* @author Manoj
*/
public class Producer implements Runnable {
private BlockingQueue<Integer> sharedQueue;
public Producer(BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println(Thread.currentThread().getName()
+ "
produced value" + i);
sharedQueue.put(i);
Thread.sleep(200);
} catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.test.model;
import java.util.concurrent.BlockingQueue;
/**
* @author Manoj Kumar
*/
public class Consumer implements Runnable {
private BlockingQueue<Integer> sharedQueue;
public Consumer(BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
}
public void run() {
try {
while (true) {
Integer value = sharedQueue.take();
System.out.println(Thread.currentThread().getName()
+ "
consumed value " + value);
}
} catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
Producer Consumer
using Semaphore
Seamphore
woks on the logic of permit (maintain a lock counter), if a thread acquire the
lock than it will decrement the permit counter if counter permit it 0 than
thread will wait until locks are releases, whenever a thread releases the lock
it increment the permit counter by one.
See below flow of semaphore:-
Code snippet:-
import
java.util.concurrent.Semaphore;
public class Test {
public static void main(String[] args) {
SemProdCon
semProdCon = new SemProdCon();
new Thread(new SemConsumer(semProdCon)).start();
new Thread(new SemProducer(semProdCon)).start();
}
}
class SemProdCon {
private Integer res;
private Semaphore semProd = new Semaphore(1);
private Semaphore semCons = new Semaphore(0);
public void get() {
try {
semCons.acquire();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumed
value :: " + res);
semProd.release();
}
public void put(Integer resource) {
try {
semProd.acquire();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced
value :: " + resource);
res = resource;
semCons.release();
}
}
class SemProducer implements Runnable {
private SemProdCon semProdCon;
public
SemProducer(SemProdCon semProdCon) {
this.semProdCon = semProdCon;
}
public void run() {
for (int count = 0; count < 5; count++) {
semProdCon.put(count);
}
}
}
class SemConsumer implements Runnable {
private SemProdCon semProdCon;
public
SemConsumer(SemProdCon semProdCon) {
this.semProdCon = semProdCon;
}
public void run() {
for (int count = 0; count < 5; count++) {
semProdCon.get();
}
}
}
Producer Consumer using wait and notify:
-
package com.mks.prodcon;
import
java.util.LinkedList;
import java.util.Queue;
public class
WaitNotifyProdConMain {
public static void main(String args[]) {
Queue<Integer>
sharedQueue = new
LinkedList<>();
int maxSize = 10;
Thread
producer = new Producer(sharedQueue, maxSize);
Thread
consumer = new Consumer(sharedQueue);
producer.start();
consumer.start();
}
}
class Producer extends Thread {
private Queue<Integer>
queue;
private int maxSize;
public
Producer(Queue<Integer> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
int i = 0;
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out.println("Queue is
full");
queue.wait();
}
catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Producing
value : " + i);
queue.add(++i);
try {
Thread.sleep(1000);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
queue.notifyAll();
}
}
}
}
class Consumer extends Thread {
private Queue<Integer>
queue;
public
Consumer(Queue<Integer> queue) {
this.queue = queue;
}
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("Queue is
empty");
try {
queue.wait();
}
catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Consuming
value : " + queue.poll());
try {
Thread.sleep(1000);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
queue.notifyAll();
}
}
}
}
very usefull in full details thanks
ReplyDelete