Producer Consumer Problem
This is one of the popular problems asked in interviews, where you are expected to write the code on paper. And believe me, I cannot write this on paper on the spot, even though I code a lot. Generally we see this solved using the BlockingQueue from the Java APIs. But what if the interviewer asks you to do the same without using BlockingQueue, or without using the wait() & notifyAll() methods? For that case I have written code that prints the numbers in reverse order; you can change the data. This approach needs to be tested thoroughly before it is used in a project; it is just an idea you can consider -
Another follow-up question you may get is to design your own BlockingQueue. Below I have written one sample; it is not properly tested, and it took me quite some time to get it working with my test class. It is easy to think about the approach in threading problems, but it is tricky to implement them properly.
Below is the test class I wrote to test the above queue.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Manual driver for the hand-written BlockingQueue.
 *
 * <p>Builds the integers 0..49 followed by the end marker -1, starts one producer,
 * waits two seconds, then starts two threads that share a single Consumer instance
 * and waits for both of them to finish.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        List<Integer> data = buildData();
        int size = 2;
        BlockingQueue<Integer> bq = new BlockingQueue<>(size);
        Producer<Integer> producer = new Producer<>(bq, data);
        Consumer<Integer> consumer = new Consumer<>(bq);

        Thread prod = new Thread(producer);
        Thread cons1 = new Thread(consumer, "Consumer1");
        Thread cons2 = new Thread(consumer, "Consumer2");

        prod.start();
        // Give the producer a head start before any consumer runs.
        pauseTwoSeconds();
        cons1.start();
        cons2.start();

        awaitBoth(cons1, cons2);
        System.out.println("Main completed..........");
    }

    /** Returns 0..49 in ascending order with the end marker -1 appended. */
    private static List<Integer> buildData() {
        List<Integer> values = new ArrayList<>();
        int next = 0;
        while (next < 50) {
            values.add(next);
            next++;
        }
        values.add(-1);
        return values;
    }

    /** Sleeps for two seconds; an interruption is only reported. */
    private static void pauseTwoSeconds() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Joins the two threads in order; an interruption aborts the remaining wait. */
    private static void awaitBoth(Thread first, Thread second) {
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pushes every element of a list into a BlockingQueue, in list order,
 * and prints a message once the whole list has been pushed.
 */
class Producer<T> implements Runnable {
    private BlockingQueue<T> bq;
    private List<T> data;
    // Number of elements pushed so far by this instance.
    private int count = 0;

    public Producer(BlockingQueue<T> bq, List<T> data) {
        this.bq = bq;
        this.data = data;
    }

    @Override
    public void run() {
        for (T item : data) {
            publish(item);
        }
        System.out.println("Producer finished...");
    }

    /** Pushes one element, then sleeps (for zero seconds) while fewer than 20 have been pushed. */
    private void publish(T item) {
        bq.push(item);
        count += 1;
        if (count >= 20) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them until the
 * end-of-data marker {@code -1} is seen.
 *
 * <p>The test driver in this file runs one instance on two threads, so the stop flag
 * is shared state. It is declared {@code volatile} so that a stop signalled by one
 * thread is visible to the other.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Written by the thread that receives the marker, read by every other thread.
    private volatile boolean isEnd = false;

    public Consumer(BlockingQueue<T> bq) {
        this.bq = bq;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd) {
            T data = bq.pop();
            System.out.println(data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd = true;
                // Hand the marker back so a peer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer finished...." + Thread.currentThread().getName());
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Manual driver for the hand-written BlockingQueue.
 *
 * <p>Builds the integers 0..49 followed by the end marker -1, starts one producer,
 * waits two seconds, then starts two threads that share a single Consumer instance
 * and waits for both of them to finish.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        List<Integer> data = buildData();
        int size = 2;
        BlockingQueue<Integer> bq = new BlockingQueue<>(size);
        Producer<Integer> producer = new Producer<>(bq, data);
        Consumer<Integer> consumer = new Consumer<>(bq);

        Thread prod = new Thread(producer);
        Thread cons1 = new Thread(consumer, "Consumer1");
        Thread cons2 = new Thread(consumer, "Consumer2");

        prod.start();
        // Give the producer a head start before any consumer runs.
        pauseTwoSeconds();
        cons1.start();
        cons2.start();

        awaitBoth(cons1, cons2);
        System.out.println("Main completed..........");
    }

    /** Returns 0..49 in ascending order with the end marker -1 appended. */
    private static List<Integer> buildData() {
        List<Integer> values = new ArrayList<>();
        int next = 0;
        while (next < 50) {
            values.add(next);
            next++;
        }
        values.add(-1);
        return values;
    }

    /** Sleeps for two seconds; an interruption is only reported. */
    private static void pauseTwoSeconds() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Joins the two threads in order; an interruption aborts the remaining wait. */
    private static void awaitBoth(Thread first, Thread second) {
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pushes every element of a list into a BlockingQueue, in list order,
 * and prints a message once the whole list has been pushed.
 */
class Producer<T> implements Runnable {
    private BlockingQueue<T> bq;
    private List<T> data;
    // Number of elements pushed so far by this instance.
    private int count = 0;

    public Producer(BlockingQueue<T> bq, List<T> data) {
        this.bq = bq;
        this.data = data;
    }

    @Override
    public void run() {
        for (T item : data) {
            publish(item);
        }
        System.out.println("Producer finished...");
    }

    /** Pushes one element, then sleeps (for zero seconds) while fewer than 20 have been pushed. */
    private void publish(T item) {
        bq.push(item);
        count += 1;
        if (count >= 20) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them until the
 * end-of-data marker {@code -1} is seen.
 *
 * <p>The test driver in this file runs one instance on two threads, so the stop flag
 * is shared state. It is declared {@code volatile} so that a stop signalled by one
 * thread is visible to the other.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Written by the thread that receives the marker, read by every other thread.
    private volatile boolean isEnd = false;

    public Consumer(BlockingQueue<T> bq) {
        this.bq = bq;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd) {
            T data = bq.pop();
            System.out.println(data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd = true;
                // Hand the marker back so a peer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer finished...." + Thread.currentThread().getName());
    }
}
Above we saw one consumer only. Now what if we have 2 consumers consuming data from the same queue, and they are 2 different consumer classes that want to take different actions on the data received from the queue? How will you do this, and how will you communicate the end of the data in the queue?
I changed the above code to achieve this, as shown below -
/**
 * Manual driver for the hand-written BlockingQueue with two different consumer classes.
 *
 * <p>Builds the integers 0..49 followed by the end marker -1, starts one producer,
 * waits two seconds, then starts a Consumer1 thread and a Consumer2 thread that share
 * one IsEnd flag object, and waits for both of them to finish.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        List<Integer> data = buildData();
        int size = 2;
        BlockingQueue<Integer> bq = new BlockingQueue<>(size);

        // One flag object is handed to both consumers so either can signal the end.
        IsEnd isEnd = new IsEnd();
        isEnd.isEnd = false;

        Producer<Integer> producer = new Producer<>(bq, data);
        Consumer1<Integer> consumer1 = new Consumer1<>(bq, isEnd);
        Consumer2<Integer> consumer2 = new Consumer2<>(bq, isEnd);

        Thread prod = new Thread(producer);
        Thread cons1 = new Thread(consumer1, "Consumer1");
        Thread cons2 = new Thread(consumer2, "Consumer2");

        prod.start();
        // Give the producer a head start before any consumer runs.
        pauseTwoSeconds();
        cons1.start();
        cons2.start();

        awaitBoth(cons1, cons2);
        System.out.println("Main completed..........");
    }

    /** Returns 0..49 in ascending order with the end marker -1 appended. */
    private static List<Integer> buildData() {
        List<Integer> values = new ArrayList<>();
        int next = 0;
        while (next < 50) {
            values.add(next);
            next++;
        }
        values.add(-1);
        return values;
    }

    /** Sleeps for two seconds; an interruption is only reported. */
    private static void pauseTwoSeconds() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Joins the two threads in order; an interruption aborts the remaining wait. */
    private static void awaitBoth(Thread first, Thread second) {
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pushes every element of a list into a BlockingQueue, in list order,
 * and prints a message once the whole list has been pushed.
 */
class Producer<T> implements Runnable {
    private BlockingQueue<T> bq;
    private List<T> data;
    // Number of elements pushed so far by this instance.
    private int count = 0;

    public Producer(BlockingQueue<T> bq, List<T> data) {
        this.bq = bq;
        this.data = data;
    }

    @Override
    public void run() {
        for (T item : data) {
            publish(item);
        }
        System.out.println("Producer finished...");
    }

    /** Pushes one element, then sleeps one second while fewer than 20 have been pushed. */
    private void publish(T item) {
        bq.push(item);
        count += 1;
        if (count >= 20) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them with a
 * "Consumer1 : " prefix until the end-of-data marker {@code -1} is seen, or until
 * the shared IsEnd flag has been set by someone else.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer1<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Flag object supplied by the caller; the driver passes the same one to Consumer2.
    private final IsEnd isEnd;

    public Consumer1(BlockingQueue<T> bq, IsEnd isEnd) {
        this.bq = bq;
        this.isEnd = isEnd;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd.isEnd) {
            T data = bq.pop();
            System.out.println("Consumer1 : " + data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd.isEnd = true;
                // Hand the marker back so a consumer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer1 finished...." + Thread.currentThread().getName());
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them with a
 * "Consumer2 : " prefix until the end-of-data marker {@code -1} is seen, or until
 * the shared IsEnd flag has been set by someone else.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer2<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Flag object supplied by the caller; the driver passes the same one to Consumer1.
    private final IsEnd isEnd;

    public Consumer2(BlockingQueue<T> bq, IsEnd isEnd) {
        this.bq = bq;
        this.isEnd = isEnd;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd.isEnd) {
            T data = bq.pop();
            System.out.println("Consumer2 : " + data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd.isEnd = true;
                // Hand the marker back so a consumer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer2 finished...." + Thread.currentThread().getName());
    }
}
/**
 * Mutable end-of-data flag that the driver shares between the consumer threads.
 */
class IsEnd {
    /**
     * True once a consumer has seen the end marker. Declared volatile because it is
     * written by one consumer thread and read by another.
     */
    public volatile boolean isEnd;
}
I changed the above code to achieve this, as shown below -
/**
 * Manual driver for the hand-written BlockingQueue with two different consumer classes.
 *
 * <p>Builds the integers 0..49 followed by the end marker -1, starts one producer,
 * waits two seconds, then starts a Consumer1 thread and a Consumer2 thread that share
 * one IsEnd flag object, and waits for both of them to finish.
 */
public class BlockingQueueTest {

    public static void main(String[] args) {
        List<Integer> data = buildData();
        int size = 2;
        BlockingQueue<Integer> bq = new BlockingQueue<>(size);

        // One flag object is handed to both consumers so either can signal the end.
        IsEnd isEnd = new IsEnd();
        isEnd.isEnd = false;

        Producer<Integer> producer = new Producer<>(bq, data);
        Consumer1<Integer> consumer1 = new Consumer1<>(bq, isEnd);
        Consumer2<Integer> consumer2 = new Consumer2<>(bq, isEnd);

        Thread prod = new Thread(producer);
        Thread cons1 = new Thread(consumer1, "Consumer1");
        Thread cons2 = new Thread(consumer2, "Consumer2");

        prod.start();
        // Give the producer a head start before any consumer runs.
        pauseTwoSeconds();
        cons1.start();
        cons2.start();

        awaitBoth(cons1, cons2);
        System.out.println("Main completed..........");
    }

    /** Returns 0..49 in ascending order with the end marker -1 appended. */
    private static List<Integer> buildData() {
        List<Integer> values = new ArrayList<>();
        int next = 0;
        while (next < 50) {
            values.add(next);
            next++;
        }
        values.add(-1);
        return values;
    }

    /** Sleeps for two seconds; an interruption is only reported. */
    private static void pauseTwoSeconds() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Joins the two threads in order; an interruption aborts the remaining wait. */
    private static void awaitBoth(Thread first, Thread second) {
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pushes every element of a list into a BlockingQueue, in list order,
 * and prints a message once the whole list has been pushed.
 */
class Producer<T> implements Runnable {
    private BlockingQueue<T> bq;
    private List<T> data;
    // Number of elements pushed so far by this instance.
    private int count = 0;

    public Producer(BlockingQueue<T> bq, List<T> data) {
        this.bq = bq;
        this.data = data;
    }

    @Override
    public void run() {
        for (T item : data) {
            publish(item);
        }
        System.out.println("Producer finished...");
    }

    /** Pushes one element, then sleeps one second while fewer than 20 have been pushed. */
    private void publish(T item) {
        bq.push(item);
        count += 1;
        if (count >= 20) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them with a
 * "Consumer1 : " prefix until the end-of-data marker {@code -1} is seen, or until
 * the shared IsEnd flag has been set by someone else.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer1<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Flag object supplied by the caller; the driver passes the same one to Consumer2.
    private final IsEnd isEnd;

    public Consumer1(BlockingQueue<T> bq, IsEnd isEnd) {
        this.bq = bq;
        this.isEnd = isEnd;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd.isEnd) {
            T data = bq.pop();
            System.out.println("Consumer1 : " + data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd.isEnd = true;
                // Hand the marker back so a consumer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer1 finished...." + Thread.currentThread().getName());
    }
}
/**
 * Runnable that pops elements from a BlockingQueue and prints them with a
 * "Consumer2 : " prefix until the end-of-data marker {@code -1} is seen, or until
 * the shared IsEnd flag has been set by someone else.
 *
 * <p>Elements are expected to have a {@code toString()} that parses as an integer;
 * anything else makes {@link Integer#parseInt(String)} throw NumberFormatException.
 */
class Consumer2<T> implements Runnable {
    private final BlockingQueue<T> bq;
    // Flag object supplied by the caller; the driver passes the same one to Consumer1.
    private final IsEnd isEnd;

    public Consumer2(BlockingQueue<T> bq, IsEnd isEnd) {
        this.bq = bq;
        this.isEnd = isEnd;
    }

    @Override
    public void run() {
        // Checking the flag before pop() keeps a late thread from blocking on a drained queue.
        while (!isEnd.isEnd) {
            T data = bq.pop();
            System.out.println("Consumer2 : " + data);
            if (Integer.parseInt(data.toString()) > 30) {
                try {
                    // Simulate slow processing of the larger values.
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop consuming.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (data.equals(-1)) {
                isEnd.isEnd = true;
                // Hand the marker back so a consumer still blocked in pop() also receives it.
                // NOTE(review): assumes the marker is the last element produced, so the
                // queue has room and push() returns at once - confirm against BlockingQueue.
                bq.push(data);
                break;
            }
        }
        System.out.println("Consumer2 finished...." + Thread.currentThread().getName());
    }
}
/**
 * Mutable end-of-data flag that the driver shares between the consumer threads.
 */
class IsEnd {
    /**
     * True once a consumer has seen the end marker. Declared volatile because it is
     * written by one consumer thread and read by another.
     */
    public volatile boolean isEnd;
}