SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory

干货满满张哈希 等级 57 0 0
标签:

SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

实现 WeClient 的 NamedContextFactory

我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFactory 整个的加载流程的代码,其结构图如下所示:

SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory

spring.factories

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webflux.auto.WebClientAutoConfiguration

在 spring.factories 定义了自动装载的自动配置类 WebClientAutoConfiguration

WebClientAutoConfiguration

@Import(WebClientConfiguration.class)
@Configuration(proxyBeanMethods = false)
public class WebClientAutoConfiguration {
}

WebClientAutoConfiguration 这个自动配置类 Import 了 WebClientConfiguration

WebClientConfiguration

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WebClientConfigurationProperties.class)
public class WebClientConfiguration {
    @Bean
    public WebClientNamedContextFactory getWebClientNamedContextFactory() {
        return new WebClientNamedContextFactory();
    }
}

WebClientConfiguration 中创建了 WebClientNamedContextFactory 这个 NamedContextFactory 的 Bean。在这个 NamedContextFactory 中,定义了默认配置 WebClientDefaultConfiguration。在这个默认配置中,主要是给每个微服务都定义了一个 WebClient

定义 WebClient 的配置类

我们编写下上一节定义的配置,包括:

  • 微服务名称
  • 微服务地址,服务地址,不填写则为 http://微服务名称
  • 连接超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等
  • 响应超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等
  • 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试;同时,这些路径可以使用 * 等路径匹配符,即 Spring 中的 AntPathMatcher 进行路径匹配多个路径。例如 /query/order/**

WebClientConfigurationProperties

@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "webclient")
public class WebClientConfigurationProperties {
    private Map<String, WebClientProperties> configs;
    @Data
    @NoArgsConstructor
    public static class WebClientProperties {
        private static AntPathMatcher antPathMatcher = new AntPathMatcher();
        private Cache<String, Boolean> retryablePathsMatchResult = Caffeine.newBuilder().build();
        /**
         * 服务地址,不填写则为 http://serviceName
         */
        private String baseUrl;
        /**
         * 微服务名称,不填写就是 configs 这个 map 的 key
         */
        private String serviceName;
        /**
         * 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试
         */
        private List<String> retryablePaths;
        /**
         * 连接超时
         */
        private Duration connectTimeout = Duration.ofMillis(500);
        /**
         * 响应超时
         */
        private Duration responseTimeout = Duration.ofSeconds(8);

        /**
         * 是否匹配
         * @param path
         * @return
         */
        public boolean retryablePathsMatch(String path) {
            if (CollectionUtils.isEmpty(retryablePaths)) {
                return false;
            }
            return retryablePathsMatchResult.get(path, k -> {
                return retryablePaths.stream().anyMatch(pattern -> antPathMatcher.match(pattern, path));
            });
        }
    }
}

粘合 WebClient 与 resilience4j

接下来粘合 WebClient 与 resilience4j 实现断路器以及重试逻辑,WebClient 基于 project-reactor 实现,resilience4j 官方提供了与 project-reactor 的粘合库:

<!--粘合 project-reactor 与 resilience4j,这个在异步场景经常会用到-->
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
</dependency>

参考官方文档,我们可以像下面这样给普通的 WebClient 增加相关组件:

增加重试器

//由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 retry
Retry retry;
try {
    retry = retryRegistry.retry(name, name);
} catch (ConfigurationNotFoundException e) {
    retry = retryRegistry.retry(name);
}

Retry finalRetry = retry;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
    return exchangeFunction.exchange(clientRequest)
        //核心就是加入 RetryOperator
        .transform(RetryOperator.of(finalRetry));
})

这个 RetryOperator 其实就是使用了 project-reactor 中的 retryWhen 方法实现了 resilience4j 的 retry 机制:

RetryOperator

@Override
public Publisher<T> apply(Publisher<T> publisher) {
    //对于 mono 的处理
    if (publisher instanceof Mono) {
        Context<T> context = new Context<>(retry.asyncContext());
        Mono<T> upstream = (Mono<T>) publisher;
        return upstream.doOnNext(context::handleResult)
            .retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
            .doOnSuccess(t -> context.onComplete());
    } else if (publisher instanceof Flux) {
        //对于 flux 的处理
        Context<T> context = new Context<>(retry.asyncContext());
        Flux<T> upstream = (Flux<T>) publisher;
        return upstream.doOnNext(context::handleResult)
            .retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
            .doOnComplete(context::onComplete);
    } else {
        //不可能是 mono 或者 flux 以外的其他的
        throw new IllegalPublisherException(publisher);
    }
}

