Monday, 14 August 2017

Producer and Consumer Problem

Hello friends, let’s discuss the producer and consumer problem.
The producer and consumer problem is a classic use case for thread synchronization.

At a high level, the producer and consumer problem requires a producer and a consumer that operate on a shared resource. That resource can be anything that coordinates the two threads: a synchronized object, a concurrent object such as a blocking queue, a semaphore, etc.


Different solutions to the Producer and Consumer problem.

Using BlockingQueue: create producer and consumer classes and share a blocking queue between them.

package com.test.main;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.test.model.Consumer;
import com.test.model.Producer;
/**
 * @author Manoj
 */
public class ProducerConsumerMain {

public static void main(String[] args) {
       BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
       Thread prod = new Thread(new Producer(sharedQueue));
       Thread cons = new Thread(new Consumer(sharedQueue));
             
       prod.start();
       cons.start();
}
}

package com.test.model;

import java.util.concurrent.BlockingQueue;
/**
 * Producer half of the {@link BlockingQueue} demo: puts the integers 0 through 9 on
 * the shared queue, pausing 200 ms after each one.
 *
 * @author Manoj
 */
public class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;

    /**
     * @param sharedQueue queue that produced values are put on
     */
    public Producer(BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /**
     * Produces ten values, then returns. If the thread is interrupted while putting
     * or sleeping, production stops immediately and the interrupt status is restored
     * so the caller can observe it.
     */
    @Override
    public void run() {
        // The try block wraps the whole loop: an interrupt is a request to stop, so it
        // must end the loop instead of being logged and ignored on each iteration.
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " produced value " + i);
                sharedQueue.put(i);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before exiting.
            Thread.currentThread().interrupt();
        }
    }
}
package com.test.model;

import java.util.concurrent.BlockingQueue;
/**
 * Consumer half of the {@link BlockingQueue} demo: repeatedly takes a value from the
 * shared queue and prints it.
 *
 * @author Manoj Kumar
 */
public class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;

    /**
     * @param sharedQueue queue that values are taken from
     */
    public Consumer(BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    /**
     * Consumes values until the thread is interrupted. On interruption the loop ends
     * and the interrupt status is restored so the caller can observe it.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Integer value = sharedQueue.take();
                System.out.println(Thread.currentThread().getName() + " consumed value " + value);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before exiting.
            Thread.currentThread().interrupt();
        }
    }
}


Producer Consumer using Semaphore

A semaphore works on the logic of permits (it maintains a permit counter). When a thread acquires the semaphore, the permit counter is decremented; if the counter is 0, the thread waits until a permit is released. Whenever a thread releases the semaphore, the permit counter is incremented by one.
See the flow of the semaphore below:


Code snippet:-

import java.util.concurrent.Semaphore;

public class Test {

       public static void main(String[] args) {
              SemProdCon semProdCon = new SemProdCon();
              new Thread(new SemConsumer(semProdCon)).start();
              new Thread(new SemProducer(semProdCon)).start();
       }
}

/**
 * Single-slot hand-off between a producer and a consumer, coordinated by two
 * semaphores: {@code semProd} starts with one permit (the slot is free) and
 * {@code semCons} starts with none (nothing to consume yet).
 */
class SemProdCon {
    private Integer res;
    private final Semaphore semProd = new Semaphore(1);
    private final Semaphore semCons = new Semaphore(0);

    /**
     * Waits for a produced value, prints it, and frees the slot for the producer.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is consumed, no
     * permit is released, and the thread's interrupt status is restored.
     */
    public void get() {
        try {
            semCons.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so releasing semProd here would hand the
            // producer a permit it is not entitled to and break the alternation.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Consumed value :: " + res);
        semProd.release();
    }

    /**
     * Waits for the slot to be free, stores {@code resource} in it, and signals the
     * consumer.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is stored, no
     * permit is released, and the thread's interrupt status is restored.
     *
     * @param resource value to hand to the consumer
     */
    public void put(Integer resource) {
        try {
            semProd.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained: writing res now could overwrite a value that has
            // not been consumed yet.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Produced value :: " + resource);
        res = resource;
        semCons.release();
    }
}

class SemProducer implements Runnable {

       private SemProdCon semProdCon;

       public SemProducer(SemProdCon semProdCon) {
              this.semProdCon = semProdCon;
       }

       public void run() {
              for (int count = 0; count < 5; count++) {
                     semProdCon.put(count);
              }
       }
}

class SemConsumer implements Runnable {
       private SemProdCon semProdCon;

       public SemConsumer(SemProdCon semProdCon) {
              this.semProdCon = semProdCon;
       }

       public void run() {
              for (int count = 0; count < 5; count++) {
                     semProdCon.get();
              }
       }
}




Producer Consumer using wait and notify: -

package com.mks.prodcon;

import java.util.LinkedList;
import java.util.Queue;

public class WaitNotifyProdConMain {

       public static void main(String args[]) {
              Queue<Integer> sharedQueue = new LinkedList<>();
              int maxSize = 10;
              Thread producer = new Producer(sharedQueue, maxSize);
              Thread consumer = new Consumer(sharedQueue);
              producer.start();
              consumer.start();

       }
}

/**
 * Producer thread for the wait/notify demo: adds the values 1, 2, 3, ... to the shared
 * queue, waiting on the queue's monitor whenever the queue already holds
 * {@code maxSize} elements.
 */
class Producer extends Thread {
    private final Queue<Integer> queue;
    private final int maxSize;

    /**
     * @param queue   shared queue; also used as the monitor for wait/notify
     * @param maxSize number of queued elements at which the producer waits
     */
    public Producer(Queue<Integer> queue, int maxSize) {
        this.queue = queue;
        this.maxSize = maxSize;
    }

    /**
     * Produces one value per second until the thread is interrupted. On interruption
     * the loop ends and the interrupt status is restored.
     */
    @Override
    public void run() {
        int i = 0;
        try {
            while (true) {
                synchronized (queue) {
                    // Re-check the condition in a loop: a wake-up does not guarantee
                    // that there is room in the queue.
                    while (queue.size() >= maxSize) {
                        System.out.println("Queue is full");
                        queue.wait();
                    }
                    // Log the same value that is queued (previously the log printed i
                    // but the queue received i + 1).
                    int value = ++i;
                    System.out.println("Producing value : " + value);
                    queue.add(value);
                    queue.notifyAll();
                }
                // Pace the producer outside the synchronized block: sleep() does not
                // release the monitor, so sleeping inside would lock out the consumer.
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request and keep the flag visible to callers.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Consumer thread for the wait/notify demo: removes and prints values from the shared
 * queue, waiting on the queue's monitor whenever the queue is empty.
 */
class Consumer extends Thread {
    private final Queue<Integer> queue;

    /**
     * @param queue shared queue; also used as the monitor for wait/notify
     */
    public Consumer(Queue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Consumes one value per second until the thread is interrupted. On interruption
     * the loop ends and the interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            while (true) {
                synchronized (queue) {
                    // Re-check the condition in a loop: a wake-up does not guarantee
                    // that an element is available.
                    while (queue.isEmpty()) {
                        System.out.println("Queue is empty");
                        queue.wait();
                    }
                    System.out.println("Consuming value : " + queue.poll());
                    queue.notifyAll();
                }
                // Pace the consumer outside the synchronized block: sleep() does not
                // release the monitor, so sleeping inside would lock out the producer.
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request and keep the flag visible to callers.
            Thread.currentThread().interrupt();
        }
    }
}

1 comment: