专栏目录
31. FeignClient 实现断路器以及线程隔离限流的思路 15.UnderTow 订制 19.Eureka的服务端设计与配置 5.所有项目的parent与spring-framework-common说明 16.Eureka架构和核心概念 36. 验证断路器正确性 28.OpenFeign的生命周期-进行调用 4.maven依赖回顾以及项目框架结构 40. spock 单元测试封装的 WebClient(上) 38. 实现自定义 WebClient 的 NamedContextFactory 37. 实现异步的客户端封装配置管理的意义与设计 1. 背景 35. 验证线程隔离正确性 29.Spring Cloud OpenFeign 的解析(1) 41. SpringCloudGateway 基本流程讲解(2) 44.避免链路信息丢失做的设计(1) 14.UnderTow AccessLog 配置介绍 40. spock 单元测试封装的 WebClient(下) 17.Eureka的实例配置 7.从Bean到SpringCloud 6.微服务特性相关的依赖说明 12.UnderTow 简介与内部原理 11.Log4j2 监控相关 21.Spring Cloud LoadBalancer简介 20. 启动一个 Eureka Server 集群 23.订制Spring Cloud LoadBalancer 22.Spring Cloud LoadBalancer核心源码 30. FeignClient 实现重试 24.测试Spring Cloud LoadBalancer 8.理解 NamedContextFactory 43.为何 SpringCloudGateway 中会有链路信息丢失 41. SpringCloudGateway 基本流程讲解(1) 39. 改造 resilience4j 粘合 WebClient 44.避免链路信息丢失做的设计(2) 18.Eureka的客户端核心设计和配置 27.OpenFeign的生命周期-创建代理 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 25.OpenFeign简介与使用 10.使用Log4j2以及一些核心配置 26.OpenFeign的组件 33. 实现重试、断路器以及线程隔离源码 13.UnderTow 核心配置 32. 改进负载均衡算法 3.Eureka Server 与 API 网关要考虑的问题 45. 实现公共日志记录 2.微服务框架需要考虑的问题 9.如何理解并定制一个Spring Cloud组件 34.验证重试配置正确性

43.为何 SpringCloudGateway 中会有链路信息丢失

干货满满张哈希
• 阅读 384

43.为何 SpringCloudGateway 中会有链路信息丢失

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

在开始编写我们自己的日志 Filter 之前,还有一个问题我想在这里和大家分享,即在 Spring Cloud Gateway 中可能发生链路信息丢失的问题。

主要冲突 - Project Reactor 与 Java Logger MDC 之间的设计冲突

Poject Reactor 是基于异步响应式设计的编程模式的实现,它的主要实现思路是先编写执行链路,最后 sub 执行整个链路。但是链路的每一部分,究竟是哪个线程执行的,是不确定的

Java 的日志框架设计,其上下文 MDC(Mapped Diagnostic Context)信息,是基于线程设计的,其实可以简单理解为一个 ThreadLocal 的 Map。日志的链路信息,是保存在这个 MDC 中的。

这样其实可以看出 Project Reactor 与日志框架的 MDC 默认是不兼容的,只要发生异步线程切换,这个 MDC 就变了。Spring Cloud Sleuth 为此加了很多粘合代码,但是智者千虑必有一失,Project Reactor 应用场景和库也在不断发展和壮大,Spring Cloud Sleuth 也可能会漏掉一些场景导致链路信息丢失。

一种 Spring Cloud Gateway 常见的链路信息丢失的场景

我们编写一个简单的测试项目(项目地址):

引入依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.6</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!--log4j2异步日志需要的依赖,所有项目都必须用log4j2和异步日志配置-->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>${disruptor.version}</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2020.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

对所有路径开启 AdaptCachedBodyGlobalFilter:

@Configuration(proxyBeanMethods = false)
public class ApiGatewayConfiguration {
    @Autowired
    private AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter;
    @Autowired
    private GatewayProperties gatewayProperties;

    @PostConstruct
    public void init() {
        gatewayProperties.getRoutes().forEach(routeDefinition -> {
            //对 spring cloud gateway 路由配置中的每个路由都启用 AdaptCachedBodyGlobalFilter 
            EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(new Object(), routeDefinition.getId());
            adaptCachedBodyGlobalFilter.onApplicationEvent(enableBodyCachingEvent);
        });
    }
}

