网络编程框架t-io的编程基本知识介绍

爱写码
• 阅读 1449

t-io作为目前国内最流行的开源网络编程框架软件,以简单易懂,上手容易而著称,相同的功能比起netty实现起来,要简单的多,代码量也大大减少,如果要使用好t-io,还是要先学习t-io的一些基本知识,这篇文章主要从8个方面介绍了t-io的基础知识。 具体请参考: https://www.wanetech.com/doc/tio/88

t-io收发消息过程

t-io收发消息及处理过程,可以用一张图清晰地表达出来 网络编程框架t-io的编程基本知识介绍

应用层包:Packet

Packet是用于表述业务数据结构的,我们通过继承Packet来实现自己的业务数据结构,对于各位而言,把Packet看作是一个普通的VO对象即可。

注意:不建议直接使用Packet对象,而是要继承Packet

一个简单的Packet可能长这样

package org.tio.study.helloworld.common;
import org.tio.core.intf.Packet;
/**
 * @author tanyaowu
 */
public class HelloPacket extends Packet {
    private static final long serialVersionUID = -172060606924066412L;
    public static final int HEADER_LENGTH = 4;//消息头的长度
    public static final String CHARSET = "utf-8";
    private byte[] body;
    /**
     * @return the body
     */
    public byte[] getBody() {
        return body;
    }
    /**
     * @param body the body to set
     */
    public void setBody(byte[] body) {
        this.body = body;
    }
}

可以结合AioHandler.java理解Packet

import java.nio.ByteBuffer;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
/**
 * 
 * @author tanyaowu 
 * 2017年10月19日 上午9:40:15
 */
public interface AioHandler {
    /**
     * 根据ByteBuffer解码成业务需要的Packet对象.
     * 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据
     * @param buffer 参与本次希望解码的ByteBuffer
     * @param limit ByteBuffer的limit
     * @param position ByteBuffer的position,不一定是0哦
     * @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position)
     * @param channelContext
     * @return
     * @throws AioDecodeException
     */
    Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException;
    /**
     * 编码
     * @param packet
     * @param tioConfig
     * @param channelContext
     * @return
     * @author: tanyaowu
     */
    ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext);
    /**
     * 处理消息包
     * @param packet
     * @param channelContext
     * @throws Exception
     * @author: tanyaowu
     */
    void handler(Packet packet, ChannelContext channelContext) throws Exception;
}

单条TCP连接上下文:ChannelContext

每一个tcp连接的建立都会产生一个ChannelContext对象,这是个抽象类,如果你是用t-io作tcp客户端,那么就是ClientChannelContext,如果你是用tio作tcp服务器,那么就是ServerChannelContext 网络编程框架t-io的编程基本知识介绍 用户可以把业务数据通过ChannelContext对象和TCP连接关联起来,像下面这样设置属性

ChannelContext.set(String key, Object value)

然后用下面的方式获取属性

 ChannelContext.get(String key)

当然最最常用的还是用t-io提供的强到没对手的bind功能,譬如用下面的代码绑定userid

Tio.bindUser(ChannelContext channelContext, String userid)

然后可以通过userid进行操作,示范代码如下

//获取某用户的ChannelContext集合
SetWithLock<ChannelContext> set = Tio.getChannelContextsByUserid(tioConfig, userid);
//给某用户发消息
Tio.sendToUser(TioConfig, userid, Packet)

除了可以绑定userid,t-io还内置了如下绑定API

  • 无序列表绑定业务id
    Tio.bindBsId(ChannelContext channelContext, String bsId)
  • 绑定token
    Tio.bindToken(ChannelContext channelContext, String token)
  • 绑定群组
    Tio.bindGroup(ChannelContext channelContext, String group)
    ChannelContext对象包含的信息非常多,主要对象见下图 网络编程框架t-io的编程基本知识介绍
  • 说明* ChannelContext是t-io中非常重要的类,他是业务和连接的沟通桥梁!

    服务配置与维护:TioConfig

    场景:我们在写TCP Server时,都会先选好一个端口以监听客户端连接,再创建N组线程池来执行相关的任务,譬如发送消息、解码数据包、处理数据包等任务,还要维护客户端连接的各种数据,为了和业务互动,还要把这些客户端连接和各种业务数据绑定起来,譬如把某个客户端绑定到一个群组,绑定到一个userid,绑定到一个token等。 TioConfig就是解决以上场景的:配置线程池、监听端口,维护客户端各种数据等的。