可以看出,其实主要填充了:

  • doOnNext(context::handleResult): 在有响应之后调用,将响应结果传入 retry 的 Context,判断是否需要重试以及重试间隔是多久,并且抛出异常 RetryDueToResultException
  • retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))):捕捉异常 RetryDueToResultException,根据其中的间隔时间,返回 reactor 的重试间隔:Mono.delay(Duration.ofMillis(waitDurationMillis))
  • doOnComplete(context::onComplete):请求完成,没有异常之后,调用 retry 的 complete 进行清理

增加断路器

//由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 circuitBreaker
CircuitBreaker circuitBreaker;
try {
    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}

CircuitBreaker finalCircuitBreaker = circuitBreaker;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
    return exchangeFunction.exchange(clientRequest)
        //核心就是加入 CircuitBreakerOperator
        .transform(CircuitBreakerOperator.of(finalCircuitBreaker));
})

类似的,CircuitBreakerOperator 其实也是粘合断路器与 reactor 的 publisher 中的一些 stage 方法,将结果的成功或者失败记录入断路器,这里需要注意,可能有的链路能走到 onNext,可能有的链路能走到 onComplete,也有可能都走到,所以这两个方法都要记录成功,并且保证只记录一次

CircuitBreakerSubscriber

class CircuitBreakerSubscriber<T> extends AbstractSubscriber<T> {

    private final CircuitBreaker circuitBreaker;

    private final long start;
    private final boolean singleProducer;

    private final AtomicBoolean successSignaled = new AtomicBoolean(false);
    private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);

    protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
        CoreSubscriber<? super T> downstreamSubscriber,
        boolean singleProducer) {
        super(downstreamSubscriber);
        this.circuitBreaker = requireNonNull(circuitBreaker);
        this.singleProducer = singleProducer;
        this.start = circuitBreaker.getCurrentTimestamp();
    }

    @Override
    protected void hookOnNext(T value) {
        if (!isDisposed()) {
             //正常完成时,断路器也标记成功,因为可能会触发多次(因为 onComplete 也会记录),所以需要 successSignaled 标记只记录一次
            if (singleProducer && successSignaled.compareAndSet(false, true)) {
                circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), value);
            }
            //标记事件已经发出,就是已经执行完 WebClient 的请求,后面判断取消的时候会用到
            eventWasEmitted.set(true);

            downstreamSubscriber.onNext(value);
        }
    }

    @Override
    protected void hookOnComplete() {
        //正常完成时,断路器也标记成功,因为可能会触发多次(因为 onNext 也会记录),所以需要 successSignaled 标记只记录一次
        if (successSignaled.compareAndSet(false, true)) {
            circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
        }

        downstreamSubscriber.onComplete();
    }

    @Override
    public void hookOnCancel() {
        if (!successSignaled.get()) {
            //如果事件已经发出,那么也记录成功
            if (eventWasEmitted.get()) {
                circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
            } else {
                //否则取消
                circuitBreaker.releasePermission();
            }
        }
    }

    @Override
    protected void hookOnError(Throwable e) {
        //记录失败
        circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
        downstreamSubscriber.onError(e);
    }
}

我们会使用这个库进行粘合,但是不会直接使用上面的代码,因为考虑到:

  • 需要在重试以及断路中加一些日志,便于日后的优化
  • 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常
  • 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能

在下面一节我们会详细说明我们是如何实现的有断路器以及重试逻辑和负载均衡数据更新的 WebClient。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory

收藏
评论区

相关推荐

