博客
关于我
【NIO】Java NIO之选择器
阅读量:436 次
发布时间:2019-03-06

本文共 9951 字,大约阅读时间需要 33 分钟。

选择器基础与应用

作为Java NIO(非阻塞输入输出)体系的核心,选择器(Selector)在网络编程中发挥着重要作用。本文将从基础到应用,深入探讨选择器的工作原理及其实际应用场景。

选择器基础

选择器管理着一个可注册的通道集合的信息及它们的就绪状态。每个通道在注册时需要切换为非阻塞模式,这通常适用于套接字通道,但与FileChannel不兼容。通道可以同时注册到多个选择器上,但每个选择器对应的通道只能注册一次。

选择键(SelectionKey)记录了特定通道与选择器的注册关系。它包含两个比特掩码:一个表示关注的操作(interest集合),另一个表示通道当前已准备好的操作(ready集合)。通过register方法,可以将通道与选择器关联,并指定感兴趣的操作类型。

以下代码展示了通道与选择器的基本注册过程:

Selector selector = Selector.open();channel1.register(selector, SelectionKey.OP_READ);channel2.register(selector, SelectionKey.OP_WRITE);channel3.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);int readyCount = selector.select(10000);

使用选择键

选择键不仅记录了通道与选择器的关系,还包含了操作掩码。键的ready集合是interest集合的子集,表示自上次select操作以来已就绪的操作。键的取消会将其放入已取消键集合,但通道不会立即注销,直到下次操作。

选择器的使用

选择器支持多种操作,如读、写、连接和接受等。每个操作的支持取决于具体的通道类型。例如,SocketChannel不支持accept操作。

多线程场景下的可扩展性

选择器在多线程环境中表现出色。通道的事件处理可以通过线程池来分配,确保服务的高效性。以下代码展示了如何使用线程池来处理选择器事件:

import java.nio.channels.SocketChannel;import java.util.Queue;import java.util.concurrent.LinkedQueue;import java.util.concurrent.ThreadPoolExecutor;public class SelectSocketsThreadPool extends SelectorDemo {    private static final int MAX_THREADS = 5;    private ThreadPool pool = new ThreadPool(MAX_THREADS);    protected void readDataFromSocket(SelectionKey key) throws Exception {        WorkerThread worker = pool.getWorker();        if (worker == null) {            return;        }        worker.serviceChannel(key);    }    private class ThreadPool {        Queue
idle = new LinkedQueue<>(); ThreadPool(int poolSize) { for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread(this); thread.setName("Worker" + (i + 1)); thread.start(); idle.add(thread); } } WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (!idle.isEmpty()) { worker = idle.poll(); } } return worker; } void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); this.interrupted(); } if (key == null) { continue; } System.out.println(this.getName() + " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel"); try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key) { this.key = key; key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); while ((count = channel.read(byteBuffer)) > 0) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { channel.write(byteBuffer); } byteBuffer.clear(); } if (count < 0) { channel.close(); return; } key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } }}

客户端与服务端的通信示例

以下是SelectorServerSocketChannel(服务端)和SelectorSocketChannel(客户端)的通信示例:

服务端

import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorServerSocketChannel {    public static void main(String[] args) throws Exception {        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();        ServerSocket serverSocket = serverSocketChannel.socket();        Selector selector = Selector.open();        serverSocketChannel.configureBlocking(false);        serverSocket.bind(new InetSocketAddress("localhost", 1234));        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        while (true) {            selector.select();            Iterator it = selector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = (SelectionKey) it.next();                it.remove();                if (key.isAcceptable()) {                    ServerSocketChannel server = (ServerSocketChannel) key.channel();                    SocketChannel channel = server.accept();                    channel.configureBlocking(false);                    channel.register(selector, SelectionKey.OP_READ);                    System.out.println("Connected: " + channel.socket().getRemoteSocketAddress());                }                if (key.isReadable()) {                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);                    SocketChannel socketChannel = (SocketChannel) key.channel();                    socketChannel.read(byteBuffer);                    byteBuffer.flip();                    System.out.println("server received message: " + getString(byteBuffer));                    byteBuffer.clear();                    String message = "server sending message " + System.currentTimeMillis();                    System.out.println("server sending message: " + message);                    byteBuffer.put(message.getBytes());                    byteBuffer.flip();                    socketChannel.write(byteBuffer);                }            }        }    }    private static String getString(ByteBuffer buffer) {        Charset charset;        CharsetDecoder decoder;        CharBuffer charBuffer;        try {            charset = Charset.forName("UTF-8");            decoder = charset.newDecoder();            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());            return charBuffer.toString();        } catch (Exception ex) {            ex.printStackTrace();            return "";        }    }}