TioConfig是个抽象类

如果你是用tio作tcp客户端,那么你需要创建ClientTioConfig对象 服务器端对应一个ClientTioConfig对象 如果你是用tio作tcp服务器,那么你需要创建ServerTioConfig 一个监听端口对应一个ServerTioConfig ,一个jvm可以监听多个端口,所以一个jvm可以有多个ServerTioConfig对象 TioConfig对象包含的信息非常多,主要对象见下图 网络编程框架t-io的编程基本知识介绍 如何获取TioConfig对象 见:https://www.wanetech.com/doc/tio/253?pageNumber=1

编码、解码、处理:AioHandler

AioHandler是处理消息的核心接口,它有两个子接口,ClientAioHandler和ServerAioHandler,当用tio作tcp客户端时需要实现ClientAioHandler,当用tio作tcp服务器时需要实现ServerAioHandler,它主要定义了3个方法,见下

import java.nio.ByteBuffer;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
/**
 * 
 * @author tanyaowu 
 * 2017年10月19日 上午9:40:15
 */
public interface AioHandler {
    /**
     * 根据ByteBuffer解码成业务需要的Packet对象.
     * 如果收到的数据不全,导致解码失败,请返回null,在下次消息来时框架层会自动续上前面的收到的数据
     * @param buffer 参与本次希望解码的ByteBuffer
     * @param limit ByteBuffer的limit
     * @param position ByteBuffer的position,不一定是0哦
     * @param readableLength ByteBuffer参与本次解码的有效数据(= limit - position)
     * @param channelContext
     * @return
     * @throws AioDecodeException
     */
    Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException;
    /**
     * 编码
     * @param packet
     * @param tioConfig
     * @param channelContext
     * @return
     * @author: tanyaowu
     */
    ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext);
    /**
     * 处理消息包
     * @param packet
     * @param channelContext
     * @throws Exception
     * @author: tanyaowu
     */
    void handler(Packet packet, ChannelContext channelContext) throws Exception;
}

消息来往监听:AioListener

AioListener是处理消息的核心接口,它有两个子接口:ClientAioListener和ServerAioListener

当用tio作tcp客户端时需要实现ClientAioListener 当用tio作tcp服务器时需要实现ServerAioListener 它主要定义了如下方法

package org.tio.core.intf;
import org.tio.core.ChannelContext;
/**
 *
 * @author tanyaowu
 * 2017年4月1日 上午9:34:08
 */
public interface AioListener {
    /**
     * 建链后触发本方法,注:建链不一定成功,需要关注参数isConnected
     * @param channelContext
     * @param isConnected 是否连接成功,true:表示连接成功,false:表示连接失败
     * @param isReconnect 是否是重连, true: 表示这是重新连接,false: 表示这是第一次连接
     * @throws Exception
     * @author: tanyaowu
     */
    public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception;
    /**
     * 原方法名:onAfterDecoded
     * 解码成功后触发本方法
     * @param channelContext
     * @param packet
     * @param packetSize
     * @throws Exception
     * @author: tanyaowu
     */
    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception;
    /**
     * 接收到TCP层传过来的数据后
     * @param channelContext
     * @param receivedBytes 本次接收了多少字节
     * @throws Exception
     */
    public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception;
    /**
     * 消息包发送之后触发本方法
     * @param channelContext
     * @param packet
     * @param isSentSuccess true:发送成功,false:发送失败
     * @throws Exception
     * @author tanyaowu
     */
    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception;
    /**
     * 处理一个消息包后
     * @param channelContext
     * @param packet
     * @param cost 本次处理消息耗时,单位:毫秒
     * @throws Exception
     */
    public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception;
    /**
     * 连接关闭前触发本方法
     * @param channelContext the channelcontext
     * @param throwable the throwable 有可能为空
     * @param remark the remark 有可能为空
     * @param isRemove
     * @author tanyaowu
     * @throws Exception 
     */
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;
    /**
     * 连接关闭前后触发本方法
     * 警告:走到这个里面时,很多绑定的业务都已经解绑了,所以这个方法一般是空着不实现的
     * @param channelContext the channelcontext
     * @param throwable the throwable 有可能为空
     * @param remark the remark 有可能为空
     * @param isRemove 是否是删除
     * @throws Exception
     * @author: tanyaowu
     */
//    public void onAfterClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;
}

