Netty 高性能网络协议服务器开发

威尔we
• 阅读 1429

本文通过一个实例来讲解如何使用 Netty 框架来开发网络协议服务器,项目使用 Gradle 工具来构建和运行,并且支持 Docker 部署。项目代码已在 GitHub 开源,JW Netty Demo

Netty 简介

Netty 是一个异步、事件驱动的网络应用框架,使用它可以快速开发出可维护良好的、高性能的网络协议服务器。它大幅简化和流程化了网络编程,比如 TCP 和 UDP 套接字服务器开发。难能可贵的是,在保证快速和易用性的同时,使用 Netty 开发的应用并没有丧失可维护性和性能。

Netty 高性能网络协议服务器开发

上图是来自于官网的 Netty 架构图,可以看到整体结构非常清晰,每一层各司其职。

项目介绍

本项目实现了一个用来接收和存储传感器数据的 TCP 网络协议服务器。这些数据由连接在硬件设备上的传感器采集,然后由硬件设备上报到服务器。一个硬件设备可以连接多个传感器,每次上报硬件设备会把所有传感器的数据一并上报。服务端需要计算所有传感器数据的平均值,并保存起来。这是一个典型的物联网数据采集场景。

协议

考虑到物联网环境硬件设备性能不高、网络带宽较小且稳定性不够,我们需要尽可能降低协议编解码开销、减少报文大小。因此最好使用二进制协议,而不是 JSON 这样的文本格式。

本项目采用的二进制协议如下:

| 起始位(2 bytes) | 报文总长(2 bytes) | 协议版本(1 byte) | 设备号(4 bytes) | 时间戳(4 bytes) | 数据项长度(1 byte) | 数据项值 | 更多数据项... | 校验和(2 bytes) |

  • 所有段都是无符号整数,字节顺序为网络字节序
  • 起始位固定为 0x55 0xaa,用来防止网络传输过程中数据错乱
  • 报文总长包含了起始位和最后的校验和
  • 数据项值占用的字节数由其前面一个段的值决定
  • 数据项可以有很多个,只要报文总长不超过 65535 就行

代码解读

目录结构

.
├── Dockerfile # Docker 镜像构建配置文件
├── README.md
├── bin
├── build
├── build.gradle # Gradle 构建配置文件
├── docker-compose.yml # Docker Compose 配置文件
├── gradle # Gradle 目录
├── gradlew # Gradle 包装脚本,通过它来执行构建任务
├── gradlew.bat # Windows 平台下的包装脚本
├── settings.gradle # Gradle 设置
└── src
    ├── main
    │   ├── java
    │   │   └── net
    │   │       └── jaggerwang
    │   │           └── jwnettydemo
    │   │               ├── Main.java # 应用入口
    │   │               ├── config # 配置
    │   │               │   ├── ApplicationConfig.java
    │   │               │   └── MongoDBConfig.java
    │   │               ├── decoder # 报文解码器
    │   │               │   ├── Message.java
    │   │               │   └── MessageDecoder.java
    │   │               ├── handler # 报文处理器
    │   │               │   └── MessageHandler.java
    │   │               └── saver # 报文保存器
    │   │                   ├── Metric.java
    │   │                   └── MetricSaver.java
    │   └── resources
    │       ├── application.properties # 应用配置
    │       └── log4j2.xml # 日志配置
    └── test
        └── java
            └── net
                └── jaggerwang
                    └── jwnettydemo
                        ├── decoder # 报文解码单元测试
                        │   └── MessageDecoderTests.java
                        └── saver # 报文保存单元测试
                            └── MetricSaverTests.java 

可以看到,项目结构为典型的 Java 项目结构。

配置文件

通过配置文件,使得同一套代码可以在不同环境中都能运行。为了不为每套环境都维护一套配置文件,本项目将依赖环境的配置抽取了出来,使得运行时可以通过环境变量来覆盖指定配置项。这样各个环境的配置文件就保持了一致,可以共用一份。