客户端

import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SocketChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.Iterator;import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;public class SelectorSocketChannel {    public static void main(String[] args) throws Exception {        SocketChannel socketChannel = SocketChannel.open();        socketChannel.configureBlocking(false);        Selector selector = Selector.open();        socketChannel.connect(new InetSocketAddress("localhost", 1234));        socketChannel.register(selector, SelectionKey.OP_CONNECT);        while (true) {            selector.select();            Iterator it = selector.selectedKeys().iterator();            while (it.hasNext()) {                SelectionKey key = (SelectionKey) it.next();                it.remove();                if (key.isConnectable()) {                    if (socketChannel.isConnectionPending()) {                        if (socketChannel.finishConnect()) {                            key.interestOps(SelectionKey.OP_READ);                            sendMessage(socketChannel);                        } else {                            key.cancel();                        }                    }                }                if (key.isReadable()) {                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);                    while (true) {                        byteBuffer.clear();                        int count = socketChannel.read(byteBuffer);                        if (count > 0) {                            byteBuffer.flip();                            System.out.println("client receive message: " + getString(byteBuffer));                            break;                        }                    }                }            }        }    }    private static void sendMessage(SocketChannel socketChannel) throws Exception {        String message = "client sending message " + System.currentTimeMillis();        ByteBuffer byteBuffer = ByteBuffer.allocate(512);        byteBuffer.clear();        System.out.println("client sending message: " + message);        byteBuffer.put(message.getBytes());        byteBuffer.flip();        socketChannel.write(byteBuffer);    }    private static String getString(ByteBuffer buffer) {        Charset charset;        CharsetDecoder decoder;        CharBuffer charBuffer;        try {            charset = Charset.forName("UTF-8");            decoder = charset.newDecoder();            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());            return charBuffer.toString();        } catch (Exception ex) {            ex.printStackTrace();            return "";        }    }}

总结

选择器在Java NIO体系中扮演着关键角色。通过选择器,开发者可以高效地管理多个通道的I/O操作,提升系统性能。选择器的灵活性和扩展性使其在多线程环境中尤为重要。通过选择键、唤醒机制和线程池的配合,选择器能够处理大量的并发连接和数据流量。本文通过服务端和客户端的示例,展示了选择器在实际应用中的强大能力。

转载地址:http://gkjyz.baihongyu.com/

你可能感兴趣的文章
Nginx配置——不记录指定文件类型日志
查看>>
nginx配置一、二级域名、多域名对应(api接口、前端网站、后台管理网站)
查看>>
Nginx配置代理解决本地html进行ajax请求接口跨域问题
查看>>
nginx配置全解
查看>>
Nginx配置参数中文说明
查看>>
nginx配置域名和ip同时访问、开放多端口
查看>>
Nginx配置好ssl,但$_SERVER[‘HTTPS‘]取不到值
查看>>
Nginx配置如何一键生成
查看>>
Nginx配置实例-负载均衡实例:平均访问多台服务器
查看>>
Nginx配置文件nginx.conf中文详解(总结)
查看>>
Nginx配置负载均衡到后台网关集群
查看>>
ngrok | 内网穿透,支持 HTTPS、国内访问、静态域名
查看>>
NHibernate学习[1]
查看>>
NHibernate异常:No persister for的解决办法
查看>>
NIFI1.21.0_Mysql到Mysql增量CDC同步中_日期类型_以及null数据同步处理补充---大数据之Nifi工作笔记0057
查看>>
NIFI1.21.0_NIFI和hadoop蹦了_200G集群磁盘又满了_Jps看不到进程了_Unable to write in /tmp. Aborting----大数据之Nifi工作笔记0052
查看>>
NIFI1.21.0通过Postgresql11的CDC逻辑复制槽实现_指定表多表增量同步_增删改数据分发及删除数据实时同步_通过分页解决变更记录过大问题_02----大数据之Nifi工作笔记0054
查看>>
NIFI从MySql中增量同步数据_通过Mysql的binlog功能_实时同步mysql数据_根据binlog实现数据实时delete同步_实际操作04---大数据之Nifi工作笔记0043
查看>>
NIFI从MySql中增量同步数据_通过Mysql的binlog功能_实时同步mysql数据_配置binlog_使用处理器抓取binlog数据_实际操作01---大数据之Nifi工作笔记0040
查看>>
NIFI从MySql中增量同步数据_通过Mysql的binlog功能_实时同步mysql数据_配置数据路由_实现数据插入数据到目标数据库_实际操作03---大数据之Nifi工作笔记0042
查看>>