服务器端入口:TioServer

这个对象大家稍微了解一下即可,服务器启动时会用到这个对象,简单贴一下它的源代码吧,大家只需要关注它有一个start()方法是用来启动网络服务的即可

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Node;
import org.tio.utils.SysConst;
import org.tio.utils.date.DateUtils;
import org.tio.utils.hutool.StrUtil;
/**
 * @author tanyaowu
 *
 */
public class TioServer {
    private static Logger log = LoggerFactory.getLogger(TioServer.class);
    private ServerTioConfig serverTioConfig;
    private AsynchronousServerSocketChannel serverSocketChannel;
    private AsynchronousChannelGroup channelGroup = null;
    private Node serverNode;
    private boolean isWaitingStop = false;
    /**
     *
     * @param serverTioConfig
     *
     * @author tanyaowu
     * 2017年1月2日 下午5:53:06
     *
     */
    public TioServer(ServerTioConfig serverTioConfig) {
        super();
        this.serverTioConfig = serverTioConfig;
    }
    /**
     * @return the serverTioConfig
     */
    public ServerTioConfig getServerTioConfig() {
        return serverTioConfig;
    }
    /**
     * @return the serverNode
     */
    public Node getServerNode() {
        return serverNode;
    }
    /**
     * @return the serverSocketChannel
     */
    public AsynchronousServerSocketChannel getServerSocketChannel() {
        return serverSocketChannel;
    }
    /**
     * @return the isWaitingStop
     */
    public boolean isWaitingStop() {
        return isWaitingStop;
    }
    /**
     * @param serverTioConfig the serverTioConfig to set
     */
    public void setServerTioConfig(ServerTioConfig serverTioConfig) {
        this.serverTioConfig = serverTioConfig;
    }
    /**
     * @param isWaitingStop the isWaitingStop to set
     */
    public void setWaitingStop(boolean isWaitingStop) {
        this.isWaitingStop = isWaitingStop;
    }
    public void start(String serverIp, int serverPort) throws IOException {
        long start = System.currentTimeMillis();
        this.serverNode = new Node(serverIp, serverPort);
        channelGroup = AsynchronousChannelGroup.withThreadPool(serverTioConfig.groupExecutor);
        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
        InetSocketAddress listenAddress = null;
        if (StrUtil.isBlank(serverIp)) {
            listenAddress = new InetSocketAddress(serverPort);
        } else {
            listenAddress = new InetSocketAddress(serverIp, serverPort);
        }
        serverSocketChannel.bind(listenAddress, 0);
        AcceptCompletionHandler acceptCompletionHandler = serverTioConfig.getAcceptCompletionHandler();
        serverSocketChannel.accept(this, acceptCompletionHandler);
        serverTioConfig.startTime = System.currentTimeMillis();
        //下面这段代码有点无聊,写得随意,纯粹是为了打印好看些
        String baseStr = "|----------------------------------------------------------------------------------------|";
        int baseLen = baseStr.length();
        StackTraceElement[] ses = Thread.currentThread().getStackTrace();
        StackTraceElement se = ses[ses.length - 1];
        int xxLen = 18;
        int aaLen = baseLen - 3;
        List<String> infoList = new ArrayList<>();
        infoList.add(StrUtil.fillAfter("Tio gitee address", ' ', xxLen) + "| " + SysConst.TIO_URL_GITEE);
        infoList.add(StrUtil.fillAfter("Tio site address", ' ', xxLen) + "| " + SysConst.TIO_URL_SITE);
        infoList.add(StrUtil.fillAfter("Tio version", ' ', xxLen) + "| " + SysConst.TIO_CORE_VERSION);
        infoList.add(StrUtil.fillAfter("-", '-', aaLen));
        infoList.add(StrUtil.fillAfter("TioConfig name", ' ', xxLen) + "| " + serverTioConfig.getName());
        infoList.add(StrUtil.fillAfter("Started at", ' ', xxLen) + "| " + DateUtils.formatDateTime(new Date()));
        infoList.add(StrUtil.fillAfter("Listen on", ' ', xxLen) + "| " + this.serverNode);
        infoList.add(StrUtil.fillAfter("Main Class", ' ', xxLen) + "| " + se.getClassName());
        try {
            RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
            String runtimeName = runtimeMxBean.getName();
            String pid = runtimeName.split("@")[0];
            long startTime = runtimeMxBean.getStartTime();
            long startCost = System.currentTimeMillis() - startTime;
            infoList.add(StrUtil.fillAfter("Jvm start time", ' ', xxLen) + "| " + startCost + " ms");
            infoList.add(StrUtil.fillAfter("Tio start time", ' ', xxLen) + "| " + (System.currentTimeMillis() - start) + " ms");
            infoList.add(StrUtil.fillAfter("Pid", ' ', xxLen) + "| " + pid);
        } catch (Exception e) {
        }
        //100
        String printStr = "\r\n"+baseStr+"\r\n";
        //        printStr += "|--" + leftStr + " " + info + " " + rightStr + "--|\r\n";
        for (String string : infoList) {
            printStr += "| " + StrUtil.fillAfter(string, ' ', aaLen) + "|\r\n";
        }
        printStr += baseStr + "\r\n";
        if (log.isInfoEnabled()) {
            log.info(printStr);
        } else {
            System.out.println(printStr);
        }
    }
    /**
     * 
     * @return
     * @author tanyaowu
     */
    public boolean stop() {
        isWaitingStop = true;
        boolean ret = true;
        try {
            channelGroup.shutdownNow();
        } catch (Exception e) {
            log.error("channelGroup.shutdownNow()时报错", e);
        }
        try {
            serverSocketChannel.close();
        } catch (Exception e1) {
            log.error("serverSocketChannel.close()时报错", e1);
        }
        try {
            serverTioConfig.groupExecutor.shutdown();
        } catch (Exception e1) {
            log.error(e1.toString(), e1);
        }
        try {
            serverTioConfig.tioExecutor.shutdown();
        } catch (Exception e1) {
            log.error(e1.toString(), e1);
        }
        serverTioConfig.setStopped(true);
        try {
            ret = ret && serverTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS);
            ret = ret && serverTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e.getLocalizedMessage(), e);
        }
        log.info(this.serverNode + " stopped");
        return ret;
    }
}

