线程高级---生产者消费者问题

    技术2022-05-12  19

     

    说到生产者消费者问题,大部分人都不会陌生,OS课的经典问题,并且其本身就是一个计算机编程中常见的问题。可以说既是难点又是重点。对于它的应用,可以举出无数的例子,小到一个多线程程序对队列的共享互斥操作,大到目前流行的中间件产品,诸如BEA的BMQ(BEA Message Queue),IBM的MQ Serious等中间件就是将生产者消费者问题应用通用化体系化的结果。      在开始之前先简单回顾一下生产者消费者问题:一群生产者在生产消息,并将此消息提供给消费者去消费。它们中间设了具有N个缓存区的缓冲池,生产者每次可将生产的消息放入一个缓存区内,消费者每次可将一个缓存区内的消息拿出来消费。但这个过程有两个条件:任何一方操作一个缓冲区时不能有其它同时对该缓冲区进行操作;只有当缓冲区还有空余,生产者才能生产,只有当缓冲区至少有一个产品,消费者才能从中取出来消费。这里两个条件分别对应了互斥和同步。      由于JAVA面向对象的特性,使得线程的互斥同步类似管程的概念。就是将需要互斥或同步的资源作为对象单独管理,将同步和互斥都定义在这个对象中而不是每个线程中,这样一来每个线程的访问操作都必须经过这个对象,形成了统一的访问入口和出口,在多线程环境中,这种做法便于管理而不致混乱,试想,如果每个线程中都单独写同步互斥操作,那么当问题的规模一大,同步互斥就完全混乱了。      另外,为什么中间需要加个缓冲区?让生产者直接跟消费者打交道行不行呢?当然不行了,这样的话程序就变串行了,不如用一个线程来解决。就好比生产者生产完消息,要等待消费者将消息完全消费完之后才会再次进行生产一样。      之后便是一个使用多线程的写的生产者消费者的通用类,我会对其使用和编写一一解释。 /** * @author levi * MessageQueue */ package ProducerConsumer; public class MessageQueue { private Object [] buffer; private int head; private int tail; public MessageQueue(int queueSize){ buffer = new Object[queueSize]; head = tail = 1; } public synchronized void put(Object msg) { while((tail + 1) % buffer.length == head){//full try{ wait(); } catch(InterruptedException ie){ } } buffer[tail] = msg; tail = (tail + 1) % buffer.length; notifyAll(); } public synchronized Object get(){ while(head == tail){//empty try{ wait(); } catch(InterruptedException ie){ } } Object obj = buffer[head]; head = (head + 1) % buffer.length; notifyAll(); return obj; } } 

         临界资源,类似上面说的管程的概念,这个类在实现时用了一个对象数组,当然也可以用LinkedList来做,更为方便。来分析一个put方法。刚开始使用到一个之前提到的循环锁,循环条件是(tail + 1) % buffer.length == head,这里用到了数据结构中的循环队列,故判断循环队列满的条件如上,如果满了,自然只能等待,一旦有一个空位,则将消息放入空位,同时将队尾指针加一,最后唤醒所有等待的线程。比较简单,却十分核心。

    /** * @author levi * Producer */ package ProducerConsumer; public class Producer extends Thread{ private MessageQueue queue; public Producer(MessageQueue queue){ this.queue = queue; } public void run(){ for(int i = 0;i < 100;i++){ queue.put("object" + i); try{ Thread.sleep(100); } catch(InterruptedException ie){ } } } } 

         生产者类,主要产生消息,注意继承了Thread并实现了run,并且有一个对MessageQueue实例的引用,主要调用了MessageQueue的put方法。

    /** * @author levi * Consumer */ package ProducerConsumer; public class Consumer extends Thread{ private MessageQueue queue; public Consumer(MessageQueue queue){ this.queue = queue; } public void run(){ for(int i = 0;i < 100;i++){ String s = (String)queue.get(); System.out.println(Thread.currentThread().getName() + " " + s); } } } 

    消费者类,主要消费消息,跟生产者是对称的,主要调用了MessageQueue的get方法。

    /** * @author levi * Test */ package ProducerConsumer; public class Test { public static void main(String [] args){ MessageQueue queue = new MessageQueue(10); new Producer(queue).start(); new Consumer(queue).start(); new Producer(queue).start(); new Consumer(queue).start(); new Producer(queue).start(); new Consumer(queue).start(); } } 

    主类,主要构造出一个MessageQueue并启动N个生产者和消费者。

     

        从以上这个例子,可以看出来,通过一个MessageQueue,我们将一个对象的创建和执行分离开了(在生产者中创建,在消费者中执行,注意,这个对象可以执行复杂的方法)。这种分离带来了极大的好处。比如将费时的执行分开之后提高了创建线程的响应性,并且它们之间的顺序实现了可控性,不用创建完了马上就执行;同时我们能够创建后根据条件取消或者重复执行;最后它们的分离使得可以分布式做创建和分离,从而出现了类似BMQ的消息中间件。

        最后来讲一个JDK中Producer-Consumer的应用,提起JAVA的多线程,最先想到的自然是JVM后台执行的那个GC(garbage collection)始终默默无闻的执行清扫工作,甚至让我们感觉不到;其次是applet,每个applet都要继承Applet,而Applet的init就是创建线程的方法;最后,最为常用的一个就是JAVA的GUI了,为了保证GUI的响应性,JAVA使用了多线程,并且是用生产者消费者问题来解决问题的。   

     


    最新回复(0)