配置(我们只有一个路由,将请求转发到 httpbin.org 这个 http 请求测试网站):

server:
  port: 8181
spring:
  application:
    name: apiGateway
  cloud:
    gateway:
      httpclient:
        connect-timeout: 500
        response-timeout: 60000
      routes:
        - id: first_route
          uri: http://httpbin.org
          predicates:
              - Path=/httpbin/**
          filters:
              - StripPrefix=1

添加两个全局 Filter,一个在 AdaptCachedBodyGlobalFilter 之前,一个在 AdaptCachedBodyGlobalFilter 之后。这两个 Filter 非常简单,只是打一行日志。

@Log4j2
@Component
public class PreLogFilter implements GlobalFilter, Ordered {

    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() - 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("before AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

@Log4j2
@Component
public class PostLogFilter implements GlobalFilter, Ordered {

    public static final int ORDER = new AdaptCachedBodyGlobalFilter().getOrder() + 1;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("after AdaptCachedBodyGlobalFilter");
        return chain.filter(exchange);
    }

    @Override
    public int getOrder() {
        return ORDER;
    }
}

最后指定 Log4j2 的输出格式中包含链路信息,就像系列文章开头中指定的那样。

启动这个应用,之后访问 http://127.0.0.1:8181/httpbin/anything,查看日志,发现 PostLogFilter 中的日志,没有链路信息了:

2021-09-08 06:32:35.457  INFO [service-apiGateway,51063d6f1fe264d0,51063d6f1fe264d0] [30600] [reactor-http-nio-2][?:]: before AdaptCachedBodyGlobalFilter
2021-09-08 06:32:35.474  INFO [service-apiGateway,,] [30600] [reactor-http-nio-2][?:]: after AdaptCachedBodyGlobalFilter

Spring Cloud Sleuth 是如何增加链路信息

通过系列之前的源码分析,我们知道,在最开始的 TraceWebFilter,我们将 Mono 封装成了一个 MonoWebFilterTrace,它的核心源码是:

@Override
public void subscribe(CoreSubscriber<? super Void> subscriber) {
    Context context = contextWithoutInitialSpan(subscriber.currentContext());
    Span span = findOrCreateSpan(context);
    //将 Span 放入执行上下文中,对于日志其实就是将链路信息放入 org.slf4j.MDC
    //日志的 MDC 一般都是 ThreadLocal 的 Map,对于 Log4j2 的实现类就是 org.apache.logging.log4j.ThreadContext,其核心 contextMap 就是一个基于 ThreadLocal 实现的 Map
    //简单理解就是将链路信息放入一个 ThreadLocal 的 Map 中,每个线程访问自己的 Map 获取链路信息
    try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(span.context())) {
        //将实际的 subscribe 用 Span 所在的 Context 包裹住,结束时关闭 Span
        this.source.subscribe(new WebFilterTraceSubscriber(subscriber, context, span, this));
    }
    //在 scope.close() 之后,会将链路信息从  ThreadLocal 的 Map 中剔除
}

@Override
public Object scanUnsafe(Attr key) {
    if (key == Attr.RUN_STYLE) {
        //执行的方式必须是不能切换线程,也就是同步的
        //因为,日志的链路信息是放在 ThreadLocal 对象中,切换线程,链路信息就没了
        return Attr.RunStyle.SYNC; 
    }
    return super.scanUnsafe(key);
}

WebFilterTraceSubscriber 干了些什么呢?出现异常,以及 http 请求结束的时候,我们可能想将响应信息,异常信息记录进入 Span 中,就是通过这个类封装实现的。

经过 MonoWebFilterTrace 的封装,由于 Spring-WebFlux 处理请求,其实就是封装成我们上面得出的 Mono 之后进行 subscribe 处理的请求,所以这样,整个内部 Mono 的 publish 链路以及 subscribe 链路,就被 WebFilterTraceSubscriber 中的 scope 包裹起来了。只要我们自己不在 GatewayFilter 中转换成某些强制异步的 Mono 或者 Flux 导致切换线程,链路信息是不会丢失的。

为何上面的测试项目中链路信息会丢失

我们来看经过 AdaptCachedBodyGlobalFilter 之后,我们前面拼的 Mono 链路会变成什么样:

return Mono.defer(() ->
    new MonoWebFilterTrace(source, 
        RoutePredicateHandlerMapping.this.lookupRoute(exchange) //根据请求寻找路由
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); //将路由放入 Attributes 中,后面我们还会用到
                    return Mono.just(RoutePredicateHandlerMapping.this.webHandler); //返回 RoutePredicateHandlerMapping 的 FilteringWebHandler
                }).switchIfEmpty( //如果为 Mono.empty(),也就是没找到路由
                    Mono.empty() 
                    .then(Mono.fromRunnable(() -> { //返回 Mono.empty() 之后,记录日志
                        if (logger.isTraceEnabled()) {
                            logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })))
            .switchIfEmpty(DispatcherHandler.this.createNotFoundError()) //如果没有返回不为 Mono.empty() 的 handlerMapping,则直接返回 404
            .then(
                Mono.defer(() -> {
                    //省略在 AdaptCachedBodyGlobalFilter 前面的链路嵌套
                    //读取 Body,由于 TCP 拆包,所以需要他们拼接到一起
                    DataBufferUtils.join(exchange.getRequest().getBody())
                        //如果没有 Body,则直接返回空 DataBuffer
                        .defaultIfEmpty(factory.wrap(new EmptyByteBuf(factory.getByteBufAllocator())))
                        //decorate方法中将 dataBuffer 放入 exchange 的 Attributes 列表,只是为了防止重复进入这个 `AdaptCachedBodyGlobalFilter` 的情况导致重复缓存请求 Body
                        //之后,使用新的 body 以及原始请求封装成新的请求,继续 GatewayFilters 链路
                        .map(dataBuffer -> decorate(exchange, dataBuffer, cacheDecoratedRequest))
                        .switchIfEmpty(Mono.just(exchange.getRequest())).flatMap(function);
                })
                .then(Mono.empty()))
            ), //调用对应的 Handler
    TraceWebFilter.this.isTracePresent(), TraceWebFilter.this, TraceWebFilter.this.spanFromContextRetriever()).transformDeferred((call) -> {
        //MetricsWebFilter 相关的处理,在前面的代码中给出了,这里省略
    });
);

其中 DataBufferUtils.join(exchange.getRequest().getBody()) 其实是一个 FluxReceive,这里我们可以理解为:提交一个尝试读取请求 Body 的任务,将之后的 GatewayFilter 的链路处理加到在读取完 Body 之后的回调当中,提交这个任务后,立刻返回。这么看可能比较复杂,我们用一个类似的例子类比下:

//首先我们创建一个新的 Span
Span span = tracer.newTrace();
//声明一个类似于 TraceWebFilter 中封装的 MonoWebFilterTrace 的 MonoOperator
class MonoWebFilterTrace<T> extends MonoOperator<T, T> {
    protected MonoWebFilterTrace(Mono<? extends T> source) {
        super(source);
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        //将 subscribe 用 span 包裹
        try (Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span)) {
            source.subscribe(actual);
            //在将要关闭 spanInScope 的时候(即从 ThreadLocal 的 Map 中移除链路信息),打印日志
            log.info("stopped");
        }
    }
}

Mono.defer(() -> new MonoWebFilterTrace(
        Mono.fromRunnable(() -> {
            log.info("first");
        })
        //模拟 FluxReceive
        .then(Mono.delay(Duration.ofSeconds(1))
        .doOnSuccess(longSignal -> log.info(longSignal))))
).subscribe(aLong -> log.info(aLong));

Mono.delay 和 FluxReceive 表现类似,都是异步切换线程池执行。执行上面的代码,我们可以从日志上面就能看出来:

2021-09-08 07:12:45.236  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: first
2021-09-08 07:12:45.240  INFO [service-apiGateway,7b2f5c190e1406cb,7b2f5c190e1406cb] [31868] [reactor-http-nio-2][?:]: stopped
2021-09-08 07:12:46.241  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: doOnEach_onNext(0)
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: onComplete()
2021-09-08 07:12:46.242  INFO [service-apiGateway,,] [31868] [parallel-1][?:]: 0

在 Spring Cloud Gateway 中,Request Body 的 FluxReceive 使用的线程池和调用 GatewayFilter 的是同一个线程池,所以可能线程还是同一个,但是由于 Span 已经结束,从 ThreadLocal 的 Map 中已经移除了链路信息,所以日志中还是没有链路信息。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

43.为何 SpringCloudGateway 中会有链路信息丢失

点赞
收藏
评论区
推荐文章

暂无数据