Voovan开发指南 (二) Socket客户端开发

Wesley13
• 阅读 409

Voovan 框架介绍

Voovan开源项目启动于2015年,始于自己在使用 Netty 和 Mina 时有较多难以理解的部分,同时在使用过程中遇到对粘包等问题的困扰,后来经过不断的对源码的学习以及对 java 异步通信的深入理解发现 自 java 1.7以后 JDK 提供了更优秀的异步通信模型 AIO,随后决定自己参照 AIO 模型重新造一个轮子。并在开发的过程中对使用到的各类工具方法等做了整理,形成了一个常用并且简单易用的工具包。

  • 可灵活实现Socket通信粘包的支持(代码中包含 HTTP协议,字符串换行,定长报文的粘包实现)。
  • 支持 SSL/TLS 加密通信。
  • 提供线程池依据系统负载情况自动动态调整。
  • 同时支持 NIO 和 AIO 特性。
  • 采用非阻塞方式的异步传输。
  • 事件驱动(Connect、Recive、Sent、Close、Exception),采用回调的方式完成调用。
  • 可灵活的加载过滤器机制。

###步骤介绍###

想要发起一个 Socket 连接仅仅需要一下四个步骤:

  • 实例化一个Socket对象: AioSocket 或 NioSocket 或 UdpSocket,用于连接。
  • 实例化一个消息分割器用来处理粘包问题。
  • 实例化一个过滤器,IoFilter。
  • 实例化一个Socket业务处理句柄: IoHandle。

###Step1: 实现一个Socket连接对象### 实例化Socket连接有两个类可以采取实例化动作.

  • AioSocket: 采用 JDK 的 AIO 模型的异步通信,JDK > 1.7。
  • NioSocket: 采用 JDK 的 NIO 模型的异步通信,JDK > 1.4。
  • UdpSocket: 采用 JDK 的 UDP 模型的异步通信,JDK > 1.4。

下面我们来看看这两个类的构造方法:

public NioSocket(String host,int port,int readTimeout) throws IOException
public AioSocket(String host,int port,int readTimeout) throws IOException
public UdpSocket(String host,int port,int readTimeout) throws IOException

我们可以看到这两个类的构造方法都具有三个参数:

  • host: 服务发布地址。
  • port: 服务发布端口。
  • readTimeout: 读取超时时间

下面我们来实例化一个Socket连接对象:

AioSocket socket = new AioSocket("127.0.0.1",2031,300);

实际使用中如果你想构造一个 Nio 模型的 Socket 连接,请将 AioSocket 替换成 NioSocket 即可。


###Step2: 实现一个消息分割器### 消息分割器是用来处理消息粘包的一个补充类,相对于 Netty 和 Mina 是一个特殊的地方. 注意:消息分割器是工作在过滤器之前的. 消息分割器是在 Socket 连接器接受到消息后对消息的内容进行判断是否是一个完成消息报文,如果是一个完成消息报文则返回给过滤器来处理,如果不是则等待消息报文被完整接受,如果一直接受的消息报文都不完整则一直等待,这个时候我们可以通过超时来控制尝试间接收不到消息的情况,具体参考TimeOutMesssageSplitter分割器的实现,也可以直接实例化这个分割器在你自定义的分割器中通过业务代码来判断是否需要使用超时。

下面我们给第一步实例化的 Socket 连接对象增加一个分割器:

socket.messageSplitter(new LineMessageSplitter());

Voovan 框架已经包括了一些消息分割器的实现在org.voovan.network.messagesplitter包内。

  • BufferLengthSplitter: 消息定长分割器
  • LineMessageSplitter: 换行消息分割器
  • HttpMessageSplitter: Http1.1消息分割器
  • TimeOutMesssageSplitter: 超时消息分割器

自定义一个消息过滤器需要实现MessageSplitte接口,接口的源码:

package org.voovan.network;

public interface MessageSplitter {

    public int canSplite(IoSession session,byte[] buffer);
    
}

