使用Netty传输大文件内容

最近又开始需要使用netty进行网络通信方面的编程开发了。于是遇到了一些问题通过查找好多资料记录下来。

 

做的内容大致是:客户端向服务端发送一条命令,服务端接收到之后,根据命令里面的一些信息去读取服务器上的一些文件并把文件内容(文件的内容类似于数据库中的一行一行的数据,是以行存储的,每个字段值以\t分割,每条数据为一行)发送给客户端处理(我这里的样例暂以获取数据之后按行保存入文件中)。

 

1、客户端服务端的代码

 

cmdLog = getSearchCmd();  
        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  
                Executors.newCachedThreadPool());  
        ClientBootstrap bootstrap = new ClientBootstrap(factory);  
        final ClientBufferHandler clientHandler =  new ClientBufferHandler(cmdLog, getEncoding());  
        final DelimiterBasedFrameDecoder clientDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, true, ChannelBuffers.copiedBuffer("\r\n", Charset.defaultCharset()));  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
            @Override  
            public ChannelPipeline getPipeline() throws Exception {  
                return Channels.pipeline(clientDecoder, clientHandler);  
            }  
        });  
 
        bootstrap.setOption("tcpNoDelay", true);  
        bootstrap.setOption("keepAlive", true);  
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(serverHost, serverPort));  
        future.awaitUninterruptibly();  
        if (!future.isSuccess())  
        {  
            future.getCause().printStackTrace();  
        }  
        future.getChannel().getCloseFuture().awaitUninterruptibly();  
        factory.releaseExternalResources();

cmdLog是客户端要发送的命令,getEncoding()是因为每个服务端要读取的文件可能是不同的编码,客户端这边传过去之后通过这个来编码。

有两个handler,下面会介绍,其他的都是很常规的这里就不多说了。

2、服务端代码

 

ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool()  
                ,Executors.newCachedThreadPool());  
        ServerBootstrap bootstrap = new ServerBootstrap(factory);  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory()  
        {  
            public ChannelPipeline getPipeline()  
            {  
                ChannelPipeline pipeline = Channels.pipeline(  
                        new ServerDecoderHandler(),  
                        new FileSearchHandler());  
                return pipeline;  
            }  
        });  
        bootstrap.setOption("child.tcpNoDelay", true);  
        bootstrap.setOption("child.keepAlive", true);  
        bootstrap.bind(new InetSocketAddress(8027));

服务端的代码也很简单,后面会详细介绍handler。

 

3、先看客户端的handler,一个是ClientBufferHandler,这个是用来发送命令并接收服务端响应handler。

 

ClientBufferHandler extends SimpleChannelHandler
private static final String testPath = "F:/ test/test";  
 
private String cmd;  
private String encoding;  
public ClientBufferHandler(String cmd, String encoding)  
    {  
        this.cmd = cmd;  
        this.encoding = encoding;  
    }  
 
@Override  
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception  
    {  
        int cmdLength = cmd.getBytes().length;  
        ChannelBuffer cmdBuffer = ChannelBuffers.buffer(cmdLength+4);  
        cmdBuffer.writeInt(cmdLength);  
        cmdBuffer.writeBytes(cmd.getBytes());  
        e.getChannel().write(cmdBuffer);  
    }  
