A Look at AsyncHttpClient's TimeoutTimerTask

白花蛇

This article takes a look at AsyncHttpClient's TimeoutTimerTask.

TimerTask

io/netty/util/TimerTask.java

/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {

    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}
Netty's TimerTask interface defines a single run method, which takes a Timeout as its parameter.

Timeout

io/netty/util/Timeout.java

public interface Timeout {

    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}
The Timeout interface defines the timer(), task(), isExpired(), isCancelled(), and cancel() methods.

TimeoutTimerTask

org/asynchttpclient/netty/timeout/TimeoutTimerTask.java

public abstract class TimeoutTimerTask implements TimerTask {

  private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);

  protected final AtomicBoolean done = new AtomicBoolean();
  protected final NettyRequestSender requestSender;
  final TimeoutsHolder timeoutsHolder;
  volatile NettyResponseFuture<?> nettyResponseFuture;

  TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.timeoutsHolder = timeoutsHolder;
  }

  void expire(String message, long time) {
    LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
    requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
  }

  /**
   * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
   * channel, and heavy objects such as SslEngines
   */
  public void clean() {
    if (done.compareAndSet(false, true)) {
      nettyResponseFuture = null;
    }
  }

  void appendRemoteAddress(StringBuilder sb) {
    InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
    sb.append(remoteAddress.getHostName());
    if (!remoteAddress.isUnresolved()) {
      sb.append('/').append(remoteAddress.getAddress().getHostAddress());
    }
    sb.append(':').append(remoteAddress.getPort());
  }
}
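TimeoutTimerTask is an abstract class that implements the TimerTask interface. Its expire method calls requestSender.abort with a TimeoutException. Its clean method sets done to true and sets nettyResponseFuture to null, so that a cancelled task still referenced by the Timer does not keep the future (and, through it, the channel and heavy objects such as SslEngines) reachable.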
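The clean method can be sketched in isolation with only the JDK. This is a minimal illustration, not AsyncHttpClient code: the class name ReleasableTask and the heavyReference field are hypothetical stand-ins for TimeoutTimerTask and its nettyResponseFuture field.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for TimeoutTimerTask; the names are illustrative only.
final class ReleasableTask {

    private final AtomicBoolean done = new AtomicBoolean();
    // Stands in for the NettyResponseFuture reference.
    private volatile Object heavyReference;

    ReleasableTask(Object heavyReference) {
        this.heavyReference = heavyReference;
    }

    // Same shape as clean() in the quoted source: only the first caller
    // flips done from false to true and drops the reference.
    void clean() {
        if (done.compareAndSet(false, true)) {
            heavyReference = null;
        }
    }

    boolean isDone() {
        return done.get();
    }

    boolean holdsReference() {
        return heavyReference != null;
    }

    public static void main(String[] args) {
        ReleasableTask task = new ReleasableTask(new byte[1024]);
        task.clean();
        task.clean(); // the second call is a no-op
        System.out.println(task.isDone() + " " + task.holdsReference()); // prints "true false"
    }
}
```

Because the reference is dropped as soon as the task is cleaned, a timer that still holds the task object only retains a small shell.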

ReadTimeoutTimerTask

org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

public class ReadTimeoutTimerTask extends TimeoutTimerTask {

  private final long readTimeout;

  ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                       NettyRequestSender requestSender,
                       TimeoutsHolder timeoutsHolder,
                       int readTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.readTimeout = readTimeout;
  }

  public void run(Timeout timeout) {

    if (done.getAndSet(true) || requestSender.isClosed())
      return;

    if (nettyResponseFuture.isDone()) {
      timeoutsHolder.cancel();
      return;
    }

    long now = unpreciseMillisTime();

    long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
    long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;

    if (durationBeforeCurrentReadTimeout <= 0L) {
      // idleConnectTimeout reached
      StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
      appendRemoteAddress(sb);
      String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
      long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
      expire(message, durationSinceLastTouch);
      // cancel request timeout sibling
      timeoutsHolder.cancel();

    } else {
      done.set(false);
      timeoutsHolder.startReadTimeout(this);
    }
  }
}
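ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done was already set or the requestSender is closed, and cancels the timeoutsHolder if the future is already done. Otherwise it adds readTimeout to nettyResponseFuture.getLastTouch() to obtain currentReadTimeoutInstant and compares it with the current time. If the timeout has been reached, it calls expire and timeoutsHolder.cancel(); if not, it resets done to false and calls timeoutsHolder.startReadTimeout(this) to reschedule itself.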
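The expiry check in run is plain arithmetic on three millisecond values. The sketch below reproduces it with only the JDK; the class name ReadTimeoutCheck and its method names are hypothetical, and the timestamps are made-up values for illustration.

```java
// Hypothetical helper mirroring the arithmetic in ReadTimeoutTimerTask.run.
final class ReadTimeoutCheck {

    // Milliseconds left before the read timeout is reached;
    // zero or a negative value means it has been reached.
    static long durationBeforeReadTimeout(long readTimeoutMs, long lastTouchMs, long nowMs) {
        long currentReadTimeoutInstant = readTimeoutMs + lastTouchMs;
        return currentReadTimeoutInstant - nowMs;
    }

    static boolean isExpired(long readTimeoutMs, long lastTouchMs, long nowMs) {
        return durationBeforeReadTimeout(readTimeoutMs, lastTouchMs, nowMs) <= 0L;
    }

    public static void main(String[] args) {
        long readTimeoutMs = 1_000L;
        // The task fires at t=1000, but the future was last touched at t=600:
        // 600 ms remain, so the task would reschedule itself.
        System.out.println(isExpired(readTimeoutMs, 600L, 1_000L)); // prints "false"
        // The task fires again at t=2000 and nothing has touched the future since t=600:
        // the deadline of t=1600 has passed, so the request would be expired.
        System.out.println(isExpired(readTimeoutMs, 600L, 2_000L)); // prints "true"
    }
}
```

Since the comparison is against the last touch rather than the time the task was scheduled, any activity that touches the future pushes the read deadline forward.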

startReadTimeout

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

  void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
      // only schedule a new readTimeout if the requestTimeout doesn't happen first
      if (task == null) {
        // first call triggered from outside (else is read timeout is re-scheduling itself)
        task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
      }
      this.readTimeout = newTimeout(task, readTimeoutValue);

    } else if (task != null) {
      // read timeout couldn't re-scheduling itself, clean up
      task.clean();
    }
  }
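startReadTimeout schedules the task through newTimeout when there is no requestTimeout, or when the requestTimeout has not expired and readTimeoutValue is less than the time remaining until requestTimeoutMillisTime (that is, the current time plus readTimeoutValue falls before requestTimeoutMillisTime). Otherwise, if a task was passed in, it calls task.clean().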
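The scheduling condition can be written as a small predicate. This is a JDK-only sketch with hypothetical names (ReadTimeoutScheduling, shouldSchedule); the two boolean parameters stand in for requestTimeout == null and requestTimeout.isExpired() in the quoted source.

```java
// Hypothetical predicate mirroring the condition in TimeoutsHolder.startReadTimeout.
final class ReadTimeoutScheduling {

    static boolean shouldSchedule(boolean hasRequestTimeout,
                                  boolean requestTimeoutExpired,
                                  long readTimeoutValue,
                                  long requestTimeoutMillisTime,
                                  long now) {
        if (!hasRequestTimeout) {
            // No request timeout configured: the read timeout is always scheduled.
            return true;
        }
        // Only schedule if the read timeout would fire before the request deadline.
        return !requestTimeoutExpired && readTimeoutValue < (requestTimeoutMillisTime - now);
    }

    public static void main(String[] args) {
        long requestDeadline = 5_000L; // absolute time at which the request timeout fires
        long readTimeout = 1_000L;
        // 5000 ms remain until the request deadline, so a 1000 ms read timeout is scheduled.
        System.out.println(shouldSchedule(true, false, readTimeout, requestDeadline, 0L));     // prints "true"
        // Only 500 ms remain: the request timeout would fire first, so nothing is scheduled.
        System.out.println(shouldSchedule(true, false, readTimeout, requestDeadline, 4_500L)); // prints "false"
        // Without a request timeout the read timeout is scheduled regardless.
        System.out.println(shouldSchedule(false, false, readTimeout, -1L, 4_500L));            // prints "true"
    }
}
```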

RequestTimeoutTimerTask

org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java

public class RequestTimeoutTimerTask extends TimeoutTimerTask {

  private final long requestTimeout;

  RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                                 NettyRequestSender requestSender,
                                 TimeoutsHolder timeoutsHolder,
                                 int requestTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.requestTimeout = requestTimeout;
  }

  public void run(Timeout timeout) {

    if (done.getAndSet(true) || requestSender.isClosed())
      return;

    // in any case, cancel possible readTimeout sibling
    timeoutsHolder.cancel();

    if (nettyResponseFuture.isDone())
      return;

    StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
    appendRemoteAddress(sb);
    String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
    long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
    expire(message, age);
  }
}
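RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done was already set or the requestSender is closed. Otherwise it first calls timeoutsHolder.cancel() to cancel any sibling read timeout, returns if nettyResponseFuture.isDone(), and in the remaining case builds the timeout message and calls expire.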
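The message passed to expire is assembled from the remote address and the configured timeout. The sketch below rebuilds that string with only the JDK, as an illustration of its format. The class name TimeoutMessage is hypothetical, a plain StringBuilder replaces StringBuilderPool, and example.com with 127.0.0.1 is made-up sample data.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper reproducing the message format used by RequestTimeoutTimerTask.
final class TimeoutMessage {

    static String requestTimeoutMessage(InetSocketAddress remoteAddress, long requestTimeoutMs) {
        StringBuilder sb = new StringBuilder("Request timeout to ");
        // Same steps as appendRemoteAddress: host name, then "/ip" if resolved, then ":port".
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
        return sb.append(" after ").append(requestTimeoutMs).append(" ms").toString();
    }

    public static void main(String[] args) throws UnknownHostException {
        // Unresolved address: only the host name and port appear.
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved("example.com", 8080);
        System.out.println(requestTimeoutMessage(unresolved, 5000));
        // prints "Request timeout to example.com:8080 after 5000 ms"

        // Resolved address built from a fixed host name and IP, so no DNS lookup is needed.
        InetAddress ip = InetAddress.getByAddress("example.com", new byte[] {127, 0, 0, 1});
        System.out.println(requestTimeoutMessage(new InetSocketAddress(ip, 8080), 5000));
        // prints "Request timeout to example.com/127.0.0.1:8080 after 5000 ms"
    }
}
```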

TimeoutsHolder

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

public class TimeoutsHolder {

  private final Timeout requestTimeout;
  private final AtomicBoolean cancelled = new AtomicBoolean();
  private final Timer nettyTimer;
  private final NettyRequestSender requestSender;
  private final long requestTimeoutMillisTime;
  private final int readTimeoutValue;
  private volatile Timeout readTimeout;
  private volatile NettyResponseFuture<?> nettyResponseFuture;
  private volatile InetSocketAddress remoteAddress;

  public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
    this.nettyTimer = nettyTimer;
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.remoteAddress = originalRemoteAddress;

    final Request targetRequest = nettyResponseFuture.getTargetRequest();

    final int readTimeoutInMs = targetRequest.getReadTimeout();
    this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;

    int requestTimeoutInMs = targetRequest.getRequestTimeout();
    if (requestTimeoutInMs == 0) {
      requestTimeoutInMs = config.getRequestTimeout();
    }

    if (requestTimeoutInMs != -1) {
      requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
      requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
    } else {
      requestTimeoutMillisTime = -1L;
      requestTimeout = null;
    }
  }

  //......
}  
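The TimeoutsHolder constructor resolves both timeouts from the target request, falling back to the config value when the request-level value is 0. If the resulting requestTimeoutInMs is not -1, it records requestTimeoutMillisTime, creates a RequestTimeoutTimerTask, and schedules it through newTimeout; otherwise no request timeout is scheduled.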
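The fallback and the -1 check in the constructor can be sketched as two small functions. The names TimeoutResolution, resolve, and requestTimeoutEnabled are hypothetical, and the millisecond values are arbitrary sample inputs rather than library defaults.

```java
// Hypothetical helper mirroring how the TimeoutsHolder constructor picks timeout values.
final class TimeoutResolution {

    // A per-request value of 0 means "not set", so the client-level config value is used.
    static int resolve(int perRequestMs, int configMs) {
        return perRequestMs == 0 ? configMs : perRequestMs;
    }

    // The constructor only schedules a RequestTimeoutTimerTask when the resolved value is not -1.
    static boolean requestTimeoutEnabled(int resolvedRequestTimeoutMs) {
        return resolvedRequestTimeoutMs != -1;
    }

    public static void main(String[] args) {
        int configMs = 60_000; // sample client-level value
        System.out.println(resolve(0, configMs));      // prints "60000"
        System.out.println(resolve(5_000, configMs));  // prints "5000"
        System.out.println(requestTimeoutEnabled(resolve(-1, configMs))); // prints "false"
    }
}
```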

scheduleRequestTimeout

org/asynchttpclient/netty/request/NettyRequestSender.java

  private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                             AsyncHandler<T> asyncHandler,
                                                             Channel channel) {

    try {
      asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
      LOGGER.error("onConnectionPooled crashed", e);
      abort(channel, future, e);
      return future;
    }

    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
      // otherwise, bad luck, the channel was closed, see bellow
      scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }

    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);

    if (LOGGER.isDebugEnabled()) {
      HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
      LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }

    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);

    if (Channels.isChannelActive(channel)) {
      writeRequest(future, channel);
    } else {
      // bad luck, the channel was closed in-between
      // there's a very good chance onClose was already notified but the
      // future wasn't already registered
      handleUnexpectedClosedChannel(channel, future);
    }

    return future;
  }

  private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                      InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
  }

  public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {

    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();

    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
      return;

    try {
      if (asyncHandler instanceof TransferCompletionHandler) {
        configureTransferAdapter(asyncHandler, httpRequest);
      }

      boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue()
              && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;

      if (!future.isHeadersAlreadyWrittenOnContinue()) {
        try {
          asyncHandler.onRequestSend(nettyRequest);
        } catch (Exception e) {
          LOGGER.error("onRequestSend crashed", e);
          abort(channel, future, e);
          return;
        }

        // if the request has a body, we want to track progress
        if (writeBody) {
          // FIXME does this really work??? the promise is for the request without body!!!
          ChannelProgressivePromise promise = channel.newProgressivePromise();
          ChannelFuture f = channel.write(httpRequest, promise);
          f.addListener(new WriteProgressListener(future, true, 0L));
        } else {
          // we can just track write completion
          ChannelPromise promise = channel.newPromise();
          ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
          f.addListener(new WriteCompleteListener(future));
        }
      }

      if (writeBody)
        nettyRequest.getBody().write(channel, future);

      // don't bother scheduling read timeout if channel became invalid
      if (Channels.isChannelActive(channel)) {
        scheduleReadTimeout(future);
      }

    } catch (Exception e) {
      LOGGER.error("Can't write request", e);
      abort(channel, future, e);
    }
  }  
In NettyRequestSender, sendRequestWithOpenChannel calls scheduleRequestTimeout when channelRemoteAddress is not null; that method creates a TimeoutsHolder, which in turn schedules the RequestTimeoutTimerTask. The writeRequest method, after nettyRequest.getBody().write(channel, future), calls scheduleReadTimeout(future) to schedule the ReadTimeoutTimerTask, provided the channel is still active.

Summary

AsyncHttpClient's TimeoutTimerTask is an abstract class that implements Netty's TimerTask interface. It defines an expire method, which calls requestSender.abort, and a clean method, which marks the task as done and clears its nettyResponseFuture reference. It has two subclasses, RequestTimeoutTimerTask and ReadTimeoutTimerTask. AsyncHttpClient uses TimeoutsHolder to wrap these timeout timers: NettyRequestSender's sendRequestWithOpenChannel method triggers scheduling of the RequestTimeoutTimerTask, while its writeRequest method schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future) after nettyRequest.getBody().write(channel, future).

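As the code shows, requestTimeoutMillisTime is the deadline for the request as a whole, so the request timeout bounds the total request time, including the readTimeoutValue wait that follows writing the data.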