设计模式之生产者消费者模式

生产者消费者模式一般具有以下几个部分:生产者、消费者、缓冲区。常用来进行模块间的解藕。

Java 实现例子

生产者和消费者

缓冲区

public interface IStorage {
    void put(Object obj);
    Object get();
}

生产者

public class Producer extends Thread {
    private int num;            // 每次生产的产品数量
    private IStorage storage;   // 数据缓冲区

    public Producer(IStorage storage, int num) {
        this.storage = storage;
        this.num = num;
    }

    @Override
    public void run() {
        produce();
    }

    public void produce() {
        for (int i = 0; i < num; i++) {
            storage.put(new Object().hashCode());
        }
    }
}

消费者

public class Consumer extends Thread {
    private int num;            // 每次消费的产品数量
    private IStorage storage;   // 数据缓冲区

    public Consumer(IStorage storage, int num) {
        this.storage = storage;
        this.num = num;
    }

    @Override
    public void run() {
        consume();
    }

    public void consume() {
        for (int i = 0; i < num; i++) {
            storage.get();
        }
    }
}

缓冲区实现

模式一

synchronized 控制线程同步

public class Storage1 {
    // 缓冲区最大限制
    private final int maxSize = 10;
    // Queue 存储数据
    private Queue<Object> queue = new LinkedList<Object>();

    public void put(Object obj) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() > maxSize) {
                try {
                    // 生产阻塞
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.add(obj);
            queue.notifyAll();
        }
    }

    public Object get() throws InterruptedException {
        Object obj = null;
        synchronized (queue) {
            while (queue.size() <= 0) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            obj = queue.poll();
            queue.notifyAll();
        }
        return obj;
    }
}

模式二

Condition 控制线程同步

public class Storage2 {
    // 缓冲区最大限制
    private final int maxSize = 100;
    // LinkedList 存储数据
    private LinkedList<Object> list = new LinkedList<Object>();

    private final Lock lock = new ReentrantLock();
    // 仓库满的条件变量
    private final Condition full = lock.newCondition();
    // 仓库空的条件变量
    private final Condition empty = lock.newCondition();

    public void put(Object obj) {
        lock.lock();

        while (list.size() >= maxSize) {
            try {
                // 生产阻塞
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        list.add(obj);
        //full.signalAll();
        empty.signalAll();

        lock.unlock();
    }

    public Object get() {
        lock.lock();

        while (list.size() <= 0) {
            try {
                // 消费阻塞
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        Object obj = list.remove();
        full.signalAll();
        //empty.signalAll();

        lock.unlock();
        return obj;
    }
}

模式三

LinkedBlockingQueue 实现同步

public class Storage3 {
    // LinkedBlockingQueue 存储数据,并设置缓存区最大限制
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(10);

    @Override
    public void put(Object obj) {
        try {
            list.put(obj);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Object get() {
        Object obj = null;
        try {
            obj = list.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return obj;
    }
}

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注

4 × 5 =