今天开始研究 Jetty，发现 Jetty 的源码中有一个 ThreadPool，在这里研究一下。

    技术2022-05-11  121

    // ========================================================================
    // $Id: LifeCycle.java,v 1.5 2004/05/09 20:32:49 gregwilkins Exp $
    // Copyright 1999-2004 Mort Bay Consulting Pty. Ltd.
    // ------------------------------------------------------------------------
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    // http://www.apache.org/licenses/LICENSE-2.0
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    // ========================================================================

    package org.mortbay.util;/* ------------------------------------------------------------ *//** A component LifeCycle. * Represents the life cycle interface for an abstract * software component.  * * @version $Id: LifeCycle.java,v 1.5 2004/05/09 20:32:49 gregwilkins Exp $ * @author Greg Wilkins (gregw) */public interface LifeCycle{    /* ------------------------------------------------------------ */    /** Start the LifeCycle.     * @exception Exception An arbitrary exception may be thrown.     */    public void start()        throws Exception;        /* ------------------------------------------------------------ */    /** Stop the LifeCycle.     * The LifeCycle may wait for current activities to complete     * normally, but it can be interrupted.     * @exception InterruptedException Stopping a lifecycle is rarely atomic     * and may be interrupted by another thread.  If this happens     * InterruptedException is throw and the component will be in an     * indeterminant state and should probably be discarded.     */    public void stop()        throws InterruptedException;       /* ------------------------------------------------------------ */    /**      * @return True if the LifeCycle has been started.      */    public boolean isStarted();}

    // ========================================================================                    Pool// ========================================================================

    package org.mortbay.util;

    import java.io.Serializable;import java.util.HashMap;

    import org.apache.commons.logging.Log;import org.mortbay.log.LogFactory;

    /* ------------------------------------------------------------ *//** A pool of Objects. * <p> * @version $Id: Pool.java,v 1.13 2005/08/13 00:01:28 gregwilkins Exp $ * @author Juancarlo Az <juancarlo@modelistica.com> * @author Greg Wilkins <gregw@mortbay.com> */public class Pool    implements LifeCycle, Serializable{    private static Log log = LogFactory.getLog(Pool.class);

        /* ------------------------------------------------------------ */    static int __max =         Integer.getInteger("POOL_MAX",256).intValue();    static int __min =        Integer.getInteger("POOL_MIN",2).intValue();

            /* ------------------------------------------------------------ */    public static interface PondLife    {        int getID();        void enterPool(Pool pool,int id);        void poolClosing();        void leavePool();    }        /* ------------------------------------------------------------------- */    static HashMap __nameMap=new HashMap();        /* ------------------------------------------------------------------- */    private int _max = __max;    private int _min = __min;    private String _name;    private String _className;    private int _maxIdleTimeMs=10000;    private HashMap _attributes = new HashMap();        private transient Class _class;    private transient PondLife[] _pondLife; // Array of pondlife indexed by ID.    private transient int[] _index; // Mapping of pondlife IDs.  Entries with indexes <_available are idle IDs.  Entries with indexes>_size are unused IDs.    private transient int _size;    private transient int _available;    private transient int _running=0;    private transient long _lastShrink=0;  // control shrinking to once per maxIdleTime        /* ------------------------------------------------------------------- */    public static Pool getPool(String name)    {        return (Pool)__nameMap.get(name);    }

        /* ------------------------------------------------------------------- */    /* Construct     */    public Pool()     {}

        /* ------------------------------------------------------------ */    /**      * @return The name of the Pool.     */    public String getPoolName()    {        return _name;    }

        /* ------------------------------------------------------------ */    /**      * @param name The pool name     * @exception IllegalStateException If the name is already defined.     */    public void setPoolName(String name)        throws IllegalStateException    {        synchronized(this)        {            synchronized(Pool.class)            {                if (_name!=null && !_name.equals(name))                    __nameMap.remove(_name);                if (__nameMap.containsKey(name))                    throw new IllegalStateException("Name already exists");                _name=name;                                __nameMap.put(_name,this);            }        }    }        /* ------------------------------------------------------------ */    /** Set the class.     * @param poolClass The class     * @exception IllegalStateException If the pool has already     *            been started.     */    public void setPoolClass(Class poolClass)        throws IllegalStateException    {        synchronized(this)        {            if (_class!=poolClass)            {                if (_running>0)                    throw new IllegalStateException("Thread Pool Running");                if (!PondLife.class.isAssignableFrom(poolClass))                    throw new IllegalArgumentException("Not PondLife: "+poolClass);                _class=poolClass;                _className=_class.getName();            }        }    }   

        /* ------------------------------------------------------------ */    public Class getPoolClass()    {        return _class;    }

        /* ------------------------------------------------------------ */    public int getMinSize()    {        return _min;    }        /* ------------------------------------------------------------ */    public void setMinSize(int min)    {        _min=min;    }        /* ------------------------------------------------------------ */    public int getMaxSize()    {        return _max;    }        /* ------------------------------------------------------------ */    public void setMaxSize(int max)    {        _max=max;    }        /* ------------------------------------------------------------ */    public int getMaxIdleTimeMs()    {        return _maxIdleTimeMs;    }        /* ------------------------------------------------------------ */    public void setMaxIdleTimeMs(int maxIdleTimeMs)    {        _maxIdleTimeMs=maxIdleTimeMs;    }        /* ------------------------------------------------------------ */    public void setAttribute(String name,Object value)    {        _attributes.put(name,value);    }        /* ------------------------------------------------------------ */    public Object getAttribute(String name)    {        return _attributes.get(name);    }        /* ------------------------------------------------------------ */    public boolean isStarted()    {        return _running>0 && _pondLife!=null;    }        /* ------------------------------------------------------------ */    public int size()    {        return _size;    }        /* ------------------------------------------------------------ */    public int available()    {        return _available;    }            /* ------------------------------------------------------------ */    public void start()        throws Exception    {        synchronized(this)        {            _running++;            if (_running>1)                return;

          if (_min >= _max || _max<1)    throw new IllegalStateException("!(0<=min<max)");

                // Start the threads            _pondLife=new PondLife[_max];            _index=new int[_max];            _size=0;            _available=0;                        for (int i=0;i<_max;i++)                _index[i]=i;            for (int i=0;i<_min;i++)                newPondLife();        }    }

        /* ------------------------------------------------------------ */    public void stop()        throws InterruptedException    {        synchronized(this)        {            _running--;            if (_running>0)                return;            notifyAll();        }                if (_pondLife!=null && _size>0)        {            for (int i=0;i<_pondLife.length;i++)                closePondLife(i);            Thread.yield();            for (int i=0;i<_pondLife.length;i++)                stopPondLife(i);        }

            synchronized(this)        {            _pondLife=null;            _index=null;            _size=0;            _available=0;        }    }        /* ------------------------------------------------------------ */    public PondLife get(int timeoutMs)        throws Exception    {        PondLife pl=null;                // Defer to other threads before locking         if (_available<_min)            Thread.yield();                int new_id=-1;                // Try to get pondlife without creating new one.        synchronized(this)        {            // Wait if none available.            if (_running>0 && _available==0 && _size==_pondLife.length && timeoutMs>0)                wait(timeoutMs);

                // If still running            if (_running>0)            {                // if pondlife available                if (_available>0)                {                    int id=_index[--_available];                    pl=_pondLife[id];                }                else if (_size<_pondLife.length)                {                    // Reserve spot for a new one                    new_id=reservePondLife(false);                }            }

                // create reserved pondlife            if (pl==null && new_id>=0)                pl=newPondLife(new_id);        }

            return pl;    }            /* ------------------------------------------------------------ */    public void put(PondLife pl)        throws InterruptedException    {        int id=pl.getID();                synchronized(this)        {            if (_running==0)                stopPondLife(id);            else if (_pondLife[id]!=null)            {                _index[_available++]=id;                notify();            }        }            }        /* ------------------------------------------------------------ */    public void shrink()        throws InterruptedException    {        if (_running==0)            return;

            synchronized(this)        {            // If we have a maxIdleTime, then only shrink once per period.            if (_maxIdleTimeMs>0)            {                long now=System.currentTimeMillis();                if ((now-_lastShrink)<_maxIdleTimeMs)                    return; // don't shrink                _lastShrink=now;            }                        // shrink if we are running and have available threads and we are above minimal size            if (_running>0 && _available>0 && _size>_min)                stopPondLife(_index[--_available]);        }    }

        /* ------------------------------------------------------------ */    private int reservePondLife(boolean available)        throws Exception    {        int id=-1;        synchronized(this)        {            id=_index[_size++];            if (available)                _index[_available++]=id;        }        return id;    }        /* ------------------------------------------------------------ */    private PondLife newPondLife(int id)        throws Exception    {        PondLife pl= (PondLife)_class.newInstance();        _pondLife[id]=pl;        pl.enterPool(this,id);        return pl;    }

        /* ------------------------------------------------------------ */    private PondLife newPondLife()        throws Exception    {        return newPondLife(reservePondLife(true));    }        /* ------------------------------------------------------------ */    private void closePondLife(int id)    {        if (_pondLife[id]!=null)            _pondLife[id].poolClosing();    }        /* ------------------------------------------------------------ */    private void stopPondLife(int id)    {        PondLife pl = null;        synchronized(this)        {            pl=_pondLife[id];            if (pl!=null)            {                _pondLife[id]=null;                _index[--_size]=id;                if (_available>_size)                    _available=_size;            }        }        if (pl!=null)            pl.leavePool();    }

        /* ------------------------------------------------------------ */    public void dump(String msg)    {        StringBuffer pond=new StringBuffer();        StringBuffer avail=new StringBuffer();        StringBuffer index=new StringBuffer();                 pond.append("pond: ");         avail.append("avail:");         index.append("index:");                for (int i=0;i<_pondLife.length;i++)        {            if (_pondLife[i]==null)                pond.append("   ");            else            {                pond.append(' ');                StringUtil.append(pond,(byte)i,16);            }                        if (i==_size)                avail.append(i==_available?" AS":"  S");            else                avail.append(i==_available?" A ":"   ");                        index.append(' ');            StringUtil.append(index,(byte)_index[i],16);        }

            System.err.println();        System.err.println(msg);        System.err.println(pond);        System.err.println(avail);        System.err.println(index);    }            /* ------------------------------------------------------------ */    private void readObject(java.io.ObjectInputStream in)        throws java.io.IOException, ClassNotFoundException    {        in.defaultReadObject();        if (_class==null || !_class.getName().equals(_className))        {            try            {                setPoolClass(Loader.loadClass(Pool.class,_className));            }            catch (Exception e)            {                log.warn(LogSupport.EXCEPTION,e);                throw new java.io.InvalidObjectException(e.toString());            }        }    }}

    // ========================================================================
    // $Id: ThreadPool.java,v 1.41 2005/08/13 00:01:28 gregwilkins Exp $
    // Copyright 1999-2004 Mort Bay Consulting Pty. Ltd.
    // ------------------------------------------------------------------------
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    // http://www.apache.org/licenses/LICENSE-2.0
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    // ========================================================================
    package org.mortbay.util;

    import java.io.Serializable;

    import org.apache.commons.logging.Log;import org.mortbay.log.LogFactory;/* ------------------------------------------------------------ *//** * A pool of threads. * <p> * Avoids the expense of thread creation by pooling threads after their run methods exit for reuse. * <p> * If the maximum pool size is reached, jobs wait for a free thread.  * Idle threads timeout and terminate until the minimum number of threads are running. * <p> * This implementation uses the run(Object) method to place a job on a queue, which is read by the * getJob(timeout) method. Derived implementations may specialize getJob(timeout) to obtain jobs * from other sources without queing overheads. *  * @version $Id: ThreadPool.java,v 1.41 2005/08/13 00:01:28 gregwilkins Exp $ * @author Juancarlo A�ソスez <juancarlo@modelistica.com> * @author Greg Wilkins <gregw@mortbay.com> */public class ThreadPool implements LifeCycle,Serializable{    static Log log=LogFactory.getLog(ThreadPool.class);    static private int __pool=0;    public static final String __DAEMON="org.mortbay.util.ThreadPool.daemon";    public static final String __PRIORITY="org.mortbay.util.ThreadPool.priority";        /* ------------------------------------------------------------------- */    private Pool _pool;    private Object _join="";    private transient boolean _started;

        /* ------------------------------------------------------------------- */    /*     * Construct     */    public ThreadPool()    {        String name=this.getClass().getName();        int ld = name.lastIndexOf('.');        if (ld>=0)            name=name.substring(ld+1);        synchronized(ThreadPool.class)        {            name+=__pool++;        }                _pool=new Pool();        _pool.setPoolClass(ThreadPool.PoolThread.class);        setName(name);    }

        /* ------------------------------------------------------------ */    /**     * @return The name of the ThreadPool.     */    public String getName()    {        return _pool.getPoolName();    }

        /* ------------------------------------------------------------ */    /**     * Set the Pool name. All ThreadPool instances with the same Pool name will share the same Pool     * instance. Thus they will share the same max, min and available Threads. The field values of     * the first ThreadPool to call setPoolName with a specific name are used for the named Pool.     * Subsequent ThreadPools that join the name pool will loose their private values.     *      * @param name Name of the Pool instance this ThreadPool uses or null for an anonymous private     *                  pool.     */    public void setName(String name)    {        synchronized(Pool.class)        {            if(isStarted())            {                if((name==null&&_pool.getPoolName()!=null)||(name!=null&&!name.equals(_pool.getPoolName())))                    throw new IllegalStateException("started");                return;            }                        if(name==null)            {                if(_pool.getPoolName()!=null)                {                                   _pool=new Pool();                    _pool.setPoolName(getName());                }            }            else if (!name.equals(getName()))            {                Pool pool=Pool.getPool(name);                if(pool==null)                    _pool.setPoolName(name);                else                    _pool=pool;                   }        }    }

        /* ------------------------------------------------------------ */    /**     * @deprecated use getName()     */    public String getPoolName()    {        return getName();    }

        /* ------------------------------------------------------------ */    /**     * @deprecated use setName(String)     */    public void setPoolName(String name)    {        setName(name);    }

        /* ------------------------------------------------------------ */    /**     * Delegated to the named or anonymous Pool.     */    public boolean isDaemon()    {        return _pool.getAttribute(__DAEMON)!=null;    }

        /* ------------------------------------------------------------ */    /**     * Delegated to the named or anonymous Pool.     */    public void setDaemon(boolean daemon)    {        _pool.setAttribute(__DAEMON,daemon?"true":null);    }

        /* ------------------------------------------------------------ */    /**     * Is the pool running jobs.     *      * @return True if start() has been called.     */    public boolean isStarted()    {        return _started;    }

        /* ------------------------------------------------------------ */    /**     * Get the number of threads in the pool. Delegated to the named or anonymous Pool.     *      * @see #getIdleThreads     * @return Number of threads     */    public int getThreads()    {        return _pool.size();    }

        /* ------------------------------------------------------------ */    /**     * Get the number of idle threads in the pool. Delegated to the named or anonymous Pool.     *      * @see #getThreads     * @return Number of threads     */    public int getIdleThreads()    {        return _pool.available();    }

        /* ------------------------------------------------------------ */    /**     * Get the minimum number of threads. Delegated to the named or anonymous Pool.     *      * @see #setMinThreads     * @return minimum number of threads.     */    public int getMinThreads()    {        return _pool.getMinSize();    }

        /* ------------------------------------------------------------ */    /**     * Set the minimum number of threads. Delegated to the named or anonymous Pool.     *      * @see #getMinThreads     * @param minThreads minimum number of threads     */    public void setMinThreads(int minThreads)    {        _pool.setMinSize(minThreads);    }

        /* ------------------------------------------------------------ */    /**     * Set the maximum number of threads. Delegated to the named or anonymous Pool.     *      * @see #setMaxThreads     * @return maximum number of threads.     */    public int getMaxThreads()    {        return _pool.getMaxSize();    }

        /* ------------------------------------------------------------ */    /**     * Set the maximum number of threads. Delegated to the named or anonymous Pool.     *      * @see #getMaxThreads     * @param maxThreads maximum number of threads.     */    public void setMaxThreads(int maxThreads)    {        _pool.setMaxSize(maxThreads);    }

        /* ------------------------------------------------------------ */    /**     * Get the maximum thread idle time. Delegated to the named or anonymous Pool.     *      * @see #setMaxIdleTimeMs     * @return Max idle time in ms.     */    public int getMaxIdleTimeMs()    {        return _pool.getMaxIdleTimeMs();    }

        /* ------------------------------------------------------------ */    /**     * Set the maximum thread idle time. Threads that are idle for longer than this period may be     * stopped. Delegated to the named or anonymous Pool.     *      * @see #getMaxIdleTimeMs     * @param maxIdleTimeMs Max idle time in ms.     */    public void setMaxIdleTimeMs(int maxIdleTimeMs)    {        _pool.setMaxIdleTimeMs(maxIdleTimeMs);    }

        /* ------------------------------------------------------------ */    /**     * Get the priority of the pool threads.     *      * @return the priority of the pool threads.     */    public int getThreadsPriority()    {        int priority=Thread.NORM_PRIORITY;        Object o=_pool.getAttribute(__PRIORITY);        if(o!=null)        {            priority=((Integer)o).intValue();        }        return priority;    }

        /* ------------------------------------------------------------ */    /**     * Set the priority of the pool threads.     *      * @param priority the new thread priority.     */    public void setThreadsPriority(int priority)    {        _pool.setAttribute(__PRIORITY,new Integer(priority));    }

        /* ------------------------------------------------------------ */    /**     * Set Max Read Time.     *      * @deprecated maxIdleTime is used instead.     */    public void setMaxStopTimeMs(int ms)    {        log.warn("setMaxStopTimeMs is deprecated. No longer required.");    }

        /* ------------------------------------------------------------ */    /*     * Start the ThreadPool. Construct the minimum number of threads.     */    public void start() throws Exception    {        _started=true;        _pool.start();    }

        /* ------------------------------------------------------------ */    /**     * Stop the ThreadPool. New jobs are no longer accepted,idle threads are interrupted and     * stopJob is called on active threads. The method then waits     * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to stop, at which time killJob is     * called.     */    public void stop() throws InterruptedException    {        _started=false;        _pool.stop();        synchronized(_join)        {            _join.notifyAll();        }    }

        /* ------------------------------------------------------------ */    public void join()    {        while(isStarted()&&_pool!=null)        {            synchronized(_join)            {                try                {                    if(isStarted()&&_pool!=null)                        _join.wait(30000);                }                catch(Exception e)                {                    LogSupport.ignore(log,e);                }            }        }    }

        /* ------------------------------------------------------------ */    public void shrink() throws InterruptedException    {        _pool.shrink();    }

        /* ------------------------------------------------------------ */    /**     * Run job. Give a job to the pool.     *      * @param job If the job is derived from Runnable, the run method is called, otherwise it is     *                  passed as the argument to the handle method.     */    public void run(Object job) throws InterruptedException    {        if(job==null)            return;        try        {            PoolThread thread=(PoolThread)_pool.get(getMaxIdleTimeMs());            if(thread!=null)                thread.run(this,job);            else            {                log.warn("No thread for "+job);                stopJob(null,job);            }        }        catch(InterruptedException e)        {            throw e;        }        catch(Exception e)        {            log.warn(LogSupport.EXCEPTION,e);        }    }

        /* ------------------------------------------------------------ */    /**     * Handle a job. Called by the allocated thread to handle a job. If the job is a Runnable, it's     * run method is called. Otherwise this method needs to be specialized by a derived class to     * provide specific handling.     *      * @param job The job to execute.     * @exception InterruptedException     */    protected void handle(Object job) throws InterruptedException    {        if(job!=null&&job instanceof Runnable)            ((Runnable)job).run();        else            log.warn("Invalid job: "+job);    }

        /* ------------------------------------------------------------ */    /**     * Stop a Job. This method is called by the Pool if a job needs to be stopped. The default     * implementation does nothing and should be extended by a derived thread pool class if special     * action is required.     *      * @param thread The thread allocated to the job, or null if no thread allocated.     * @param job The job object passed to run.     */    protected void stopJob(Thread thread,Object job)    {}

        /* ------------------------------------------------------------ */    /**     * Pool Thread class. The PoolThread allows the threads job to be retrieved and active status     * to be indicated.     */    public static class PoolThread extends Thread implements Pool.PondLife    {        Pool _pool;        ThreadPool _jobPool;        Object _job;        ThreadPool _runPool;        Object _run;        int _id;        String _name;

            /* ------------------------------------------------------------ */        public void enterPool(Pool pool,int id)        {            synchronized(this)            {                _pool=pool;                _id=id;                _name=_pool.getPoolName()+"-"+id;                this.setName(_name);                this.setDaemon(pool.getAttribute(__DAEMON)!=null);                Object o=pool.getAttribute(__PRIORITY);                if(o!=null)                {                    this.setPriority(((Integer)o).intValue());                }                this.start();            }        }

            /* ------------------------------------------------------------ */        public int getID()        {            return _id;        }

            /* ------------------------------------------------------------ */        public void poolClosing()        {            synchronized(this)            {                _pool=null;                if(_run==null)                    notify();                else                    interrupt();            }        }

            /* ------------------------------------------------------------ */        public void leavePool()        {            synchronized(this)            {                _pool=null;                if(_jobPool==null&&_runPool==null)                    notify();                if(_job!=null&&_jobPool!=null)                {                    _jobPool.stopJob(this,_job);                    _job=null;                    _jobPool=null;                }                                if(_run!=null&&_runPool!=null)                {                    _runPool.stopJob(this,_run);                    _run=null;                    _runPool=null;                }            }        }

            /* ------------------------------------------------------------ */        public void run(ThreadPool pool,Object job)        {            synchronized(this)            {                _jobPool=pool;                _job=job;                notify();            }        }

            /* ------------------------------------------------------------ */        /**         * ThreadPool run. Loop getting jobs and handling them until idle or stopped.         */        public void run()        {            Object run=null;            ThreadPool runPool=null;            while(_pool!=null&&_pool.isStarted())            {                try                {                    synchronized(this)                    {                        // Wait for a job.                        if(run==null&&_pool!=null&&_pool.isStarted()&&_job==null)                            wait(_pool.getMaxIdleTimeMs());                        if(_job!=null)                        {                            run=_run=_job;                            _job=null;                            runPool=_runPool=_jobPool;                            _jobPool=null;                        }                    }                                        // handle outside of sync                    if(run!=null && runPool!=null)                        runPool.handle(run);                    else if (run==null && _pool!=null)                        _pool.shrink();                }                catch(InterruptedException e)                {                    LogSupport.ignore(log,e);                }                finally                {                    synchronized(this)                    {                        boolean got=run!=null;                        run=_run=null;                        runPool=_runPool=null;                        try                        {                            if(got&&_pool!=null)                                _pool.put(this);                        }                        catch(InterruptedException e)                        {                            LogSupport.ignore(log,e);                        }                    }                }            }        }

            public String toString()        {            return _name;        }    }}


    最新回复(0)