运用异步输入输出流编写Socket历程通信
副标题#e#
同步?异步输入输出机制的引入
在Merlin之前,编写Socket措施是比 较繁琐的事情.因为输入输出都必需同步.这样,对付多客户端客户/处事器模式, 不得不利用多线程.即为每个毗连的客户都分派一个线程来处理惩罚输入输出.由此而 带来的问题是可想而知的.措施员不得不为了制止死锁,线程安详等问题,举办大 量的编码和测试.许多人都在诉苦为什么不在Java中引入异步输入输出机制.较量 官方的表明是,任何一种应用措施接口的引入,都必需兼容任何操纵平台.因为 Java是跨平台的.而其时支持异步输入输出机制的操纵平台显然不行能是全部.自 Java 2 Platform今后,疏散出J2SE,J2ME,J2EE三种差异范例的应用措施接口,以 适应差异的应用开拓.Java尺度的制订者们意识到了这个问题,而且支持异步输入 输出机制的操纵平台在当今操纵平台中处于主流职位.于是,Jdk(J2SE) 的第五次 宣布中引入了异步输入输出机制.
以前的Socket历程通信措施设计中,一 般客户端和处事器端措施设计如下:
处事器端:
//处事器 端监听线程
while (true) {
.............
Socket clientSocket;
clientSocket = socket.accept(); //取得客户请求Socket,假如没 有//客户请求毗连,线程在此处阻塞
//用取得的Socket 结构输入输出流
PrintStream os = new PrintStream (new
BufferedOutputStream (clientSocket.getOutputStream(),
1024), false);
BufferedReader is = new BufferedReader (new
InputStreamReader (clientSocket.getInputStream()));
//建设客户会话 线程,举办输入输出节制,为同步机制
new ClientSession();
.......
}
客户端:
............
clientSocket = new Socket(HOSTNAME, LISTENPORT);//毗连处事器套接字
//用取得的 Socket结构输入输出流
PrintStream os = new PrintStream(new
BufferedOutputStream(clientSocket.getOutputStream (),
1024), false);
BufferedReader is = new BufferedReader(new
InputStreamReader(clientSocket.getInputStream()));
//举办输入 输出节制
.......
以上代码段只是用同步机制编写 Socket历程通信的一个框架,实际上要思量的问题要巨大的多(有乐趣的读者可 以参考我的一篇文章《Internet 及时通信系统设计与实现》)。将这样一个框 架列出来,只是为了与用异步机制实现的Socket历程通信举办较量。下面将先容 利用异步机制的措施设计。
用异步输入输出流编写Socket历程通信措施
在Merlin中插手了用于实现异步输入输出机制的应用措施接口包: java.nio(新的输入输出包,界说了许多根基范例缓冲(Buffer)), java.nio.channels(通道及选择器等,用于异步输入输出),java.nio.charset (字符的编码解码)。通道(Channel)首先在选择器(Selector)中注册本身感兴 趣的事件,当相应的事件产生时,选择器便通过选择键(SelectionKey)通知已注 册的通道。然后通道将需要处理惩罚的信息,通过缓冲(Buffer)打包,编码/解码, 完成输入输出节制。
通道先容:
这里主要先容 ServerSocketChannel和 SocketChannel.它们都是可选择的(selectable)通道, 别离可以事情在同步和异步两种方法下(留意,这里的可选择不是指可以选择两 种事情方法,而是指可以有选择的注册本身感乐趣的事件)。可以用 channel.configureBlocking(Boolean )来配置其事情方法。与以前版本的API相 较量,ServerSocketChannel就相当于ServerSocket(ServerSocketChannel封装 了ServerSocket),而SocketChannel就相当于Socket(SocketChannel封装了 Socket)。当通道事情在同步方法时,编程要领与以前的基内情似,这里主要介 绍异步事情方法。
所谓异步输入输出机制,是指在举办输入输出处理惩罚时 ,不必比及输入输出处理惩罚完毕才返回。所以异步的同义语长短阻塞(None Blocking)。在处事器端,ServerSocketChannel通过静态函数open()返回一个 实例serverChl。然后该通道挪用serverChl.socket().bind()绑定随处事器某端 口,并挪用register(Selector sel, SelectionKey.OP_ACCEPT)注册 OP_ACCEPT事件到一个选择器中(ServerSocketChannel只可以注册OP_ACCEPT事 件)。当有客户请求毗连时,选择器就会通知该通道有客户毗连请求,就可以进 行相应的输入输出节制了;在客户端,clientChl实例注册本身感乐趣的事件后 (可以是OP_CONNECT,OP_READ,OP_WRITE的组合),挪用clientChl.connect (InetSocketAddress )毗连处事器然后举办相应处理惩罚。留意,这里的毗连是异步 的,即会当即返回而继承执行后头的代码。
#p#副标题#e#
选择器和选择键先容:
#p#分页标题#e#
选择器(Selector)的浸染是:将通道感乐趣的事件放入行列中,而不 是顿时提交给应用措施,等已注册的通道本身来请求处理惩罚这些事件。换句话说, 就是选择器将会随时陈诉已经筹备好了的通道,并且是凭据先进先出的顺序。那 么,选择器是通过什么来陈诉的呢?选择键(SelectionKey)。选择键的浸染就是 表白哪个通道已经做好了筹备,筹备干什么。你也许顿时会想到,那必然是已注 册的通道感乐趣的事件。不错,譬喻对付处事器端serverChl来说,可以挪用 key.isAcceptable()来通知serverChl有客户端毗连请求。相应的函数尚有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一个循 环中轮询感乐趣的事件(详细可参照下面的代码)。假如选择器中尚无通道已注 册事件产生,挪用Selector.select()将阻塞,直到有事件产生为止。别的,可 以挪用selectNow()可能select(long timeout)。前者当即返回,没有事件时返 回0值;后者期待timeout时间后返回。一个选择器最多可以同时被63个通道一起 注册利用。
应用实例:
下面是用异步输入输出机制实现的客户/ 处事器实例措施清单(限于篇幅,只给出了处事器端 实现,读者可以参照着实现客户端代码):
措施类图
措施清单1
public class NBlockingServer {
int port = 8000;
int BUFFERSIZE = 1024;
Selector selector = null;
ServerSocketChannel serverChannel = null;
HashMap clientChannelMap = null;//用来存放每一个客户毗连对应的套接字和 通道
public NBlockingServer( int port ) {
this.clientChannelMap = new HashMap();
this.port = port;
}
public void initialize() throws IOException {
//初始化,别离实例化一个选择器,一个处事器端 可选择通道
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
InetAddress localhost = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
this.serverChannel.socket().bind(isa);//将该套接字绑定随处事器某一可用 端口
}
//竣事时释放资源
public void finalize() throws IOException {
this.serverChannel.close ();
this.selector.close();
}
//将读入字 节缓冲的信息解码
public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
Charset charset = Charset.forName( "ISO-8859-1" );
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode( byteBuffer );
String result = charBuffer.toString();
return result;
}
//监听端口,当通道筹备好时举办相应操纵
public void portListening() throws IOException, InterruptedException {
//处事器端通道注册OP_ACCEPT事件
SelectionKey acceptKey =this.serverChannel.register( this.selector,
SelectionKey.OP_ACCEPT );
//当有已 注册的事件产生时,select()返回值将大于0
while (acceptKey.selector().select() > 0 ) {
System.out.println("event happened");
//取 得所有已经筹备好的所有选择键
Set readyKeys = this.selector.selectedKeys();
//利用迭代器对选择键进 行轮询
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey)i.next();
i.remove();//删除当前将要 处理惩罚的选择键
if ( key.isAcceptable() ) {//假如是 有客户端毗连请求
System.out.println ("more client connect in!");
ServerSocketChannel nextReady =
(ServerSocketChannel)key.channel();
//获取客 户端套接字
Socket s = nextReady.accept();
//配置对应的通道为异步方法并注册感乐趣事件
s.getChannel().configureBlocking( false );
SelectionKey readWriteKey =
s.getChannel().register( this.selector,
SelectionKey.OP_READ|SelectionKey.OP_WRITE );
//将注册的事件与该套接字接洽起来
readWriteKey.attach( s );
//将当前成立毗连的客户端套接字及对应的通道存放在哈希 表//clientChannelMap中
this.clientChannelMap.put( s, new
ClientChInstance( s.getChannel () ) );
}
else if ( key.isReadable() ) {//假如是通道读筹备功德件
System.out.println("Readable");
// 取得选择键对应的通道和套接字
SelectableChannel nextReady =
(SelectableChannel) key.channel();
Socket socket = (Socket) key.attachment();
//处理惩罚该 事件,处理惩罚要领已封装在类ClientChInstance中
this.readFromChannel( socket.getChannel(),
(ClientChInstance)
this.clientChannelMap.get( socket ) );
}
else if ( key.isWritable() ) {// 假如是通道写筹备功德件
System.out.println ("writeable");
//取得套接字后处理惩罚, 要领同上
Socket socket = (Socket) key.attachment();
SocketChannel channel = (SocketChannel)
socket.getChannel();
this.writeToChannel( channel,"This is from server!");
}
}
}
}
//对通道的写操纵
public void writeToChannel( SocketChannel channel, String message )
throws IOException {
ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
int nbytes = channel.write( buf );
}
//对通道的读操 作
public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
throws IOException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
int nbytes = channel.read( byteBuffer );
byteBuffer.flip();
String result = this.decode( byteBuffer );
//当客户端 发出”@exit”退出呼吁时,封锁其通道
if ( result.indexOf( "@exit" ) >= 0 ) {
channel.close();
}
else {
clientInstance.append( result.toString() );
//读入一行完毕,执行相应操纵
if ( result.indexOf( "n" ) >= 0 ){
System.out.println("client input"+result);
clientInstance.execute();
}
}
}
//该类封装了奈何对客户端的通道举办操纵,详细实现可以 通过重载execute()要领
public class ClientChInstance {
SocketChannel channel;
StringBuffer buffer=new StringBuffer();
public ClientChInstance( SocketChannel channel ) {
this.channel = channel;
}
public void execute() throws IOException {
String message = "This is response after reading from channel! ";
writeToChannel( this.channel, message );
buffer = new StringBuffer();
}
//当一行没有竣事时,将当前字窜置于缓冲尾
public void append( String values ) {
buffer.append( values );
}
}
//主措施
public static void main( String[] args ) {
NBlockingServer nbServer = new NBlockingServer(8000);
try {
nbServer.initialize();
} catch ( Exception e ) {
e.printStackTrace();
System.exit( -1 );
}
try {
nbServer.portListening();
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}
小结:
#p#分页标题#e#
从以上措施段可以看出,服 务器端没有引入多余线程就完成了多客户的客户/处事器模式。该措施中利用了 回调模式(CALLBACK),细心的读者应该早就看出来了。需要留意的是,请不要将 本来的输入输出包与新插手的输入输出包混用,因为出于一些原因的思量,这两 个包并不兼容。纵然用通道时请利用缓冲完成输入输出节制。该措施在 Windows2000,J2SE1.4下,用telnet测试乐成。