@Override  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception  
    {  
ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
        if(!buf.readable())  
        {  
            return;  
        }  
 
        FileHelper.writeFile(testPath, buf.toString(Charset.forName(encoding)));

重写channelConnected方法,发送命令。我这里在发送命令前面跟了四个字节的命令长度,保证服务端一次接收到所有的命令信息。

 

重写messageReceived方法,接收服务端获取的信息存入文件中。FileHelper.writeFile是把字符串追加写入文件的工具方法我就不放出了,实现方法还是很多的。

 

在这个之前还有一个解码器,因为服务端发送过来的数据都是按行发送的(每行结尾是\r\n),所以使用netty提供的一个解码器DelimiterBasedFrameDecoder实现按分隔符分割接收到的数据,构造方法见客户端的代码。保证获取的数据每次都是完整的一行。这里感谢一下http://blog.163.com/linfenliang@126/blog/static/12785719520121082103807/提供的netty的分包、组包、粘包处理机制。

 

4、然后是服务端的handler。

 

首先是ServerDecoderHandler解码器,保证能够读取完整的命令并把命令前的四个字节用来标识命令长度的内容丢掉。

 

public class ServerDecoderHandler extends FrameDecoder  
{  
@Override  
    protected Object decode(ChannelHandlerContext ctx, Channel c, ChannelBuffer buf) throws Exception  
    {  
        int length = 4;  
        if(buf.readableBytes() < length)  
        {  
            return null;  
        }  
        byte[] header = new byte[length];  
        buf.markReaderIndex();  
        buf.readBytes(header);  
        int cmdLength = (header[0] & 0xFF) << 24 | (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF);  
        if (cmdLength != 0)  
        {  
            if (buf.readableBytes() < cmdLength)  
            {  
                buf.resetReaderIndex();  
                return null;  
            }  
            length += cmdLength;  
        }  
        buf.resetReaderIndex();  
        buf.readerIndex(4);  
        return buf.readBytes(cmdLength);  
    }  
}

这部分代码内容比较简单就不多做说明了。

 

然后是FileSearchHandler的代码。

 

public class CdrFileSearchHandler extends SimpleChannelUpstreamHandler  
{  
@Override  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception  
    {  
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
        String cmd = buf.toString(Charset.defaultCharset());  
        logger.info("查询命令:\r\n" + cmd);  
 
        // 返回文件内容  
        Channel ch = e.getChannel();  
ChannelFuture f = null;  
        final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(getPath(cmd))));  
        String line = "";  
        while (line != null)  
        {  
            line = reader.readLine();  
            if (line != null)  
            {  
                ChannelBuffer returnBuf = ChannelBuffers.dynamicBuffer();  
                returnBuf.writeBytes((line + "\r\n").getBytes());  
                f = ch.write(returnBuf);  
            }  
        }  
        if(line == null)  
        {  
            f.addListener(new ChannelFutureListener()  
            {  
                @Override  
                public void operationComplete(ChannelFuture future) throws Exception  
                {  
                    reader.close();  
                    Channel ch = future.getChannel();  
                    ch.close();  
                }  
            });  
        }  
    }

这部分代码其实也很简单,就是获取到命令,根据命令选择文件(这部分代码省略了),按行读取文件,然后加上\r\n然后写入发送。当line不为null时则读取到了数据,如果为null则说明没有读取到数据,跳出循环,并且添加监听器,当发送完关闭各种链接。(之所以会这样判断是因为netty本身是多次调用messageReceived的,需要在发送完最后一条数据的时候关闭连接。)

 

这种内容发送的方式只适用与文件内容比较小可以,但是youyu 一般的handler都是同步执行的,一旦文件内容很大,就会因为文件读取耗时较长导致Worker线程不能及时返回处理其它请求,对性能影响较高,从而导致内存溢出等问题。

 

这个时候就需要使用netty提供的一个handler,ExecutionHandler,ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接返回。ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。

所以在服务端的代码处需要修改代码如下:

 

bootstrap.setPipelineFactory(new ChannelPipelineFactory()  
        {  
            public ChannelPipeline getPipeline()  
            {  
                ChannelPipeline pipeline = Channels.pipeline(  
                        new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)),  
                        new ServerDecoderHandler(),  
                        new CdrFileSearchHandler());  
                return pipeline;  
            }  
        });

增加一个ExecutionHandler的handler,即可处理。

 

 

注意ExecutionHandler一定要在不同的pipeline 之间共享。它的作用是自动从ExecutionHandler自己管理的一个线程池中拿出一个线程来处理排在它后面的业务逻辑handler。而 worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收。

它的构造方法是ExecutionHandler(Executor executor) ,很显然executor就是ExecutionHandler内部管理的线程池了。netty额外给我们提供了两种线程池:
MemoryAwareThreadPoolExecutor和OrderedMemoryAwareThreadPoolExecutor,它们都在org.jboss.netty.handler.execution 包下。
MemoryAwareThreadPoolExecutor 确保jvm不会因为过多的线程而导致内存溢出错误,OrderedMemoryAwareThreadPoolExecutor是前一个线程池的子类,除 了保证没有内存溢出之外,还可以保证channel event的处理次序。具体可以查看API文档,上面有详细说明。

 

