MINA研究-源代码

    技术2022-05-12  5

    第一部分:mina各层的关系

    图1.1

    说明

    IOService: 这个接口抽象了一切I/O操作,包括建立端口监听、I/O读写, IOProcessor: 为IOService处理正真的I/O读写操作。IOService在某端口监听,当发生int selected = select();时,说明有建立连接的客户端请求,IOService会调用IOProcessor,将session和交给processor处理。 IOProcessor也会有一个int selected = select(1000);的阻塞,等待IO事件。 当I/O事件到达后,会把输入读入到一个BUF中session.getChannel().read(buf.buf()) (Java NIO)。 IOFilter: 上面IOProcessor得到buf后,会交给 IOFilter处理,如果是读取IO则触发 messageReceived方法。 IOFilter一般被包含在一个IoFilterChain中,顾名思义,当然是实现了责任链模式的一根链条咯。 IoHandler 是拿到buf做处理的一个接口,是用户最终会面对的一个接口。 上面的IOFilter得到buf(以接收数据为例),IOFilter调用了 IoHandlermessageReceived(s, message);方法。

    最后还要提一个接口:

    IoSession

    A handle which represents connection between two end-points regardless of

    transport types.

    实际上这个接口是一个会话服务,或者它更像是一个大容器。后面我们还会讲到它。

     

    第二部分:类图

    2.1 IOService

    IoAcceptor接口:

    服务器端接口,接受客户端访问的请求

    AbstractIoService

    设定处理函数、Filter、IoServiceListener

    处理函数是一个 Executor或者是Executor的包装。 IoServiceListener可以用于打印日志,主要有Service启动、停止、空闲等监听方法 AbstractIoAcceptor完成绑定监听端口 AbstractPollingIoAcceptor执行具体的监听连接以及监听I/O事件 NioSocketAcceptor用JAVA NIO的方式实现了具体的连接方法 ServerSocketChannel,例如openaccept

     

    2.2IoProcessor

    显然,这个结构实现了组成模式

    SimpleIoProcessorPool内有一个数组,包装了IoProcessor。这个SimpleIoProcessorPool的DEFAULT_SIZE属性,定义了开多少个IoProcessor

    前面讲过IoProcessor为IOService处理正真的I/O读写操作。(在nio下,一个IoProcessor可以在单线程中处理多个连接请求和io事件)除此之外,再开多个IoProcessor,一个IoProcessor启动一个线程。

     

    2.3 IoSession

    第三部分 源码解读

    3.1初始化

    SocketAcceptor acceptor = new NioSocketAcceptor();

    图1.1中,我们看到,mina会首先把请求交给IOService。实际上,这个接口也是用户首先面对的接口。

    用户在这个端口上执行 setHandler(IoHandler handler)方法。 并执行IoAcceptor接口的bind(SocketAddress localAddress)方法。 实例化 SocketAcceptor,会super()其父类的构造函数。

    我们依次看看三个父类的构造函数

    AbstractPollingIoAcceptor

    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,

    Class extends IoProcessor > processorClass) {

    this(sessionConfig, null, new SimpleIoProcessorPool (processorClass),

    true);

    }

    构造函数AbstractPollingIoAcceptor (new DefaultSocketSessionConfig(), NioProcessor.class);定义了默认的SessionConfigNioProcessor new SimpleIoProcessorPool (processorClass) 是把NioProcessor包装成了pool.看类图IoProcessor就很好理解了,这是一个组成模式。

     

    AbstractIoAcceptor

    protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {

    super(sessionConfig, executor);

    defaultLocalAddresses.add(null);

    }

    AbstractIoAcceptor主要用来绑定监听端口。这个构造函数没有干其他的事情。

     

    AbstractIoService

    protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor)

    这个构造函数初始化sessionConfig,和当executor为空时,

    if (executor == null) {

    System.out.println("[GL:executor:]"+executor);

    this.executor = Executors.newCachedThreadPool();

    createdExecutor = true;

    } else {

    this.executor = executor;

    createdExecutor = false;

    }

    这个构造函数还有一个监听器,或者叫监听池(可以包含多个监听器)。用来监听service创建、连接、断开等动作,当上述动作发生地时候,会调用listener。里面可以写自己的一些方法。

     

     

    3.2 初始化2

    acceptor.setHandler(new EchoProtocolHandler());

    设定处理函数,EchoProtocolHandler()是用户自己编写的一个handle,看图1.1可以知道,这是mina平台处理的最后一步。Mina把请求经过ioprocessor处理器、多个iofilter过滤器 处理后,才会交给EchoProtocolHandler

    我们看看从IOService到IoHandler之间发生了什么

     

    3.3运行

    在main()函数中,我们看到一个方法

    acceptor.bind(new InetSocketAddress(PORT));

    这一个bind方法,表面看只是一个端口绑定,实际上发生了很多有趣的事情。

    IOService 类图告诉我们,bind端口应该是AbstractIoAcceptor干的事情,去这个类中找。

     

    AbstractIoAcceptorbind方法

    //端口打包成一个list,以实现多端口监听

    List localAddresses = new ArrayList (1);

    localAddresses.add(localAddress);

     

    然后调用了子类 AbstractPollingIoAcceptor bind0方法

    bind0(localAddressesCopy);

     

    AbstractPollingIoAcceptor. bind0方法,把代码贴出来

    protected final Set bind0(

    List extends SocketAddress> localAddresses) throws Exception {

          

        //这个request包装了localAddresses,间接包装了localAddress

    AcceptorOperationFuture request = new AcceptorOperationFuture(

    localAddresses);

     

    // adds the Registration request to the queue for the Workers

    // to handle

    registerQueue.add(request);

     

    // creates the Acceptor instance and has the local

    // executor kick it off.

    /*

            这个方法很重要,我们去看看发生了什么

    */

    startupAcceptor();

     

    // wakeup()selector.wakeup();

    wakeup();

    //bind0结束

     

    startupAcceptor();方法内部代码。需要注意的是Acceptor不是IOACCEPTOR,这个Acceptor是一个线程,用于处理IO事件。

    if (acceptor == null) {

        /*

                    Acceptor完成 监听连接事件

                    如果有感兴趣的连接发生,即调用 processHandles(selectedHandles());

         */

    acceptor = new Acceptor();

    //调用爷爷类 AbstractIoService 的方法。

    //acceptor分配给线程池

    executeWorker(acceptor);

    }

     

    Acceptor()代码

    private class Acceptor implements Runnable {

    public void run() {

    int nHandles = 0;

     

    while (selectable) {//这个循环用来监听连接到达,以建立连接和io事件

    try {

        System.out.println("[GaoLing:]连接建立");

    int selected = select();//如果一直没有新连接,则一直会阻塞

     

    /*

    *这个方法完成:打开ServerSocketChannel通道,注册感兴趣的监听

                        ServerSocketChannel 丢到boundHandles中去

                        就是在定义的若干个端口放置监听事件,接受accept

                        注意:只是建立监听而已!还没有建立客户端连接!!

    完成sessionConfigIoProcessor的初始化【SocketAcceptor父类】->

    acceptor.setHandlermain

     

    3.4.2运行(对照类图看)

    acceptor.bind(new InetSocketAddress(PORT)); main->

    AbstractIoAcceptor.bindAbstractIoAcceptor->

    AbstractPollingIoAcceptor. bind0AbstractPollingIoAcceptor->

    startupAcceptor()【开线程,监听连接事件,AbstractPollingIoAcceptor->

    class AcceptorAbstractPollingIoAcceptor->

    processHandles(Iterator handles) AbstractPollingIoAcceptor->

    T session = accept(processor, handle); 【初始化session,AbstractPollingIoAcceptor->

    session.getProcessor().add(session); 【处理io事件,AbstractPollingIoAcceptor->

    SimpleIoProcessorPool.add【这里用到了自定义线程池(数组),SimpleIoProcessorPool->

    AbstractPollingIoProcessor.add【调用startupWorker()AbstractPollingIoProcessor->

    startupWorker();【处理io事件。AbstractPollingIoProcessor->

    processor = new Processor();【线程池,处理IO输入输出数据,AbstractPollingIoProcessor->

    process()AbstractPollingIoProcessor->

    read(T session) AbstractPollingIoProcessor->

    filterChain->

    handle

    -->

    最新回复(0)