1.1.15 使用Semaphore实现多生产者/多消费者模式

本实验的目的不光是实现生产者与消费者模式,还要限制生产者与消费者的数量,这样代码的复杂性就提高一些,但好在使用Semaphore类实现这个功能还是比较简单的。

创建实验用的项目repastTest,类RepastService.java代码如下:


    package service;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    public class RepastService {
        volatile private Semaphore setSemaphore = new Semaphore(10);      //厨师
        volatile private Semaphore getSemaphore = new Semaphore(20);      //就餐者
        volatile private ReentrantLock lock = new ReentrantLock();
        volatile private Condition setCondition = lock.newCondition();
        volatile private Condition getCondition = lock.newCondition();
        //producePosition变量的含义是最多只有4个盒子存放菜品
        volatile private Object[] producePosition = new Object[4];
        private boolean isEmpty() {
            boolean isEmpty = true;
            for (int i = 0; i < producePosition.length; i++) {
                if (producePosition[i] ! = null) {
                    isEmpty = false;
                    break;
                }
            }
            if (isEmpty == true) {
                return true;
            } else {
                return false;
            }
        }
        private boolean isFull() {
            boolean isFull = true;
            for (int i = 0; i < producePosition.length; i++) {
                if (producePosition[i] == null) {
                    isFull = false;
                    break;
                }
            }
            return isFull;
        }
        public void set() {
            try {
                //System.out.println("set");
                setSemaphore.acquire();               //允许同时最多有10个厨师进行生产
                lock.lock();
                while (isFull()) {
                    //System.out.println("生产者在等待");
                    setCondition.await();
                }
                  for (int i = 0; i < producePosition.length; i++) {
                      if (producePosition[i] == null) {
                          producePosition[i] = "数据";
                          System.out.println(Thread.currentThread().getName()
                                  + " 生产了 " + producePosition[i]);
                          break;
                      }
                  }
                  getCondition.signalAll();
                  lock.unlock();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  setSemaphore.release();
              }
          }
          public void get() {
              try {
                  //System.out.println("get");
                  getSemaphore.acquire();      //允许同时最多有16个就餐者
                  lock.lock();
                  while (isEmpty()) {
                      //System.out.println("消费者在等待");
                      getCondition.await();
                  }
                  for (int i = 0; i < producePosition.length; i++) {
                      if (producePosition[i] ! = null) {
                          System.out.println(Thread.currentThread().getName()
                                  + " 消费了 " + producePosition[i]);
                          producePosition[i] = null;
                          break;
                      }
                  }
                  setCondition.signalAll();
                  lock.unlock();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  getSemaphore.release();
              }
          }
      }

两个线程类代码如图1-27所示。

图1-27 线程类代码

运行类Run.java代码如下:


    package test.run;
    import service.RepastService;
    import extthread.ThreadC;
    import extthread.ThreadP;
    public class Run {
        public static void main(String[] args) throws InterruptedException {
            RepastService service = new RepastService();
            ThreadP[] arrayP = new ThreadP[60];
            ThreadC[] arrayC = new ThreadC[60];
            for (int i = 0; i < 60; i++) {
                arrayP[i] = new ThreadP(service);
                arrayC[i] = new ThreadC(service);
            }
            Thread.sleep(2000);
            for (int i = 0; i < 60; i++) {
                arrayP[i].start();
                arrayC[i].start();
            }
        }
    }

