图1.1
说明
IOService: 这个接口抽象了一切I/O操作,包括建立端口监听、I/O读写, IOProcessor: 为IOService处理正真的I/O读写操作。IOService在某端口监听,当发生int selected = select();时,说明有建立连接的客户端请求,IOService会调用IOProcessor,将session和交给processor处理。 IOProcessor也会有一个int selected = select(1000);的阻塞,等待IO事件。 当I/O事件到达后,会把输入读入到一个BUF中session.getChannel().read(buf.buf()) (Java NIO)。 IOFilter: 上面IOProcessor得到buf后,会交给 IOFilter处理,如果是读取IO则触发 messageReceived方法。 IOFilter一般被包含在一个IoFilterChain中,顾名思义,当然是实现了责任链模式的一根链条咯。 IoHandler: 是拿到buf做处理的一个接口,是用户最终会面对的一个接口。 上面的IOFilter得到buf(以接收数据为例),IOFilter调用了 IoHandler的messageReceived(s, message);方法。最后还要提一个接口:
IoSession:
A handle which represents connection between two end-points regardless of
transport types.
实际上这个接口是一个会话服务,或者它更像是一个大容器。后面我们还会讲到它。
IoAcceptor接口:
服务器端接口,接受客户端访问的请求
AbstractIoService设定处理函数、Filter、IoServiceListener。
处理函数是一个 Executor或者是Executor的包装。 IoServiceListener可以用于打印日志,主要有Service启动、停止、空闲等监听方法 AbstractIoAcceptor完成绑定监听端口 AbstractPollingIoAcceptor执行具体的监听连接以及监听I/O事件 NioSocketAcceptor用JAVA NIO的方式实现了具体的连接方法 ServerSocketChannel,例如open,accept等
显然,这个结构实现了组成模式
SimpleIoProcessorPool内有一个数组,包装了IoProcessor。这个SimpleIoProcessorPool的DEFAULT_SIZE属性,定义了开多少个IoProcessor。
前面讲过IoProcessor为IOService处理正真的I/O读写操作。(在nio下,一个IoProcessor可以在单线程中处理多个连接请求和io事件)除此之外,再开多个IoProcessor,一个IoProcessor启动一个线程。
SocketAcceptor acceptor = new NioSocketAcceptor();
在图1.1中,我们看到,mina会首先把请求交给IOService。实际上,这个接口也是用户首先面对的接口。
用户在这个端口上执行 setHandler(IoHandler handler)方法。 并执行IoAcceptor接口的bind(SocketAddress localAddress)方法。 实例化 SocketAcceptor,会super()其父类的构造函数。我们依次看看三个父类的构造函数
AbstractPollingIoAcceptorprotected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
Class extends IoProcessor > processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool (processorClass),
true);
}
构造函数AbstractPollingIoAcceptor (new DefaultSocketSessionConfig(), NioProcessor.class);定义了默认的SessionConfig和NioProcessor。 new SimpleIoProcessorPool (processorClass) 是把NioProcessor包装成了pool.看类图IoProcessor就很好理解了,这是一个组成模式。AbstractIoAcceptor
protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
super(sessionConfig, executor);
defaultLocalAddresses.add(null);
}
AbstractIoAcceptor主要用来绑定监听端口。这个构造函数没有干其他的事情。
AbstractIoService
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor)
这个构造函数初始化sessionConfig,和当executor为空时,if (executor == null) {
System.out.println("[GL:executor:]"+executor);
this.executor = Executors.newCachedThreadPool();
createdExecutor = true;
} else {
this.executor = executor;
createdExecutor = false;
}
这个构造函数还有一个监听器,或者叫监听池(可以包含多个监听器)。用来监听service创建、连接、断开等动作,当上述动作发生地时候,会调用listener。里面可以写自己的一些方法。
acceptor.setHandler(new EchoProtocolHandler());
设定处理函数,EchoProtocolHandler()是用户自己编写的一个handle,看图1.1可以知道,这是mina平台处理的最后一步。Mina把请求经过ioprocessor处理器、多个iofilter过滤器 处理后,才会交给EchoProtocolHandler。
我们看看从IOService到IoHandler之间发生了什么
在main()函数中,我们看到一个方法
acceptor.bind(new InetSocketAddress(PORT));
这一个bind方法,表面看只是一个端口绑定,实际上发生了很多有趣的事情。
IOService 类图告诉我们,bind端口应该是AbstractIoAcceptor干的事情,去这个类中找。
AbstractIoAcceptor类bind方法
//端口打包成一个list,以实现多端口监听
List localAddresses = new ArrayList (1);
localAddresses.add(localAddress);
然后调用了子类 AbstractPollingIoAcceptor 的bind0方法
bind0(localAddressesCopy);
AbstractPollingIoAcceptor. bind0方法,把代码贴出来
protected final Set bind0(
List extends SocketAddress> localAddresses) throws Exception {
//这个request包装了localAddresses,间接包装了localAddress
AcceptorOperationFuture request = new AcceptorOperationFuture(
localAddresses);
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
// creates the Acceptor instance and has the local
// executor kick it off.
/*
这个方法很重要,我们去看看发生了什么
*/
startupAcceptor();
// wakeup()即selector.wakeup();
wakeup();
//bind0结束
startupAcceptor();方法内部代码。需要注意的是Acceptor不是IOACCEPTOR,这个Acceptor是一个线程,用于处理IO事件。
if (acceptor == null) {
/*
Acceptor完成 即监听连接事件,
如果有感兴趣的连接发生,即调用 processHandles(selectedHandles());
*/
acceptor = new Acceptor();
//调用爷爷类 AbstractIoService 的方法。
//把acceptor分配给线程池
executeWorker(acceptor);
}
Acceptor()代码
private class Acceptor implements Runnable {
public void run() {
int nHandles = 0;
while (selectable) {//这个循环用来监听连接到达,以建立连接和io事件
try {
System.out.println("[GaoLing:]连接建立");
int selected = select();//如果一直没有新连接,则一直会阻塞
/*
*这个方法完成:打开ServerSocketChannel通道,注册感兴趣的监听
将ServerSocketChannel 丢到boundHandles中去
就是在定义的若干个端口放置监听事件,接受accept
注意:只是建立监听而已!还没有建立客户端连接!!
完成sessionConfig、IoProcessor的初始化【SocketAcceptor父类】->
acceptor.setHandler【main】
3.4.2运行(对照类图看)
acceptor.bind(new InetSocketAddress(PORT)); 【main】->
AbstractIoAcceptor.bind【AbstractIoAcceptor】->
AbstractPollingIoAcceptor. bind0【AbstractPollingIoAcceptor】->
startupAcceptor()【开线程,监听连接事件,AbstractPollingIoAcceptor】->
class Acceptor【AbstractPollingIoAcceptor】->
processHandles(Iterator handles) 【AbstractPollingIoAcceptor】->
T session = accept(processor, handle); 【初始化session,AbstractPollingIoAcceptor】->
session.getProcessor().add(session); 【处理io事件,AbstractPollingIoAcceptor】->
SimpleIoProcessorPool.add【这里用到了自定义线程池(数组),SimpleIoProcessorPool】->
AbstractPollingIoProcessor.add【调用startupWorker(),AbstractPollingIoProcessor】->
startupWorker();【处理io事件。AbstractPollingIoProcessor】->
processor = new Processor();【线程池,处理IO输入输出数据,AbstractPollingIoProcessor】->
process()【AbstractPollingIoProcessor】->
read(T session) 【AbstractPollingIoProcessor】->
filterChain->
handle
-->