通过源码我们可以发现,如果想实现一个消息分割器我们需要实现一个canSplite方法:

  • canSplite 方法: 判断消息是否可分割。 这两个方法有两个相同的参数:
  • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。
  • buffer参数: Socket 接收到的所有字节。 ***`返回对象:如果消息是可分割的则返回 true,buffer 参数中的字节将会交给过滤器来处理,如果返回 false 则继续等待 Socket 接受新的字节。

下面我们给出框架实现的TimeOutMesssageSplitter的源码以供参考:

public class TimeOutMesssageSplitter implements MessageSplitter {

   private long initTime;
   
   public TimeOutMesssageSplitter(){
       initTime = -1;
   }
   
   @Override
   public int canSplite(IoSession session, byte[] buffer) {
        int timeOut = session.sockContext().getReadTimeout();
       long currentTime = System.currentTimeMillis();
       if(initTime==-1){
               initTime = currentTime;
       } 
    
       if(currentTime-initTime >= timeOut){
               return byteBuffer.limit();
       }else{
               return -1;
       }
   }

}

###Step3: 实现一个过滤器### 过滤器可以在 Socket 通信中对传递的字节流进行解码和编码操作,比如:我们传递的报文是 JSON 数据格式,那么我们可以通过实现一个过滤器在发送一个对象作为消息时将对象转换成 JSON 字符串通过 Socket 发送,同时在Socket接受到消息后将接收到的 JSON 字符传转换成对象。

我们可以定义多个过滤器形成一个过滤器链,这样可以提高部分过滤器的复用性. 在第一步实例化好的Socket连接对象中调用增加过滤器方法可以向 Socket 连接对象增加过滤器。 增加的过滤器在过滤器链中是有先后顺序的,例如:在使用 add 方法加入的过滤器则在过滤器的最后一个.在解码的过程中过滤器的方法 decode 时是按照加入的从第一个到最后一个的顺序调用的.在编码的过程中过滤器方法 encode 是按照最后一个到第一个的顺序调用的。

下面我们给第一步实例化的 Socket 连接对象增加一个过滤器:

socket.filterChain().add(new StringFilter());

其中我们通过socket.filterChain()获取过滤器链,然后通过过滤器链的 add 方法增加一个名为StringFilter的过滤器。

Voovan 框架已经包括了一个过滤器的实现: StringFilter过滤器,用于将字节流转换成字符串。

如果我们要根据自己的需求定义一个自定义过滤器,那么我们的过滤器实现一个 IoFilter 接口. 下面我们给出 IoFilter 接口的源码:

package org.voovan.network;

import org.voovan.network.exception.IoFilterException;

public interface IoFilter {

public Object decode(IoSession session,Object object) throws IoFilterException;

public Object encode(IoSession session,Object object)throws IoFilterException;
}

通过源码我们可以发现,如果想实现一个过滤器我们需要实现两个过滤器方法:

  • decode 方法: 过滤器解码函数,接收事件(onRecive)前调用
  • encode 方法: 过滤器编码函数,发送事件(onSend)前调用 这两个方法有两个相同的参数:
  • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等.
  • object参数: 上一个过滤器的处理结果,如果只有一个过滤器则是业务代码中发送数据对象. ***返回对象 *** 过滤器处理过的返回结果,被下一个过滤器调用,如果是最后一个过滤器那么这个结果则会传入Socket业务处理句柄的 onRecive 方法。

下面我们给出框架实现的StringFilter的源码以供参考:

public class StringFilter implements IoFilter {

    @Override
    public Object encode(IoSession session,Object object) {
        if(object instanceof String){
            String sourceString = TObject.cast(object);
            return ByteBuffer.wrap(sourceString.getBytes());
        }
        return object;
    }

    @Override
    public Object decode(IoSession session,Object object) {
        if(object instanceof ByteBuffer){
            return TByteBuffer.toString((ByteBuffer)object);
        }
        return object;
    }
}

###Step4: 实现一个Socket业务处理句柄### 定义Socket 业务处理句柄需要实现IoHandler接口. 下面我们给出 IoFilter 接口的源码:

package org.voovan.network;

public interface IoHandler {
    public Object onConnect(IoSession session);
    
    public void onDisconnect(IoSession session);
    
    public Object onReceive(IoSession session,Object obj);
    
    public void onSent(IoSession session,Object obj);    
    
    public void onException(IoSession session,Exception e);
}

下面我们对5个方法做逐个说明:

 public Object onConnect(IoSession session);

当Socket 连接成功后会回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 ***返回值:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。

 public void onDisconnect(IoSession session);

当Socket 连接断开后会回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。

 public Object onReceive(IoSession session,Object obj);

当Socket 接受到数据,并且经过消息分割器分割后再经过过滤器的decode方法处理后的数据。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 obj参数: 接受的数据,这个数据是经过消息分割器和过滤器处理后的数据。 ***返回值:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。

 public void onSent(IoSession session,Object obj);

当Socket 发送成功后会回调这个方法. IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. obj参数: 发送的数据,这个数据是经过过滤器处理后的数据。

 public void onException(IoSession session,Exception e);

当Socket 处理过程中发生异常则回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. e参数: Exception 对象描述这个异常。

使用 session.close() 来关闭 socket 连接。

下面我们给第一步实例化的 Socket 连接对象增加一个业务处理句柄:

socket.handler(new ClientHandlerTest());

下面我们给出一个实现的样例:

public class ClientHandlerTest implements IoHandler {

   @Override
   public Object onConnect(IoSession session) {
       Logger.simple("onConnect");
       session.setAttribute("key", "attribute value");
       String msg = new String("test message\r\n");
       return msg;
   }

   @Override
   public void onDisconnect(IoSession session) {
       Logger.simple("onDisconnect");
   }

   @Override
   public Object onReceive(IoSession session, Object obj) {
       //+"["+session.remoteAddress()+":"+session.remotePort()+"]"
       Logger.simple("Client onRecive: "+obj.toString());
       Logger.simple("Attribute onRecive: "+session.getAttribute("key"));
       session.close();
       return obj;
   }

   @Override
   public void onException(IoSession session, Exception e) {
       Logger.simple("Client Exception");
       Logger.error(e);
       session.close();
   }

   @Override
   public void onSent(IoSession session, Object obj) {
       ByteBuffer sad = (ByteBuffer)obj;
       sad = (ByteBuffer)sad.rewind();
       Logger.simple("Client onSent: "+new String(sad.array()));
   }
}

###Step5: 启动socket### 完整的服务实例:

public class AioSocketTest {
    
    public static void main(String[] args) throws Exception {
        AioSocket socket = new AioSocket("127.0.0.1",2031,300);
        socket.handler(new ClientHandlerTest());
        socket.filterChain().add(new StringFilter());
        socket.messageSplitter(new LineMessageSplitter());
        socket.start();
        Logger.simple("Terminate");
    }
}

你可能发现我们的过滤器、分割器、业务处理句柄没有按照我们上面的顺序来设置,是的这个设置顺序是没有要求的,只要在 start()方法被调用前设置都可以生效。

点赞
收藏
评论区
推荐文章
Jacquelyn38 Jacquelyn38
1年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Karen110 Karen110
1年前
一篇文章带你了解JavaScript日期
日期对象允许您使用日期(年、月、日、小时、分钟、秒和毫秒)。一、JavaScript的日期格式一个JavaScript日期可以写为一个字符串:ThuFeb02201909:59:51GMT0800(中国标准时间)或者是一个数字:1486000791164写数字的日期,指定的毫秒数自1970年1月1日00:00:00到现在。1\.显示日期使用
Stella981 Stella981
1年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
1年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
1年前
MySQL查询按照指定规则排序
1.按照指定(单个)字段排序selectfromtable_nameorderiddesc;2.按照指定(多个)字段排序selectfromtable_nameorderiddesc,statusdesc;3.按照指定字段和规则排序selec
Stella981 Stella981
1年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
1年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
helloworld_34035044 helloworld_34035044
5个月前
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
3A网络 3A网络
2个月前
开发一个不需要重写成 Hive QL 的大数据 SQL 引擎
开发一个不需要重写成HiveQL的大数据SQL引擎学习大数据技术的核心原理,掌握一些高效的思考和思维方式,构建自己的技术知识体系。明白了原理,有时甚至不需要学习,顺着原理就可以推导出各种实现细节。各种知识表象看杂乱无章,若只是学习
3A网络 3A网络
2个月前
理解 virt、res、shr 之间的关系(linux 系统篇)
理解virt、res、shr之间的关系(linux系统篇)前言想必在linux上写过程序的同学都有分析进程占用多少内存的经历,或者被问到这样的问题——你的程序在运行时占用了多少内存(物理内存)?通常我们可以通过t