SpringCloud升级之路2020.0.x版-8.理解 NamedContextFactory
本系列为之前系列的整理重启版,随着项目的发展以及项目中的使用,之前系列里面很多东西发生了变化,并且还有一些东西之前系列并没有提到,所以重启这个系列重新整理下,欢迎各位留言交流,谢谢!springcloudcommons 中参考了 springcloudnetflix 的设计,引入了 NamedContextFactory 机制,一般用于对于不同微服务的客户端
2021升级版微服务教程5—通过IDEA运行多个项目实例「模拟集群」
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/c90af336-21f6-4812-a448-cdce3e5d903a.png) **教程全目录「含视频」**:https://gitee.com/bingqilinpe
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
Spring Cloud Config Server迁移节点或容器化带来的问题 原因,解决
版本 == * springboot 2.0.1.RELEASE * springcloud Finchley.RC1 问题 == 看程序猿 dd 的博客 [http://blog.didispace.com/Spring-Cloud-Config-Server-ip-change-problem/](https://www.oschina.n
Spring5的WebClient使用详解
前言 -- Spring5带来了新的响应式web开发框架WebFlux,同时,也引入了新的HttpClient框架WebClient。WebClient是Spring5中引入的执行 HTTP 请求的非阻塞、反应式客户端。它对同步和异步以及流方案都有很好的支持,WebClient发布后,RestTemplate将在将来版本中弃用,并且不会向前添加主要新功能。
SpringBoot与SpringCloud的版本对应详细版
大版本对应: Spring Boot Spring Cloud 1.2.x Angel版本 1.3.x Brixton版本 1.4.x stripes Camden版本 1.5.x Dalston版本、Edgware版本 2.0.x Finchley版本 2.1.x Greenwich.SR2 在实际开发过程中,我们需要更**详
SpringBoot与SpringCloud的版本对应详细版
大版本对应: Spring Boot Spring Cloud 1.2.x Angel版本 1.3.x Brixton版本 1.4.x stripes Camden版本 1.5.x Dalston版本、Edgware版本 2.0.x Finchley版本 2.1.x Greenwich.SR2 在实际开发过程中,我们需要更**详
SpringCloud demo
1.首先创建一个maven项目 ===============   这个maven项目会包含springcloud相关的项目,目录结构如下图:     本项目所有的springcloud版本为Finchley.SR2,对应的springboot的版本为2.0.7.RELEASE。     ![](https://img2018.cnblogs.com/
SpringCloud的版本
Spring Cloud 项目目前仍然是快速迭代期,版本变化很快。这里整理一下版本相关的东西,备忘一下。 大版本 --- ### 版本号规则 Spring Cloud并没有熟悉的数字版本号,而是对应一个开发代号。 Cloud代号 Boot版本(train) Boot版本(tested) lifecycle Angle 1.2.x inco
SpringCloud升级之路2020.0.x版-37. 实现异步的客户端封装配置管理的意义与设计
本系列代码地址:https://github.com/JoJoTec/springcloudparent 为何需要封装异步 HTTP 客户端 WebClient对于同步的请求,我们使用 springcloudopenfeign 封装的 FeignClient,并做了额外的定制。对于异步的请求,使用的是异步 Http 客户端即 WebClient。WebCli
SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory
本系列代码地址:https://github.com/JoJoTec/springcloudparent 实现 WeClient 的 NamedContextFactory我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFa
SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient
本系列代码地址:https://github.com/JoJoTec/springcloudparent要想实现我们上一节中提到的: 需要在重试以及断路中加一些日志,便于日后的优化 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常 需要在断路器相关的 Operator 中增加类似于 FeignClient
SpringCloud升级之路2020.0.x版-40. spock 单元测试封装的 WebClient(上)
本系列代码地址:https://github.com/JoJoTec/springcloudparent我们来测试下前面封装好的 WebClient,这里开始,我们使用 spock 编写 groovy 单元测试,这种编写出来的单元测试,代码更加简洁,同时更加灵活,我们在接下来的单元测试代码中就能看出来。 编写基于 spock 的 springboot con
SpringCloud升级之路2020.0.x版-40. spock 单元测试封装的 WebClient(下)
本系列代码地址:https://github.com/JoJoTec/springcloudparent我们继续上一节,继续使用 spock 测试我们自己封装的 WebClient 测试针对 readTimeout 重试针对响应超时,我们需要验证重试仅针对可以重试的方法(包括 GET 方法以及配置的可重试方法),针对不可重试的方法没有重试。我们可以通过 sp