程序运行结果如下:


    Thread-0 生产了 数据
    Thread-3 消费了 数据
    Thread-2 生产了 数据
    Thread-8 生产了 数据
    Thread-16 生产了 数据
    Thread-12 生产了 数据
    Thread-19 消费了 数据
    Thread-11 消费了 数据
    Thread-20 生产了 数据
    Thread-21 消费了 数据
    Thread-25 消费了 数据
    Thread-26 生产了 数据
    Thread-27 消费了 数据
    Thread-28 生产了 数据
    Thread-29 消费了 数据
    Thread-30 生产了 数据
    Thread-31 消费了 数据
    Thread-1 消费了 数据
    Thread-34 生产了 数据
    Thread-36 生产了 数据
    Thread-38 生产了 数据
    Thread-40 生产了 数据
    Thread-41 消费了 数据
    Thread-42 生产了 数据
    Thread-7 消费了 数据
    Thread-61 消费了 数据
    Thread-63 消费了 数据
    Thread-65 消费了 数据
    Thread-22 生产了 数据
    Thread-48 生产了 数据
    Thread-4 生产了 数据
    Thread-13 消费了 数据
    Thread-69 消费了 数据
    Thread-71 消费了 数据
    Thread-24 生产了 数据
    Thread-18 生产了 数据
    Thread-56 生产了 数据
    Thread-58 生产了 数据
    Thread-33 消费了 数据
    Thread-75 消费了 数据
    Thread-77 消费了 数据
    Thread-35 消费了 数据
    Thread-44 生产了 数据
    Thread-45 消费了 数据
    Thread-6 生产了 数据
    Thread-64 生产了 数据
    Thread-66 生产了 数据
    Thread-46 生产了 数据
    Thread-67 消费了 数据
    Thread-15 消费了 数据
    Thread-87 消费了 数据
    Thread-50 生产了 数据
    Thread-72 生产了 数据
    Thread-74 生产了 数据
    Thread-17 消费了 数据
    Thread-73 消费了 数据
    Thread-23 消费了 数据
    Thread-5 消费了 数据
    Thread-54 生产了 数据
    Thread-60 生产了 数据
    Thread-32 生产了 数据
    Thread-82 生产了 数据
    Thread-79 消费了 数据
    Thread-37 消费了 数据
    Thread-81 消费了 数据
    Thread-39 消费了 数据
    Thread-62 生产了 数据
    Thread-83 消费了 数据
    Thread-68 生产了 数据
    Thread-10 生产了 数据
    Thread-14 生产了 数据
    Thread-92 生产了 数据
    Thread-85 消费了 数据
    Thread-89 消费了 数据
    Thread-111 消费了 数据
    Thread-113 消费了 数据
    Thread-76 生产了 数据
    Thread-52 生产了 数据
    Thread-91 消费了 数据
    Thread-93 消费了 数据
    Thread-78 生产了 数据
    Thread-80 生产了 数据
    Thread-84 生产了 数据
    Thread-99 消费了 数据
    Thread-101 消费了 数据
    Thread-103 消费了 数据
    Thread-86 生产了 数据
    Thread-47 消费了 数据
    Thread-88 生产了 数据
    Thread-108 生产了 数据
    Thread-110 生产了 数据
    Thread-112 生产了 数据
    Thread-109 消费了 数据
    Thread-115 消费了 数据
    Thread-96 生产了 数据
    Thread-98 生产了 数据
    Thread-117 消费了 数据
    Thread-119 消费了 数据
    Thread-95 消费了 数据
    Thread-97 消费了 数据
    Thread-102 生产了 数据
    Thread-100 生产了 数据
    Thread-104 生产了 数据
    Thread-105 消费了 数据
    Thread-43 消费了 数据
    Thread-106 生产了 数据
    Thread-107 消费了 数据
    Thread-49 消费了 数据
    Thread-114 生产了 数据
    Thread-90 生产了 数据
    Thread-94 生产了 数据
    Thread-70 生产了 数据
    Thread-9 消费了 数据
    Thread-51 消费了 数据
    Thread-53 消费了 数据
    Thread-55 消费了 数据
    Thread-116 生产了 数据
    Thread-118 生产了 数据
    Thread-57 消费了 数据
    Thread-59 消费了 数据

工具editplus中的行数如图1-28所示。

图1-28 打印的行数

类Semaphore提供了限制并发线程数的功能,此功能在默认的synchronized中是不提供的。