专栏目录
31. FeignClient 实现断路器以及线程隔离限流的思路 15.UnderTow 订制 19.Eureka的服务端设计与配置 5.所有项目的parent与spring-framework-common说明 16.Eureka架构和核心概念 36. 验证断路器正确性 28.OpenFeign的生命周期-进行调用 4.maven依赖回顾以及项目框架结构 40. spock 单元测试封装的 WebClient(上) 38. 实现自定义 WebClient 的 NamedContextFactory 37. 实现异步的客户端封装配置管理的意义与设计 1. 背景 35. 验证线程隔离正确性 29.Spring Cloud OpenFeign 的解析(1) 41. SpringCloudGateway 基本流程讲解(2) 44.避免链路信息丢失做的设计(1) 14.UnderTow AccessLog 配置介绍 40. spock 单元测试封装的 WebClient(下) 17.Eureka的实例配置 7.从Bean到SpringCloud 6.微服务特性相关的依赖说明 12.UnderTow 简介与内部原理 11.Log4j2 监控相关 20. 启动一个 Eureka Server 集群 21.Spring Cloud LoadBalancer简介 23.订制Spring Cloud LoadBalancer 22.Spring Cloud LoadBalancer核心源码 30. FeignClient 实现重试 24.测试Spring Cloud LoadBalancer 8.理解 NamedContextFactory 43.为何 SpringCloudGateway 中会有链路信息丢失 41. SpringCloudGateway 基本流程讲解(1) 39. 改造 resilience4j 粘合 WebClient 44.避免链路信息丢失做的设计(2) 18.Eureka的客户端核心设计和配置 27.OpenFeign的生命周期-创建代理 25.OpenFeign简介与使用 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 10.使用Log4j2以及一些核心配置 26.OpenFeign的组件 33. 实现重试、断路器以及线程隔离源码 13.UnderTow 核心配置 32. 改进负载均衡算法 3.Eureka Server 与 API 网关要考虑的问题 45. 实现公共日志记录 2.微服务框架需要考虑的问题 9.如何理解并定制一个Spring Cloud组件 34.验证重试配置正确性

44.避免链路信息丢失做的设计(2)

干货满满张哈希
• 阅读 430

44.避免链路信息丢失做的设计(2)

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

我们在这一节我们将继续讲解避免链路信息丢失做的设计,主要针对获取到现有 Span 之后,如何保证每个 GlobalFilter 都能保持链路信息。首先,我们自定义 Reactor 的核心 Publisher 即 Mono 和 Flux 的工厂,将链路信息封装进去,保证由这个工厂生成的 Mono 和 Flux,都是只要是这个工厂生成的 Mono 和 Flux 之间无论怎么拼接都会保持链路信息的:

自定义 Mono 和 Flux 的工厂

公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{
    private final Subscriber<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void onSubscribe(Subscription s) {
        executeWithinScope(() -> {
            delegate.onSubscribe(s);
        });
    }

    @Override
    public void onError(Throwable t) {
        executeWithinScope(() -> {
            delegate.onError(t);
        });
    }

    @Override
    public void onComplete() {
        executeWithinScope(() -> {
            delegate.onComplete();
        });
    }

    @Override
    public void onNext(T o) {
        executeWithinScope(() -> {
            delegate.onNext(o);
        });
    }

    private void executeWithinScope(Runnable runnable) {
        //如果当前没有链路信息,强制包裹
        if (tracer.currentSpan() == null) {
            try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
                runnable.run();
            }
        } else {
            //如果当前已有链路信息,则直接执行
            runnable.run();
        }
    }
}

之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {
    private final Flux<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

public class TracedMono<T> extends Mono<T> {
    private final Mono<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。

@Component
public class TracedPublisherFactory {
    protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

    @Autowired
    private Tracer tracer;
    @Autowired
    private CurrentTraceContext currentTraceContext;

    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
        return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }

    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
        return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }
}

公共抽象 GlobalFilter - CommonTraceFilter

我们编写所有我们后面要实现的 GlobalFilter 的抽象类,这个抽象类的主要功能是:

  • 保证继承这个抽象类的 GlobalFilter 本身以及拼接的链路中,是有链路信息的,其实就是保证它 filter 返回的 Mono 是由我们上面实现的 Factory 生成的即可。
  • 不同 GlobalFilter 之间需要排序,有顺序的执行,这个通过实现 Ordered 接口即可
package com.github.jojotech.spring.cloud.apigateway.filter;

import com.github.jojotech.spring.cloud.apigateway.common.TraceWebFilterUtil;
import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;


/**
 * 所有 filter 的子类
 * 主要保证 span 的完整性,在某些情况下,span 会半途停止,导致日志中没有 traceId 和 spanId
 * 参考:https://github.com/spring-cloud/spring-cloud-sleuth/issues/2004
 */
public abstract class AbstractTracedFilter implements GlobalFilter, Ordered {
    @Autowired
    protected Tracer tracer;
    @Autowired
    protected TracedPublisherFactory tracedPublisherFactory;
    @Autowired
    protected CurrentTraceContext currentTraceContext;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Mono<Void> traced;
        if (tracer.currentSpan() == null) {
            try (CurrentTraceContext.Scope scope = this.currentTraceContext
                    .maybeScope(((Span) exchange.getAttributes().get(TraceWebFilterUtil.TRACE_REQUEST_ATTR))
                            .context())) {
                traced = traced(exchange, chain);
            }
        }
        else {
            //如果当前已有链路信息,则直接执行
            traced = traced(exchange, chain);
        }
        return tracedPublisherFactory.getTracedMono(traced, exchange);
    }

    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

这样,我们就可以基于这个抽象类去实现需要定制的 GlobalFilter 了

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

44.避免链路信息丢失做的设计(2)

点赞
收藏
评论区
推荐文章

暂无数据