配置文件有两个,一个是应用配置,一个是日志配置。日志组件用的是 Log4j 2,配置文件里环境变量的嵌入格式遵循的是 Log4j 风格,包括默认值的指定方式。

应用配置文件内容如下:

# Path
path.app=${PATH_APP:-/Users/jagger/projects/jw/jw-netty-demo}
path.data=${PATH_DATA:-/Users/jagger/data/jw/jw-netty-demo}

# Server
server.port=${SERVER_PORT:-8080}

# MongoDB
mongodb.uri=${MONGODB_URI:-mongodb://localhost:27017/}
mongodb.db=${MONGODB_DB:-jw_netty_demo} 

核心代码

整个报文处理过程分两步,第一步解码,由类 MessageDecoder 负责,第二步保存解码出来的 Message,由类 MessageHandler 负责。下面我们重点来看这两个类的实现。

MessageDecoder 类

package net.jaggerwang.jwnettydemo.decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class MessageDecoder extends ReplayingDecoder {

    private static final Logger logger = LogManager.getLogger();

    public static final int HEADER_LENGTH = 13;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        Message msg = new Message();

        msg.setStartFirst(in.readUnsignedByte());
        msg.setStartSecond(in.readUnsignedByte());
        logger.debug("decode start ok: {} {}", msg.getStartFirst(), msg.getStartSecond());

        msg.setLength(in.readUnsignedShort());
        logger.debug("decode length ok: {}", msg.getLength());

        msg.setVersion(in.readUnsignedByte());
        logger.debug("decode version ok: {}", msg.getVersion());

        msg.setDeviceNo(in.readUnsignedInt());
        logger.debug("decode device no ok: {}", msg.getDeviceNo());

        msg.setTime(in.readUnsignedInt());
        logger.debug("decode time ok: {}", msg.getTime());

        int pos = HEADER_LENGTH;
        while (pos < msg.getLength() - 2) {
            short len = in.readUnsignedByte();
            pos += 1;

            long data;
            switch (len) {
                case 1:
                    data = (long) in.readUnsignedByte();
                    pos += 1;
                    break;
                case 2:
                    data = (long) in.readUnsignedShort();
                    pos += 2;
                    break;
                case 4:
                    data = in.readUnsignedInt();
                    pos += 4;
                    break;
                default:
                    logger.error("unsupported data length: {}", len);
                    continue;
            }

            msg.getDatas().add(data);
            logger.debug("decode one data ok: {}", data);
        }

        msg.setChecksum(in.readUnsignedShort());
        logger.debug("decode checksum ok: {}", msg.getChecksum());

        out.add(msg);
    }
} 

MessageDecoder 类继承自 Netty 的 ReplayingDecoder 类,该类解决了网络报文接收不完整的问题。从网络中接收报文时,不能保证一次接收到整个报文,很可能只是其中一部分。如果在应用里来检测报文完整性,在接收到完整报文时才解码,会比较麻烦。通过继承 ReplayingDecoder 类,实现它的 decode 方法,应用就可以假设报文是完整的。实际处理过程中,如果解码时发现报文不完整,decode 会抛出异常,ReplayingDecoder 捕获该异常后将解码位置归零,下次再重头开始。

MessageHandler 类

package net.jaggerwang.jwnettydemo.handler;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import net.jaggerwang.jwnettydemo.decoder.Message;
import net.jaggerwang.jwnettydemo.saver.Metric;
import net.jaggerwang.jwnettydemo.saver.MetricSaver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Sharable
public class MessageHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LogManager.getLogger();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        Message message = (Message) msg;
        if (!message.isValid()) {
            logger.error("invalid message: {}", message);
            ctx.close();
            return;
        }
        logger.debug("received message: {}", message);

        MetricSaver saver = new MetricSaver();
        Metric metric = new Metric(message);
        saver.save(metric);
        logger.debug("saved metric: {}", metric);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
} 