总结一下:写这些东西用了一天的时间,其实多数时候都是在测试大文件的问题,网上好多源码分析的文章,都很浅,真正说每种buffer怎么用,每种handler的用法的很少。而详细讲源码的文章多数也都是抄来抄去,但是也还是有好多不错的。所以遇到什么问题多找找资料,在自己探索一下应该都可以解决的。(其实api上面的东西还是不少的,有时间研究的话应该好好看看netty的源码有助于更好的使用netty开发)

 

最后说下这里使用的版本是3.6的。

SVN分支合并的使用记录

 

  • 写在前面
  • 一些相关的概念和原理
  • 进行分支开发的最佳实践
  • 合并的分类
  • eclipse 中进行合并操作
  • 相关资源

 

写在前面

本文是由演讲整理而来的,介绍了 svn 分支与合并的概念、流程和一些实际操作方法,适合对版本控制有基本认识然后想了解 SVN 分支与合并的使用方法的读者。

对应 SVN 版本最低为 1.5,因为分支、合并的很多功能都是 1.5(release notes ) 才加进来的。

 

一些相关的概念和原理

  • 分支(branch)和标记(tag)对于 SVN 来说就只是副本(copy),没有任何其它意义。分支和标记的意义是我们人为给予的。
  • SVN 的副本是通过”cheap copies “来实现的,建立一个副本就类似 Unix 中创建一个硬链接(hard link),空间和时间的消耗都是固定并且很小的,因此不必太过担心副本太多而导致性能问题。
  • SVN 的文件储存 是通过差异(diff)来实现的,底层储存方法有两种:1、Berkeley DB,完整保存一个文件的最新版本(revision),旧版本通过反向差异(reverse diffs)来获取。2、FSFS,跟 BDB 相反,完整保存一个文件的初始版本,后续版本通过正向差异来获取。当然,为了避免版本太多而造成性能下降,SVN 还使用了”skip-deltas “来减少需要追溯的版本数。
  • SVN 属性(property )可以附带在文件、目录和版本(revision)上。文件和目录的属性类似文件内容,会被记录进版本库中的,例如每次提交时的注释,其实就是该版本的一个属性 svn:log。以”svn:”开头的属性是系统预留的,用户不应该自定义这样的属性。

 

进行分支开发的最佳实践

  • 做分支上做开发的时候,必须定期使分支与主干同步,避免开发完成后合并(merge)回主干时出现严重冲突(confict)。
  • 进行合并前,处理掉工作副本上的所有本地修改,方便合并失败时进行回滚(revert)。
  • 进行合并时,特别注意 新增/删除 操作,因为很多冲突都是这类操作引起的。
  • 完成一个分支的功能并合并回主干后,抛弃该分支,后续其它功能的开发使用新建的分支。当然,也有办法继续使用该分支。

 

合并的分类

1、从主干到分支

 

 

Svn代码  收藏代码
  1. svn merge [-r M:N] ^/trunk

假设”^/trunk”是主干的 URL,当前目录为分支的工作副本。该命令同步主干的最新修改到当前工作副本,用于使分支跟主干保持同步。SVN 会通过 svn:mergeinfo 属性来记录当前工作副本已经合并过的版本号,然后在每次合并时选择合适的(eligible)版本进行合并。当然,也可以自己手动指定合并版本M到N的修改。

 

2、从分支到主干

 

 

Svn代码  收藏代码
  1. svn merge –reintegrate ^/branches/quota

假设”^/branches/quota”是分支的 URL,当前目录为主干的工作副本。该命令将分支的最新版本(@HEAD)跟主干的最新版本进行比较,将差异实施到当前工作副本,用于将在分支上完成的工作合并回主干。

分支使用 –reintegrate 合并回主干后,如果继续在该分支上开发,当需要同步主干的修改到分支过来时,默认会包括之前 reintegrate 的修改,而这些修改已经在分支上做过了,所以这样往往会导致冲突。这也是前面“最佳实践”中最后一个建议的一个原因。当然,想要使这个分支继续可用也是可以的,这就需要使用下面这第三种合并。

 

