生产者消费者

Posted by Solace Blog on April 27, 2020

使用Sychronized配合notify和wait实现

package solace.concurrency;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

// notify wait 实现生产者消费者
public class Test8<T> {

    private final Integer MAX = 20;
    private final LinkedList<T> bowl = new LinkedList<T>();


    // 生产者
    public synchronized void put(T t) {
        try {
            while (bowl.size() == MAX) {
                this.wait();
            }

            bowl.add(t);
            TimeUnit.SECONDS.sleep(1);
            this.notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 消费者
    public synchronized T get() {
        T t = null;
        try {
            while (bowl.size() == 0) {
                this.wait();
            }

            t = bowl.removeFirst();
            TimeUnit.SECONDS.sleep(1);
            this.notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return t;
    }

    public static void main(String[] args) {
        Test8 t = new Test8();

        // 先启动消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                while(true) {
                    t.get();
                    System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
                }
            }, "C" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 再启动生产者
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while(true) {
                    t.put("aaa");
                    System.out.println(Thread.currentThread().getName()  + "=====" + t.bowl.size());
                }
            }, "P" + i).start();
        }
    }

}


使用ReentrantLock配合Condition实现

package solace.concurrency;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// ReentrankLock 和Condition实现生产者消费者
// https://github.com/bjmashibing/edu/blob/master/concurrent/src/yxxy/c_021/MyContainer2.java
public class Test7<T> {

    private final Integer MAX = 20;
    private final LinkedList<T> bowl = new LinkedList<T>();

    private Lock lock = new ReentrantLock();
    Condition consumer = lock.newCondition();
    Condition producer = lock.newCondition();

    // 生产者
    public void put(T t) {
        try {
            lock.lock();

            while(bowl.size() == MAX) {
                producer.await();
            }

            bowl.add(t);
            TimeUnit.SECONDS.sleep(1);
            // 叫醒消费者
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 消费者
    public T get() {
        T t = null;
        try {
            lock.lock();

            while(bowl.size() == 0) {
                consumer.await();
            }

            t = bowl.removeFirst();
            TimeUnit.SECONDS.sleep(1);
            // 叫醒生产者
            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        return t;
    }



    public static void main(String[] args) {
        Test7 t = new Test7();

        // 先启动消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                while(true) {
                    t.get();
                    System.out.println(Thread.currentThread().getName() + "=====" + t.bowl.size());
                }
            }, "C" + i).start();
        }

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 再启动生产者
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                while(true) {
                    t.put("aaa");
                    System.out.println(Thread.currentThread().getName()  + "=====" + t.bowl.size());
                }
            }, "P" + i).start();
        }

    }
}