vertx的HttpServer模块

Wesley13
• 阅读 626

Start HttpServer

/**
   * 启动 HttpServer
   * multi instances 采用 synchronized防止线程安全问题
   * addHandlers 方法是actor模式的实现(EventLoopPoolSize >= instances):
  *               1 instances : 1 verticle(actor) : 1 VertxThread(Eventloop)
   */
 public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {
    //是否有配置requestHandler或webscoket
    if (requestStream.handler() == null && wsStream.handler() == null) {
      throw new IllegalStateException("Set request or websocket handler first");
    }
    if (listening) {
      throw new IllegalStateException("Already listening");
    }
    listenContext = vertx.getOrCreateContext(); //根据currentThread 获取Context,获取null则create
    serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;//判断是否启用ssl
    List<HttpVersion> applicationProtocols = options.getAlpnVersions();//获取协议版本,默认支持1.1和2.0
    
    if (listenContext.isWorkerContext()) {//是否使用 Worker Verticles ,不予许使用HTTP2.0
      applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList());
    }
    sslHelper.setApplicationProtocols(applicationProtocols);//应用协议
    
    synchronized (vertx.sharedHttpServers()) {// 监听多个不同网络接口(ip:port) Httpserver 防止并发
      this.actualPort = port; 
      id = new ServerID(port, host);//生成服务id
      HttpServerImpl shared = vertx.sharedHttpServers().get(id);
      
      if (shared == null || port == 0) {// mutil instances 的情况,利用 mutli core cpu
        /**
          * frist instances
          */
        serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
        ServerBootstrap bootstrap = new ServerBootstrap();
        //定义两个线程组,accept size 1, 重写的VertxEventLoopGroup
        bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
        
        applyConnectionOptions(bootstrap);//添加Connection Accept之后的附属选项
        sslHelper.validate(vertx);//验证ssl相关参数
        bootstrap.childHandler(new ChannelInitializer<Channel>() {
        
          @Override
          /**
            * connection accept 调度切换线程后触发
            */
          protected void initChannel(Channel ch) throws Exception {
           //限流策略,读大于写,导致内存无限扩大,最终 OOM
            if (requestStream.isPaused() || wsStream.isPaused()) {
              ch.close(); //超过服务承载能力,关闭连接
              return;
            }
            ChannelPipeline pipeline = ch.pipeline();
            if (sslHelper.isSSL()) {//是否启用ssl
              io.netty.util.concurrent.Future<Channel> handshakeFuture;
              if (options.isSni()) {//是否启用sni,单服务多证书情况
                VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx);
                pipeline.addLast(sniHandler);
                handshakeFuture = sniHandler.handshakeFuture();
              } else {
                SslHandler handler = new SslHandler(sslHelper.createEngine(vertx));
                pipeline.addLast("ssl", handler);
                handshakeFuture = handler.handshakeFuture();
              }
              //侦听 TLS handshake
              handshakeFuture.addListener(future -> {
                if (future.isSuccess()) {// 握手成功
                  if (options.isUseAlpn()) {//是否启用alpn,协调使用的protocol
                    //获取使用的协议
                    SslHandler sslHandler = pipeline.get(SslHandler.class);
                    String protocol = sslHandler.applicationProtocol();
                    if ("h2".equals(protocol)) {//是否是http2.0
                      handleHttp2(ch);
                    } else {
                      handleHttp1(ch);
                    }
                  } else {
                    handleHttp1(ch);
                  }
                } else {//握手失败
                  HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ch.eventLoop());
                  handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(future.cause()));
                }
              });
            } else {
              //是否是启用http2,通过VM Options: -Dvertx.disableH2c 设置;默认false
              if (DISABLE_H2C) {
                handleHttp1(ch);
              } else {
                IdleStateHandler idle;
                if (options.getIdleTimeout() > 0) {//是否定义最大空闲时间
                  pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout()));
                } else {
                  idle = null;
                }
                
                /**直接使用明文的http2.0或1.1处理*/
                pipeline.addLast(new Http1xOrH2CHandler() {
                  @Override
                  protected void configure(ChannelHandlerContext ctx, boolean h2c) {
                    if (idle != null) {
                      //移除idleHandler,重新添加,不用注意次序
                      pipeline.remove(idle);
                    }
                    if (h2c) {//判断协议,如果定义idle则会重新添加 idleHandler
                      handleHttp2(ctx.channel());
                    } else {
                      handleHttp1(ch);
                    }
                  }

                  @Override
                  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                    if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
                      ctx.close();
                    }
                  }

                  @Override
                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                    super.exceptionCaught(ctx, cause);
                    //根据eventloop选中对应的handler进行异常传播
                    HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop());
                    handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(cause));
                  }
                });
              }
            }
          }
        });

        addHandlers(this, listenContext);////添加一个instaces(verticle的HttpHandlers)到httpHandlerMgr中
        try {
         //listen ip:port
          bindFuture = AsyncResolveConnectHelper.doBind(vertx, SocketAddress.inetSocketAddress(port, host), bootstrap);
          bindFuture.addListener(res -> {
            if (res.failed()) {
              vertx.sharedHttpServers().remove(id);
            } else {
              Channel serverChannel = res.result();
              HttpServerImpl.this.actualPort = ((InetSocketAddress) serverChannel.localAddress()).getPort();
              serverChannelGroup.add(serverChannel);//添加当前的ServerSocketChannel
              //初始化metrcis指标
              VertxMetrics metrics = vertx.metricsSPI();
              this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null;
            }
          });
        } catch (final Throwable t) {
          if (listenHandler != null) {
            vertx.runOnContext(v -> listenHandler.handle(Future.failedFuture(t)));
          } else {
            log.error(t);
          }
          listening = false;
          return this;
        }
        vertx.sharedHttpServers().put(id, this);//启动的HttpServer服务(verticle)添加到Vertx.sharedHttpMap中
        actualServer = this;
      } else {//other instances
        actualServer = shared;
        this.actualPort = shared.actualPort;
        //在actualServer基础上添加一个instaces(verticle的HttpHandlers)到httpHandlerMgr中
        addHandlers(actualServer, listenContext);           
        //初始化metrics
        VertxMetrics metrics = vertx.metricsSPI();
        this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null;
      }
      //服务 bind 状态
      actualServer.bindFuture.addListener(future -> {
        if (listenHandler != null) {
          final AsyncResult<HttpServer> res;
          if (future.succeeded()) {
            res = Future.succeededFuture(HttpServerImpl.this);
          } else {
            res = Future.failedFuture(future.cause());
            listening = false;
          }
          listenContext.runOnContext((v) -> listenHandler.handle(res));//回调处理
        } else if (future.failed()) {
          listening = false;
          log.error(future.cause());
        }
      });
    }
    return this;
}