3、仅记录的合并

 

 

Svn代码  收藏代码
  1. svn merge -c 25 –record-only ^/trunk

假设当前目录为分支的工作副本,该命令将主干的版本25标记为已合并到当前工作副本,但并不会进行实质性的合并,这样下次合并主干到分支时,该版本的修改就会被跳过,避免修改被重复实施导致的冲突。其实这种合并就是改一下 svn:mergeinfo 而已,但直接修改太危险了,所以弄了这样一个所谓合并来规范操作。

 

在 Eclipse 中进行合并操作

Subclipse

在 Eclipse 中有两个比较流行的 SVN 插件:Subclipse 和 Subversive,关于两者的讨论有很多,例如这里 。本文只介绍 Subclipse。

 

上图是 Subclipse 进行合并操作时的界面,该图所对应的操作是:将 trunk 上版本 8 至今的修改同步到工作副本 pearbranch,也就是分支 branches/quake。这里可以发现几个问题:

  • 不能进行自动合并,必须手工指定版本号。
  • 不能进行仅记录的合并
  • 不能直接进行 –reintegrate 的合并

 

CollabNet Merge Client

上述 Subclipse 的不足,应该是因为 Subclipse 默认的合并实现是基于 SVN 1.4 之前的,那时还没有 svn:mergeinfo、–reintegrate 和 –record-only 呢。要支持这些 1.5 的新特性,可以安装 CollabNet Merge Client。

 

CollabNet Merge Client 是 Subclipse 的一个可选功能,其实就是一个增强的、支持新特性的合并实现,如上图所示,它的优点有:

  • 支持合并信息自动跟踪和自动合并
  • 支持 –reintegrate 和 –record-only
  • 合并前能对工作副本进行检查

 

相关资源

SVN 设计思想

主要是看 Bubble-Up Method 这一节,是 revision 的基本原理。

 

『Version Control with Subversion』

官方手册,学习 SVN 必读。

Netty学习(2)——Netty使用实例

以下两个例子基于netty-3.5.7.Final.jar用Junit进行测试

第一个例子:简单的发送字符串,接收字符串“Hello, World”

class HelloWorldServerHandler extends SimpleChannelHandler {  
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)  
            throws Exception {  
        e.getChannel().write("Hello, World");  
    }  
 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        System.out.println("Unexpected exception from downstream."  
                + e.getCause());  
        e.getChannel().close();  
    }  
}  
 
class HelloWorldClientHandler extends SimpleChannelHandler {  
 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
        String message = (String) e.getMessage();  
        System.out.println(message);  
        e.getChannel().close();  
    }  
 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
        System.out.println("Unexpected exception from downstream."  
                + e.getCause());  
        e.getChannel().close();  
    }  
}  
 
 
/** 
 * Netty VS MinaNetty基于Pipeline处理,Mina基于Filter过滤 
 * Netty的事件驱动模型具有更好的扩展性和易用性 
 * Https,SSL,PB,RSTP,Text &Binary等协议支持 
 * Netty中UDP传输有更好的支持官方测试Netty比Mina性能更好 
 * @author Administrator 
 * 
 */  
public class TestCase {  
 
    public void testServer() {  
        //初始化channel的辅助类,为具体子类提供公共数据结构  
        ServerBootstrap bootstrap = new ServerBootstrap(  
                new NioServerSocketChannelFactory(  
                        Executors.newCachedThreadPool(),  
                        Executors.newCachedThreadPool()));  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
            public ChannelPipeline getPipeline() {  
                ChannelPipeline pipeline = Channels.pipeline();  
                pipeline.addLast("decoder", new StringDecoder());  
                pipeline.addLast("encoder", new StringEncoder());  
                pipeline.addLast("handler", new HelloWorldServerHandler());  
                return pipeline;  
            }  
        });  
        //创建服务器端channel的辅助类,接收connection请求  
        bootstrap.bind(new InetSocketAddress(8080));  
    }  
 
 
 
    public void testClient() {  
        //创建客户端channel的辅助类,发起connection请求   
        ClientBootstrap bootstrap = new ClientBootstrap(  
                new NioClientSocketChannelFactory(  
                        Executors.newCachedThreadPool(),  
                        Executors.newCachedThreadPool()));  
        //It means one same HelloWorldClientHandler instance is going to handle multiple Channels and consequently the data will be corrupted.  
        //基于上面这个描述,必须用到ChannelPipelineFactory每次创建一个pipeline  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
            public ChannelPipeline getPipeline() {  
                ChannelPipeline pipeline =  Channels.pipeline();  
                pipeline.addLast("decoder", new StringDecoder());  
                pipeline.addLast("encoder", new StringEncoder());  
                pipeline.addLast("handler", new HelloWorldClientHandler());  
                return pipeline;  
            }  
        });  
        //创建无连接传输channel的辅助类(UDP),包括client和server  
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(  
                "localhost", 8080));  
        future.getChannel().getCloseFuture().awaitUninterruptibly();  
        bootstrap.releaseExternalResources();  
    }  
 
 
    @Test  
    public void testNetty(){  
        testServer();  
        testClient();  
    }  
 
}

