使用Sychronized配合notify和wait实现
package solace.concurrency;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
// notify wait 实现生产者消费者
public class Test8<T> {
private final Integer MAX = 20;
private final LinkedList<T> bowl = new LinkedList<T>();
// 生产者
public synchronized void put(T t) {
try {
while (bowl.size() == MAX) {
this.wait();
}
bowl.add(t);
TimeUnit.SECONDS.sleep(1);
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费者
public synchronized T get() {
T t = null;
try {
while (bowl.size() == 0) {
this.wait();
}
t = bowl.removeFirst();
TimeUnit.SECONDS.sleep(1);
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
return t;
}
public static void main(String[] args) {
Test8 t = new Test8();
// 先启动消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while(true) {
t.get();
System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
}
}, "C" + i).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 再启动生产者
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while(true) {
t.put("aaa");
System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
}
}, "P" + i).start();
}
}
}
使用ReentrantLock配合Condition实现
package solace.concurrency;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// ReentrankLock 和Condition实现生产者消费者
// https://github.com/bjmashibing/edu/blob/master/concurrent/src/yxxy/c_021/MyContainer2.java
public class Test7<T> {
private final Integer MAX = 20;
private final LinkedList<T> bowl = new LinkedList<T>();
private Lock lock = new ReentrantLock();
Condition consumer = lock.newCondition();
Condition producer = lock.newCondition();
// 生产者
public void put(T t) {
try {
lock.lock();
while(bowl.size() == MAX) {
producer.await();
}
bowl.add(t);
TimeUnit.SECONDS.sleep(1);
// 叫醒消费者
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 消费者
public T get() {
T t = null;
try {
lock.lock();
while(bowl.size() == 0) {
consumer.await();
}
t = bowl.removeFirst();
TimeUnit.SECONDS.sleep(1);
// 叫醒生产者
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
Test7 t = new Test7();
// 先启动消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while(true) {
t.get();
System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
}
}, "C" + i).start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 再启动生产者
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while(true) {
t.put("aaa");
System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
}
}, "P" + i).start();
}
}
}