如何实现隔离(actor模型)

/**
  * 添加一个verticle instances handlers
  * @param server  First Actual Server(multi instances)
  *    mutil instances 情况下第一个instance启动成功,other instances 仅仅是
  *    利用multi core cpu,所以以 first instances actual Server为主,后续在
  *    Current HttpServerImpl instance 添加handlers(verticle)
  * @param context current Thread context
  *    multi instances 下EventLoopGroup.next 方法挑选(choose)出一个Eventloop
  *    与Context 映射. netty EventExecutor调度DefaultEventExecutorChooserFactory类
  *    两种实现:
  *           ①求余取模 
  *           ②位运算取模(2的幂)
  *    所以防止实例数量大于EventloopGroup数量,Default : 2 * CpuCoreSensor.availableProcessors()
  *    ,linux下以读取/proc/self/status 文件为主,而不是Runtime.getRuntime().availableProcessors()
  */
private void addHandlers(HttpServerImpl server, ContextImpl context) {
    server.httpHandlerMgr.addHandler(
      new HttpHandlers(
        requestStream.handler(),
        wsStream.handler(),
        connectionHandler,
        exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler)
      , context);
}


public class HttpHandlers {
  final Handler<HttpServerRequest> requestHandler;
  final Handler<ServerWebSocket> wsHandler;
  final Handler<HttpConnection> connectionHandler;
  final Handler<Throwable> exceptionHandler;