第二个例子,实际应用中会用到这个,发送POJO类Persons [name=周杰伦123, age=31, salary=10000.44]

/** 
 * 用POJO代替ChannelBuffer 
 */  
 
class TimeServerHandler3 extends SimpleChannelHandler {    
 
    @Override    
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)    
            throws Exception {    
        Persons person = new Persons("周杰伦123",31,10000.44);  
        ChannelFuture future = e.getChannel().write(person);    
        future.addListener(ChannelFutureListener.CLOSE);    
    }    
}    
 
class TimeClientHandler3 extends SimpleChannelHandler{    
 
    @Override    
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)    
            throws Exception {    
        Persons person = (Persons)e.getMessage();    
        System.out.println(person);    
        e.getChannel().close();    
    }    
}  
 
/** 
 * FrameDecoder and ReplayingDecoder allow you to return an object of any type. 
 *  
 */  
class TimeDecoder extends FrameDecoder {    
    private final ChannelBuffer buffer = dynamicBuffer();  
 
    @Override    
    protected Object decode(ChannelHandlerContext ctx, Channel channel,    
            ChannelBuffer channelBuffer) throws Exception {    
        if(channelBuffer.readableBytes()<4) {    
            return null;    
        }    
        if (channelBuffer.readable()) {  
            // 读到,并写入buf  
            channelBuffer.readBytes(buffer, channelBuffer.readableBytes());  
        }  
        int namelength = buffer.readInt();  
        String name = new String(buffer.readBytes(namelength).array(),"GBK");  
        int age = buffer.readInt();  
        double salary = buffer.readDouble();  
        Persons person = new Persons(name,age,salary);  
        return person;    
    }    
 
}    
 
class TimeEncoder extends SimpleChannelHandler {    
    private final ChannelBuffer buffer = dynamicBuffer();  
 
    @Override    
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)    
            throws Exception {    
        Persons person = (Persons)e.getMessage();    
        buffer.writeInt(person.getName().getBytes("GBK").length);  
        buffer.writeBytes(person.getName().getBytes("GBK"));  
        buffer.writeInt(person.getAge());  
        buffer.writeDouble(person.getSalary());  
        Channels.write(ctx, e.getFuture(), buffer);    
    }    
}  
 
class Persons{  
    private String name;  
    private int age;  
    private double salary;  
 
    public Persons(String name,int age,double salary){  
        this.name = name;  
        this.age = age;  
        this.salary = salary;  
    }  
 
    public String getName() {  
        return name;  
    }  
    public void setName(String name) {  
        this.name = name;  
    }  
    public int getAge() {  
        return age;  
    }  
    public void setAge(int age) {  
        this.age = age;  
    }  
    public double getSalary() {  
        return salary;  
    }  
    public void setSalary(double salary) {  
        this.salary = salary;  
    }  
 
    @Override  
    public String toString() {  
        return "Persons [name=" + name + ", age=" + age + ", salary=" + salary  
                + "]";  
    }  
 
 
}  
 