MessageHandler 在 Netty Channel 处理管道里位于 MessageDecoder 之后,因此它接收到的是已经解码出来的 Message 对象。它不用关心协议细节,只需要简单地把 Message 对象保存到数据库里就可以。

开发

构建工具

本项目使用 Gradle 工具来构建和运行,但是不需要额外去安装。只要使用 ./gradlew <task> 执行任意任务,如果没有检测到 Gradle 工具,就会自动下载并安装到项目下。

本项目的 build.gradle 配置文件如下:

apply plugin : 'application'

mainClassName = 'net.jaggerwang.jwnettydemo.Main'

dependencies {
    compile 'io.netty:netty-all:4.1.21.Final'
    compile 'org.apache.logging.log4j:log4j-api:2.10.0'
    compile 'org.apache.logging.log4j:log4j-core:2.10.0'
    compile 'org.mongodb:mongodb-driver:3.6.3'
    compile 'com.aliyun:hitsdb-client:0.0.5'
    compile 'org.apache.kafka:kafka-clients:0.10.0.0'
    compile 'org.apache.kafka:connect-json:0.10.0.0'
    compile 'com.aliyun.openservices:ons-sasl-client:0.1'
    compile 'com.fasterxml.jackson.core:jackson-core:2.9.4'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.4'
    compile 'com.fasterxml.jackson.module:jackson-module-parameter-names:2.9.4'
    compile 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.9.4'
    compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.4'
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'commons-codec:commons-codec:1.10'

    compileOnly 'org.projectlombok:lombok:1.16.18'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.1.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.1.0'
}

repositories {
    mavenCentral()
}

test {
    useJUnitPlatform()
} 

运行

执行下面的命令来运行项目:

$ ./gradlew run
  Starting a Gradle Daemon (subsequent builds will be faster)

  > Task :run
  2018-四月-26 21:21:49 INFO  net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  2018-四月-26 21:21:50 INFO  net.jaggerwang.jwnettydemo.Main - server started on port 8080
  <=========----> 75% EXECUTING [17s]
  > :run 

运行起来的 Server 会在 8080 端口监听请求,按照协议组装报文后发往 localhost:8080 就可以上报数据。

测试

使用下面的命令来运行单元测试:

$ ./gradlew test
  Starting a Gradle Daemon (subsequent builds will be faster)

  Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
  See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings

  BUILD SUCCESSFUL in 7s
  4 actionable tasks: 4 up-to-date 

本项目使用 JUnit 5 来编写和运行单元测试。目前有两个单元测试,一个是报文解码,另一个是报文保存。

打包

使用下面的命令来打包和运行:

$ ./gradlew installDist

  Deprecated Gradle features were used in this build, making it incompatible with Gradle 5.0.
  See https://docs.gradle.org/4.6/userguide/command_line_interface.html#sec:command_line_warnings

  BUILD SUCCESSFUL in 2s
  5 actionable tasks: 3 executed, 2 up-to-date

$ ./build/install/jw-netty-demo/bin/jw-netty-demo
  2018-四月-26 22:23:22 INFO  net.jaggerwang.jwnettydemo.config.ApplicationConfig - load properties ok
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  2018-四月-26 22:23:22 INFO  net.jaggerwang.jwnettydemo.Main - server started on port 8080 

打包好的应用会安装到目录 ./build/install/jw-netty-demo 下, 可以使用其下的 bin/jw-netty-demo 脚本来运行应用。

Docker 部署

构建镜像

执行下面的命令来构建镜像:

$ docker build -t jw-netty-demo . 

构建镜像的配置文件如下:

FROM java:8

ENV APP_PATH=/app
ENV DATA_PATH=/data

WORKDIR $APP_PATH

COPY . .
RUN ./gradlew installDist

VOLUME $DATA_PATH

EXPOSE 8080

CMD ./build/install/jw-netty-demo/bin/jw-netty-demo 

在容器里运行服务

执行下面的命令来运行项目所有服务:

$ docker-compose up 

Docker Compose 配置文件如下:

version: "2"
services:
  app:
    image: jw-netty-demo:latest
    environment:
      PATH_APP: /app
      PATH_DATA: /data
      MONGODB_URI: mongodb:27017
    ports:
    - 19900:8080
    volumes:
    - ~/data/jw-netty-demo/app:/data
  mongodb:
    image: mongo:3
    volumes:
    - ~/data/jw-netty-demo/mongodb:/data/db 

Docker Compose 会同时启动 app 服务,及其依赖的 mongodb 服务。

参考资料

  1. JW Netty Demo

本文转自 https://blog.jaggerwang.net/netty-high-performance-protocol-server-develop/,如有侵权,请联系删除。

点赞
收藏
评论区
推荐文章
爱写码 爱写码
2年前
要想编程效率高,熟悉t-io很必要,省去你的APP中自己开发网络通信的模块
1.是基于javaaio的网络编程框架,和netty属于同类,它的使命是:让天下没有难开发的网络程序。2.基于tiocore来开发IM、TCP私有协议、RPC、游戏服务器端、推送服务、实时监控、物联网、UDP、Socket将会变得空前的简单。3.tio家族除了tiocore外,还有tiowebsocketserver、tiohttpserver、ti
Stella981 Stella981
2年前
Netty HTTP on Android
Netty是一个NIO的客户端服务器框架,它使我们可以快速而简单地开发网络应用程序,比如协议服务器和客户端。它大大简化了网络编程,比如TCP和UDPsocket服务器。“快速而简单”并不意味着开发出来的应用可维护性或性能不好。Netty已经实现了大量的协议,比如FTP,SMTP,HTTP,以及各种基于二进制和文本的传统协议。可以说Netty已经找到了一
Stella981 Stella981
2年前
Netty序章之BIO NIO AIO演变
Netty序章之BIONIOAIO演变Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司都在使用的技术。更是面试的加分项。Netty并非横空出世,它是在BIO,NIO,AIO演变中的产物,是一种
Stella981 Stella981
2年前
BIO、NIO、AIO系列二:Netty
一、概述Netty是一个Java的开源框架。提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是一个NIO客户端,服务端框架。允许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议,它简化了网络编程规范。二、NIO开发的问题
Stella981 Stella981
2年前
Netty 入门,这一篇文章就够了
Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如sockbolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。netty旨在为可维护的高性能、高可扩展
Stella981 Stella981
2年前
Netty 入门初体验
Netty简介Netty是一款异步的事件驱动的网络应用程序框架,支持快速开发可维护的高性能的面向协议的服务器和客户端。Netty主要是对java的nio包进行的封装为什么要使用Netty上面介绍到Netty是一款高性能的网络通讯框架,那么我们为什么要使用Netty,换句话说,
Stella981 Stella981
2年前
Netty网络编程(初识)
Netty简单介绍核心架构图(现在还看不是很懂):!netty(https://static.oschina.net/uploads/img/201712/02192156_X1e5.png"netty")Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高
Stella981 Stella981
2年前
Netty(RPC高性能之道)原理剖析
1,Netty简述Netty是一个基于JAVANIO类库的异步通信框架,用于创建异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性的网络客户端和服务器端RPC高性能分析,请参考文章“【总结】RPC性能之道”特点异步、非阻塞、基于事件驱动的NIO框架支持多种传输层通信协议,包括T
Stella981 Stella981
2年前
Netty入门
一、是什么  Netty是一个高性能、异步事件驱动、基于JavaNIO的异步的可扩展的客户端/服务器网络编程框架。  Netty提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过FutureListener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果
Stella981 Stella981
2年前
Netty堆外内存泄露排查与总结
导读Netty是一个异步事件驱动的网络通信层框架,用于快速开发高可用高性能的服务端网络框架与客户端程序,它极大地简化了TCP和UDP套接字服务器等网络编程。Netty底层基于JDK的NIO,我们为什么不直接基于JDK的NIO或者其他NIO框架:1.使用JDK自带的NIO需要了解太多的概念,编程复杂。2