客户端入口:TioClient

只有当你在用t-io作为TCP客户端时,才用得到TioClient,此处简单贴一下它的源代码,它的用法,见后面的showcase示范工程

package org.tio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.core.ssl.SslFacadeContext;
import org.tio.core.stat.ChannelStat;
import org.tio.utils.SystemTimer;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.lock.SetWithLock;
/**
 *
 * @author tanyaowu
 * 2017年4月1日 上午9:29:58
 */
public class TioClient {
    /**
     * 自动重连任务
     * @author tanyaowu
     *
     */
    private static class ReconnRunnable implements Runnable {
        ClientChannelContext channelContext = null;
        TioClient tioClient = null;
        //        private static Map<Node, Long> cacheMap = new HashMap<>();
        public ReconnRunnable(ClientChannelContext channelContext, TioClient tioClient) {
            this.channelContext = channelContext;
            this.tioClient = tioClient;
        }
        /**
         * @see java.lang.Runnable#run()
         *
         * @author tanyaowu
         * 2017年2月2日 下午8:24:40
         *
         */
        @Override
        public void run() {
            ReentrantReadWriteLock closeLock = channelContext.closeLock;
            WriteLock writeLock = closeLock.writeLock();
            writeLock.lock();
            try {
                if (!channelContext.isClosed) //已经连上了,不需要再重连了
                {
                    return;
                }
                long start = SystemTimer.currTime;
                tioClient.reconnect(channelContext, 2);
                long end = SystemTimer.currTime;
                long iv = end - start;
                if (iv >= 100) {
                    log.error("{},重连耗时:{} ms", channelContext, iv);
                } else {
                    log.info("{},重连耗时:{} ms", channelContext, iv);
                }
                if (channelContext.isClosed) {
                    channelContext.setReconnCount(channelContext.getReconnCount() + 1);
                    //                    cacheMap.put(channelContext.getServerNode(), SystemTimer.currTime);
                    return;
                }
            } catch (java.lang.Throwable e) {
                log.error(e.toString(), e);
            } finally {
                writeLock.unlock();
            }
        }
    }
    private static Logger log = LoggerFactory.getLogger(TioClient.class);
    private AsynchronousChannelGroup channelGroup;
    private ClientTioConfig clientTioConfig;
    /**
     * @param serverIp 可以为空
     * @param serverPort
     * @param aioDecoder
     * @param aioEncoder
     * @param aioHandler
     *
     * @author tanyaowu
     * @throws IOException
     *
     */
    public TioClient(final ClientTioConfig clientTioConfig) throws IOException {
        super();
        this.clientTioConfig = clientTioConfig;
        this.channelGroup = AsynchronousChannelGroup.withThreadPool(clientTioConfig.groupExecutor);
        startHeartbeatTask();
        startReconnTask();
    }
    /**
     *
     * @param serverNode
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public void asynConnect(Node serverNode) throws Exception {
        asynConnect(serverNode, null);
    }
    /**
     *
     * @param serverNode
     * @param timeout
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public void asynConnect(Node serverNode, Integer timeout) throws Exception {
        asynConnect(serverNode, null, null, timeout);
    }
    /**
     *
     * @param serverNode
     * @param bindIp
     * @param bindPort
     * @param timeout
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public void asynConnect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception {
        connect(serverNode, bindIp, bindPort, null, timeout, false);
    }
    /**
     *
     * @param serverNode
     * @return
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public ClientChannelContext connect(Node serverNode) throws Exception {
        return connect(serverNode, null);
    }
    /**
     *
     * @param serverNode
     * @param timeout
     * @return
     * @throws Exception
     * @author tanyaowu
     */
    public ClientChannelContext connect(Node serverNode, Integer timeout) throws Exception {
        return connect(serverNode, null, 0, timeout);
    }
    /**
     *
     * @param serverNode
     * @param bindIp
     * @param bindPort
     * @param initClientChannelContext
     * @param timeout 超时时间,单位秒
     * @return
     * @throws Exception
     * @author tanyaowu
     */
    public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout) throws Exception {
        return connect(serverNode, bindIp, bindPort, initClientChannelContext, timeout, true);
    }
    /**
     *
     * @param serverNode
     * @param bindIp
     * @param bindPort
     * @param initClientChannelContext
     * @param timeout 超时时间,单位秒
     * @param isSyn true: 同步, false: 异步
     * @return
     * @throws Exception
     * @author tanyaowu
     */
    private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn)
            throws Exception {
        AsynchronousSocketChannel asynchronousSocketChannel = null;
        ClientChannelContext channelContext = null;
        boolean isReconnect = initClientChannelContext != null;
        //        ClientAioListener clientAioListener = clientTioConfig.getClientAioListener();
        long start = SystemTimer.currTime;
        asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);
        long end = SystemTimer.currTime;
        long iv = end - start;
        if (iv >= 100) {
            log.error("{}, open 耗时:{} ms", channelContext, iv);
        }
        asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        InetSocketAddress bind = null;
        if (bindPort != null && bindPort > 0) {
            if (false == StrUtil.isBlank(bindIp)) {
                bind = new InetSocketAddress(bindIp, bindPort);
            } else {
                bind = new InetSocketAddress(bindPort);
            }
        }
        if (bind != null) {
            asynchronousSocketChannel.bind(bind);
        }
        channelContext = initClientChannelContext;
        start = SystemTimer.currTime;
        InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort());
        ConnectionCompletionVo attachment = new ConnectionCompletionVo(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp, bindPort);
        if (isSyn) {
            Integer realTimeout = timeout;
            if (realTimeout == null) {
                realTimeout = 5;
            }
            CountDownLatch countDownLatch = new CountDownLatch(1);
            attachment.setCountDownLatch(countDownLatch);
            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler());
            boolean f = countDownLatch.await(realTimeout, TimeUnit.SECONDS);
            if (f) {
                return attachment.getChannelContext();
            } else {
                log.error("countDownLatch.await(realTimeout, TimeUnit.SECONDS) 返回false ");
                return attachment.getChannelContext();
            }
        } else {
            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientTioConfig.getConnectionCompletionHandler());
            return null;
        }
    }
    /**
     *
     * @param serverNode
     * @param bindIp
     * @param bindPort
     * @param timeout 超时时间,单位秒
     * @return
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, Integer timeout) throws Exception {
        return connect(serverNode, bindIp, bindPort, null, timeout);
    }
    /**
     * @return the channelGroup
     */
    public AsynchronousChannelGroup getChannelGroup() {
        return channelGroup;
    }
    /**
     * @return the clientTioConfig
     */
    public ClientTioConfig getClientTioConfig() {
        return clientTioConfig;
    }
    /**
     *
     * @param channelContext
     * @param timeout
     * @return
     * @throws Exception
     *
     * @author tanyaowu
     *
     */
    public void reconnect(ClientChannelContext channelContext, Integer timeout) throws Exception {
        connect(channelContext.getServerNode(), channelContext.getBindIp(), channelContext.getBindPort(), channelContext, timeout);
    }
    /**
     * @param clientTioConfig the clientTioConfig to set
     */
    public void setClientTioConfig(ClientTioConfig clientTioConfig) {
        this.clientTioConfig = clientTioConfig;
    }
    /**
     * 定时任务:发心跳
     * @author tanyaowu
     *
     */
    private void startHeartbeatTask() {
        final ClientGroupStat clientGroupStat = (ClientGroupStat)clientTioConfig.groupStat;
        final ClientAioHandler aioHandler = clientTioConfig.getClientAioHandler();
        final String id = clientTioConfig.getId();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!clientTioConfig.isStopped()) {
//                    final long heartbeatTimeout = clientTioConfig.heartbeatTimeout;
                    if (clientTioConfig.heartbeatTimeout <= 0) {
                        log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制");
                        break;
                    }
                    SetWithLock<ChannelContext> setWithLock = clientTioConfig.connecteds;
                    ReadLock readLock = setWithLock.readLock();
                    readLock.lock();
                    try {
                        Set<ChannelContext> set = setWithLock.getObj();
                        long currtime = SystemTimer.currTime;
                        for (ChannelContext entry : set) {
                            ClientChannelContext channelContext = (ClientChannelContext) entry;
                            if (channelContext.isClosed || channelContext.isRemoved) {
                                continue;
                            }
                            ChannelStat stat = channelContext.stat;
                            long compareTime = Math.max(stat.latestTimeOfReceivedByte, stat.latestTimeOfSentPacket);
                            long interval = currtime - compareTime;
                            if (interval >= clientTioConfig.heartbeatTimeout / 2) {
                                Packet packet = aioHandler.heartbeatPacket(channelContext);
                                if (packet != null) {
                                    if (log.isInfoEnabled()) {
                                        log.info("{}发送心跳包", channelContext.toString());
                                    }
                                    Tio.send(channelContext, packet);
                                }
                            }
                        }
                        if (log.isInfoEnabled()) {
                            log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(),
                                    clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(),
                                    clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get());
                        }
                    } catch (Throwable e) {
                        log.error("", e);
                    } finally {
                        try {
                            readLock.unlock();
                            Thread.sleep(clientTioConfig.heartbeatTimeout / 4);
                        } catch (Throwable e) {
                            log.error(e.toString(), e);
                        } finally {
                        }
                    }
                }
            }
        }, "tio-timer-heartbeat" + id).start();
    }
    /**
     * 启动重连任务
     *
     *
     * @author tanyaowu
     *
     */
    private void startReconnTask() {
        final ReconnConf reconnConf = clientTioConfig.getReconnConf();
        if (reconnConf == null || reconnConf.getInterval() <= 0) {
            return;
        }
        final String id = clientTioConfig.getId();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!clientTioConfig.isStopped()) {
                    //log.info("准备重连");
                    LinkedBlockingQueue<ChannelContext> queue = reconnConf.getQueue();
                    ClientChannelContext channelContext = null;
                    try {
                        channelContext = (ClientChannelContext) queue.take();
                    } catch (InterruptedException e1) {
                        log.error(e1.toString(), e1);
                    }
                    if (channelContext == null) {
                        continue;
                        //                        return;
                    }
                    if (channelContext.isRemoved) //已经删除的,不需要重新再连
                    {
                        continue;
                    }
                    SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext;
                    if (sslFacadeContext != null) {
                        sslFacadeContext.setHandshakeCompleted(false);
                    }
                    long sleeptime = reconnConf.getInterval() - (SystemTimer.currTime - channelContext.stat.timeInReconnQueue);
                    //log.info("sleeptime:{}, closetime:{}", sleeptime, timeInReconnQueue);
                    if (sleeptime > 0) {
                        try {
                            Thread.sleep(sleeptime);
                        } catch (InterruptedException e) {
                            log.error(e.toString(), e);
                        }
                    }
                    if (channelContext.isRemoved || !channelContext.isClosed) //已经删除的和已经连上的,不需要重新再连
                    {
                        continue;
                    }
                    ReconnRunnable runnable = new ReconnRunnable(channelContext, TioClient.this);
                    reconnConf.getThreadPoolExecutor().execute(runnable);
                }
            }
        });
        thread.setName("tio-timer-reconnect-" + id);
        thread.setDaemon(true);
        thread.start();
    }
    /**
     * 
     * @return
     * @author tanyaowu
     */
    public boolean stop() {
        boolean ret = true;
        try {
            clientTioConfig.groupExecutor.shutdown();
        } catch (Exception e1) {
            log.error(e1.toString(), e1);
        }
        try {
            clientTioConfig.tioExecutor.shutdown();
        } catch (Exception e1) {
            log.error(e1.toString(), e1);
        }
        clientTioConfig.setStopped(true);
        try {
            ret = ret && clientTioConfig.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS);
            ret = ret && clientTioConfig.tioExecutor.awaitTermination(6000, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e.getLocalizedMessage(), e);
        }
        log.info("client resource has released");
        return ret;
    }
}
点赞
收藏
评论区
推荐文章
爱写码 爱写码
2年前
国产开源网络编程框架t-io使用必备:极速开发器Tio.java
Tio.java简介为了让用户减少查找API的时间,tio把常用API以静态方法的形式汇集于一个类,这就是Tio.javaTio.java本身并不实现过复杂的业务,各业务实现仍然分布在其它类中,Tio.java只是把用户关心的API集中起来,便于用IDE查找而已具体请参考:业务数据绑定无序列表资源绑定是指把业务相关的数据和Tcp连接(即Channel
爱写码 爱写码
2年前
十年磨一剑,匠心打造中国人自己的网络编程架构t-io
中国人拥有了完全自主的高性能网络编程框架tio。通过十多年对低层技术的潜心研究,终于打磨出中国人自己的完全自主的开源网络编程软件tio。1、什么是tiotio是基于java开发的一个开源的网络编程架构,大家都知道现在手机上或者电脑上都装了很多APP,这些APP都不是一个个在手机上或电脑上孤立的使用,而是能访问其他的地方数据或者与其他节点进行实时聊天,故每个A
Easter79 Easter79
2年前
tio
本文讲述如何快速将tio服务整合到SpringBoot项目首先,你需要在pom.xml中引入tiocorespringbootstarter构件<dependency<groupIdorg.tio</groupId<artifactIdtiocorespringboot
爱写码 爱写码
2年前
国产开源网络编程框架tio的得意之作—谭聊介绍
想各位对即时通讯源码有追求人,必然有所了解谭聊,谭聊是完全基于开源网络编程框架tio开发的一款即时通讯软件,也是tio作者亲自操刀,性能上的强大能力完全继承了tio的特性,即单机版可以达到近百万并发,而集群版可以达到过亿的并发能力。所以各位如果想开发即时通讯软件或者类似的公司内部沟通软件,完全可以以tio作为网络编程软件拿来使用,这样不仅能省去这块的开发工作
爱写码 爱写码
2年前
要想轻松驾驭t-io,提高编程效率,学习示范工程很重要
tio作为国内知名的开源网络编程框架,受到业界的广泛赞誉和使用,要一个想要学习或者想要使用tio的人员,最快的了解tio的方法就是学习tio相关的工程文档,主要包含五个部分:tiostudy工程tiowebsocketshowcase工程tiohttpservershowcase工程tioudpshowcase工程tiowebsocketclient 工程具
爱写码 爱写码
2年前
t-io 3.7.5 发布,口碑炸裂的国产网络编程框架
标题说明看到"口碑炸裂"四字,应该又有不少"闻风而至"的同学要来"口吐芬芳",所以先上3张"炸裂封条"如果3张"炸裂封条"还不够,那就再上一张王炸"唵嘛呢叭咪吽",没错,就是封印孙悟空500年的"六字大明咒"言归正传,tio其实是一位三流程序员写的国产网络编程框架,为了自我证明tio的优秀,这位程序员还用tio写了HTTP服务器、WebSocket服务器,再
爱写码 爱写码
2年前
要想编程效率高,熟悉t-io很必要,省去你的APP中自己开发网络通信的模块
1.是基于javaaio的网络编程框架,和netty属于同类,它的使命是:让天下没有难开发的网络程序。2.基于tiocore来开发IM、TCP私有协议、RPC、游戏服务器端、推送服务、实时监控、物联网、UDP、Socket将会变得空前的简单。3.tio家族除了tiocore外,还有tiowebsocketserver、tiohttpserver、ti
爱写码 爱写码
2年前
聊聊t-io和netty的差异
引言tio和netty的差异,这是个被大量问及的问题,在此,作为tio作者,列一些差异化的东西tio的最大优势API设计易懂,尽量避免引入自创概念——最大限度降低学习成本接管了大量业务资源的绑定与自动解绑,开发人员只需要无脑地调用API即可完成绑定与解绑功能,这个处理如果丢给业务开发人员是极其复杂易错的:多线程环境下的集合管理都是要同步安全的,同步的设计既
爱写码 爱写码
2年前
t-io应用场景和能力
tio历史、应用场景(图示——简)tio应用场景(文字描述——详)tio是基于JVM的网络编程框架,和netty属同类,所以netty能做的tio都能做,考虑到tio是从项目抽象出来的框架,所以tio提供了更多的和业务相关的API,大体上tio具有如下特点和能力:内置完备的监控和流控能力内置半包粘包处理一骑绝尘的资源管理能力内置心跳检查和心跳发送能力
爱写码 爱写码
2年前
唯一入驻华为开源优选库的国产网络框架t-io
在2020年5月份,tio在版本tio3.6.1发布的时候,就被华为选中作为网络中台,入驻华为开源软件优选库。主要原因还是因为tio一路走来也有将近十年的精心打磨了,被华为业软部的某测试部严格测试的3个月中,配合华为的测试要求,不断完善tio,把tio磨练成一个相对更加完美的产品了,所以tio相对比较完善了,能满足目前各行各业的应用需求,而且tio周边的产