专栏目录
37. 实现异步的客户端封装配置管理的意义与设计 44.避免链路信息丢失做的设计(2) 45. 实现公共日志记录 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 41. SpringCloudGateway 基本流程讲解(1) 4.maven依赖回顾以及项目框架结构 39. 改造 resilience4j 粘合 WebClient 13.UnderTow 核心配置 5.所有项目的parent与spring-framework-common说明 30. FeignClient 实现重试 35. 验证线程隔离正确性 11.Log4j2 监控相关 19.Eureka的服务端设计与配置 23.订制Spring Cloud LoadBalancer 38. 实现自定义 WebClient 的 NamedContextFactory 28.OpenFeign的生命周期-进行调用 14.UnderTow AccessLog 配置介绍 41. SpringCloudGateway 基本流程讲解(2) 21.Spring Cloud LoadBalancer简介 40. spock 单元测试封装的 WebClient(下) 20. 启动一个 Eureka Server 集群 17.Eureka的实例配置 3.Eureka Server 与 API 网关要考虑的问题 15.UnderTow 订制 18.Eureka的客户端核心设计和配置 22.Spring Cloud LoadBalancer核心源码 44.避免链路信息丢失做的设计(1) 16.Eureka架构和核心概念 40. spock 单元测试封装的 WebClient(上) 8.理解 NamedContextFactory 9.如何理解并定制一个Spring Cloud组件 26.OpenFeign的组件 33. 实现重试、断路器以及线程隔离源码 27.OpenFeign的生命周期-创建代理 10.使用Log4j2以及一些核心配置 43.为何 SpringCloudGateway 中会有链路信息丢失 29.Spring Cloud OpenFeign 的解析(1) 24.测试Spring Cloud LoadBalancer 1. 背景 25.OpenFeign简介与使用 12.UnderTow 简介与内部原理 7.从Bean到SpringCloud 2.微服务框架需要考虑的问题 6.微服务特性相关的依赖说明 32. 改进负载均衡算法 34.验证重试配置正确性 31. FeignClient 实现断路器以及线程隔离限流的思路 36. 验证断路器正确性

44.避免链路信息丢失做的设计(2)

unknown
• 阅读 734

44.避免链路信息丢失做的设计(2)

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

我们在这一节我们将继续讲解避免链路信息丢失做的设计,主要针对获取到现有 Span 之后,如何保证每个 GlobalFilter 都能保持链路信息。首先,我们自定义 Reactor 的核心 Publisher 即 Mono 和 Flux 的工厂,将链路信息封装进去,保证由这个工厂生成的 Mono 和 Flux,都是只要是这个工厂生成的 Mono 和 Flux 之间无论怎么拼接都会保持链路信息的:

自定义 Mono 和 Flux 的工厂

公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{
    private final Subscriber<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void onSubscribe(Subscription s) {
        executeWithinScope(() -> {
            delegate.onSubscribe(s);
        });
    }

    @Override
    public void onError(Throwable t) {
        executeWithinScope(() -> {
            delegate.onError(t);
        });
    }

    @Override
    public void onComplete() {
        executeWithinScope(() -> {
            delegate.onComplete();
        });
    }

    @Override
    public void onNext(T o) {
        executeWithinScope(() -> {
            delegate.onNext(o);
        });
    }

    private void executeWithinScope(Runnable runnable) {
        //如果当前没有链路信息,强制包裹
        if (tracer.currentSpan() == null) {
            try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
                runnable.run();
            }
        } else {
            //如果当前已有链路信息,则直接执行
            runnable.run();
        }
    }
}

之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {
    private final Flux<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

public class TracedMono<T> extends Mono<T> {
    private final Mono<T> delegate;
    private final Tracer tracer;
    private final CurrentTraceContext currentTraceContext;
    private final Span span;

    TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.currentTraceContext = currentTraceContext;
        this.span = span;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
    }
}

定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。

@Component
public class TracedPublisherFactory {
    protected static final String TRACE_REQUEST_ATTR = Span.class.getName();

    @Autowired
    private Tracer tracer;
    @Autowired
    private CurrentTraceContext currentTraceContext;

    public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
        return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }

    public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
        return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
    }
}

公共抽象 GlobalFilter - CommonTraceFilter

我们编写所有我们后面要实现的 GlobalFilter 的抽象类,这个抽象类的主要功能是:

  • 保证继承这个抽象类的 GlobalFilter 本身以及拼接的链路中,是有链路信息的,其实就是保证它 filter 返回的 Mono 是由我们上面实现的 Factory 生成的即可。
  • 不同 GlobalFilter 之间需要排序,有顺序的执行,这个通过实现 Ordered 接口即可
package com.github.jojotech.spring.cloud.apigateway.filter;

import com.github.jojotech.spring.cloud.apigateway.common.TraceWebFilterUtil;
import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.sleuth.CurrentTraceContext;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;


/**
 * 所有 filter 的子类
 * 主要保证 span 的完整性,在某些情况下,span 会半途停止,导致日志中没有 traceId 和 spanId
 * 参考:https://github.com/spring-cloud/spring-cloud-sleuth/issues/2004
 */
public abstract class AbstractTracedFilter implements GlobalFilter, Ordered {
    @Autowired
    protected Tracer tracer;
    @Autowired
    protected TracedPublisherFactory tracedPublisherFactory;
    @Autowired
    protected CurrentTraceContext currentTraceContext;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Mono<Void> traced;
        if (tracer.currentSpan() == null) {
            try (CurrentTraceContext.Scope scope = this.currentTraceContext
                    .maybeScope(((Span) exchange.getAttributes().get(TraceWebFilterUtil.TRACE_REQUEST_ATTR))
                            .context())) {
                traced = traced(exchange, chain);
            }
        }
        else {
            //如果当前已有链路信息,则直接执行
            traced = traced(exchange, chain);
        }
        return tracedPublisherFactory.getTracedMono(traced, exchange);
    }

    protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

这样,我们就可以基于这个抽象类去实现需要定制的 GlobalFilter 了

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

44.避免链路信息丢失做的设计(2)

点赞
收藏
评论区
推荐文章

暂无数据

unknown
unknown
Lv1
男 · rrrr · rrrrrrrr
rrrrr
文章
0
粉丝
17
获赞
0
热门文章

暂无数据