public class TestCase5 {  
    public void testServer() {  
          ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());    
            ServerBootstrap bootstrap = new ServerBootstrap(factory);    
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {    
 
                public ChannelPipeline getPipeline() throws Exception {    
                    return Channels.pipeline(new TimeEncoder(), new TimeServerHandler3());    
                }    
            });    
            bootstrap.setOption("child.tcpNoDelay", true);    
            bootstrap.setOption("child.keepAlive", true);    
 
            bootstrap.bind(new InetSocketAddress("localhost",9999));   
    }  
 
    public void testClient(){  
        //创建客户端channel的辅助类,发起connection请求   
        ClientBootstrap bootstrap = new ClientBootstrap(  
                new NioClientSocketChannelFactory(  
                        Executors.newCachedThreadPool(),  
                        Executors.newCachedThreadPool()));  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
            public ChannelPipeline getPipeline() {  
                ChannelPipeline pipeline =  Channels.pipeline();  
                pipeline.addLast("decoder", new TimeDecoder());  
                pipeline.addLast("encoder", new TimeEncoder());  
                pipeline.addLast("handler", new TimeClientHandler3());  
                return pipeline;  
            }  
        });  
        //创建无连接传输channel的辅助类(UDP),包括client和server  
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(  
                "localhost", 9999));  
        future.getChannel().getCloseFuture().awaitUninterruptibly();  
        bootstrap.releaseExternalResources();  
    }  
 
    @Test  
    public void testNetty() {  
            testServer();  
            testClient();  
    }  
}

这两段代码是学习的时候参考别人的代码,转于哪想不起来了,但是这两段代码让我了解了netty的通信流程。

Netty学习(1)

其实到这家公司也有一段时间了,现在的公司是个做通信的公司,所以有一些知识不得不去学习。
刚来不久就要求用netty做一个通信的模拟。我来这个公司之前连socket是什么都不知道,突然有点想要放弃,但是既然来了便也希望能够安下心去学习些东西好好工作
既然不得不学习了,那也只有硬着头皮去扣了。这里只是我对自己的一个知识梳理。
首先是SOCKET:
我的理解,就是描述ip地址,端口号的,服务端监听,客户端生成socket连接服务器,服务端生成一个socket,就可以进行通信了。
然后是NIO:
NIO有四大主要概念:Channel, Selector, SelectionKey, Buffer。Netty中同样也少不了这些,并且出于性能和设计上的考虑,对它们进行了扩充。比如netty的buffer包,重新定义了Buffer,提供了各种各样的实现,比jdk中自带的Buffer更加高效与好用。举例来说,jdk中的buffer在读完数据写之前,必须调用一个flip()方法,否则数据出错但不报错,这很容易引起bug。而Netty的Buffer中增加了一个write index,将读写分开,解决了这个问题。另外,jdk的buffer是固定的,而netty还提供了一个可变长的buffer,方便操作。而Channel在Netty中也重新设计,提供了很多种Channel,照顾到不同程序的需要。而Selector和SelectionKey,还是使用jdk自己的。
重要的地方,是Netty定义了很多事件,如connected, messageReceived, messageSent等等。我们可以继承一个Handler,来实现对应的操作,它们将在对应事件发生的时候,被自动调用。这就是“事件驱动”。
为了简化操作,我们还可以使用Encoder和Decoder将传送于网络间的二进制数据与我们操作的pojo进行互转,这样在程序中无须直接操作底层数据。
Netty提供了不少协议的实现,如http等,让Netty有了初步的处理http请求的能力。其http的encoder和decoder,在底层处理http请求的交互数据,将其变为了request, cookie, response等对象。因此现在有很多web框架尝试将Netty作为自己内嵌的http服务器,以达到更佳的性能与方便的操作。不过据我观察,Netty提供的支持还比较简单(如request, response等类中,方法很少),如果真要使用,还需要对它进行扩展。
Netty使用一个Boss线程来处理新连接,而将数据读取和处理等,分给若干个Worker线程。它在内部其实还是使用了“轮询”,去调用selector.select()得到可用事件,最终转变为对Hander中相关回调方法的调用。
我对NIO的理解:
四大概念。对于io操作增加了buffer缓冲区,还有就是一个非阻塞。
非阻塞:应该就是如果有数据什么的需要处理,相应的事件驱动会被调用,不需要轮询。

Channel、Buffer

Selector:这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。