  /**
    * @param requestHandler     Http Request Handler
    * @param wsHandler          WebScoket Handler
    * @param connectionHander   TCP Connection Handler
    * @param exceptionHander    Exception Handlet
    */
  public HttpHandlers(
    Handler<HttpServerRequest> requestHandler,
    Handler<ServerWebSocket> wsHandler,
    Handler<HttpConnection> connectionHandler,
    Handler<Throwable> exceptionHandler) {
    this.requestHandler = requestHandler;
    this.wsHandler = wsHandler;
    this.connectionHandler = connectionHandler;
    this.exceptionHandler = exceptionHandler;
  }
}

public class HandlerManager<T> {
    public synchronized void addHandler(T handler, ContextImpl context) {
        /**
          * 添加一个eventloop(Thread)到 VertxEventLoopGroup 集合中.
          * accept状态后的read/write事件,线程调度在VertxEventLoopGroup类的next方法,
          * vertx重写choose策略
          */
        EventLoop worker = context.nettyEventLoop();
        availableWorkers.addWorker(worker);
        /**
          * 添加handlers,并且绑定handler和context映射关系.
          * 注意部署的instances size不要超过EventLoopPoolSize,
          * 否则出现 1 EventLoop : N handler(verticle)           * 导致一个eventloop上执行 N 个verticle
          */
        Handlers<T> handlers = new Handlers<>();
        Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);
        if (prev != null) {
          handlers = prev;
        }
        handlers.addHandler(new HandlerHolder<>(context, handler));
        hasHandlers = true;
    }
}

Connection scheduling process:

vertx的HttpServer模块

add handler to eventloop structure:

  1. an eventloop corresponds to a handlers
  2. an eventloop corresponds to multiple instances verticles(HandlerHolder)

vertx的HttpServer模块

HttpServer option

public class HttpServerOptions extends NetServerOptions {  
  //是否启用压缩,默认false
  private boolean compressionSupported;
  
  //压缩级别越高cpu负荷越大,默认gzip
  private int compressionLevel;
  
  //websocket最大的 Frame 大小,默认65536
  private int maxWebsocketFrameSize;
  
  //websocket 最大消息大小,默认65536*4
  private int maxWebsocketMessageSize;
  
  //处理WebSocket消息的约定的子协议
  private String websocketSubProtocols;
  
  //是否自动处理100-Continue,默认false
  private boolean handle100ContinueAutomatically;
  
  //分段传输chunk 大小,默认8192
  private int maxChunkSize;
  
  //initial line 最大长度,默认 4096
  private int maxInitialLineLength;
  
  //Header 最大大小,默认 8192
  private int maxHeaderSize;
  
  //http2.0最大的并发流,默认100
  private Http2Settings initialSettings;
  
  //支持alpn的版本,默认Http1.1和Http2.0
  private List<HttpVersion> alpnVersions;
  
  //设置连接的窗口大小,默认无限制
  private int http2ConnectionWindowSize;
  
  //是否启用压缩解码
  private boolean decompressionSupported;
  
  //WebSocket Masked位为true。 PerformingUnMasking将是错误的,默认为false
  private boolean acceptUnmaskedFrames;
  
  //默认HttpObjectDecoder的初始缓冲区大小,默认128
  private int decoderInitialBufferSize;
}

备注

1.建立HttpServer,配置最大的idle时间,默认tcpkeepalive配置是false,
  网络故障等造成TCP挥手交互失败从而导致epoll的达到FileMax,阻止后续连接,导致
  服务器无法提供服务; 或者启用keepalive,依靠内核TCP模块去侦测(默认2小时一次).
  可用netstat工具查看应用当前网络状况
  
2.启用HTTP2,使用jetty开源apln-boot jar包,对JDK版本依赖关系强,需下载对应JDK版本的apln;
   或者使用openssl,当前服务环境都需安装,迁移服务麻烦,但是性能稍高.

3.具体 Route和其它HttpServer功能在 Web 模块中, core 模块只是实现Tcp相关、TLS、   Choose vertcile.handlers Scheduling 和codec等.
点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这