SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient

干货满满张哈希 等级 56 0 0
标签:

SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

要想实现我们上一节中提到的:

  • 需要在重试以及断路中加一些日志,便于日后的优化
  • 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常
  • 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能

我们需要将 resilience4j 本身提供的粘合库做一些改造,其实主要就是对 resilience4j 实现的 project reactor 的 Operator 进行改造。

关于断路器的改造

首先,WebClient 的返回对象只可能是 ClientResponse 类型,所以我们这里改造出来的 Operator 不必带上形参,只需要针对 ClientResponse 即可,即:

public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {
    ...
}

在原有的断路器逻辑中,我们需要加入针对 GET 方法以及之前定义的可以重试的路径匹配配置可以重试的逻辑,这需要我们拿到原有请求的 URL 信息。但是 ClientResponse 中并没有暴露这些信息的接口,其默认实现 DefaultClientResponse(我们只要没有自己给 WebClient 加入特殊的改造逻辑,实现都是 DefaultClientResponse) 中的 request() 方法可以获取请求 HttpRequest,其中包含 url 信息。但是这个类还有方法都是 package-private 的,我们需要反射出来:

ClientResponseCircuitBreakerSubscriber

private static final Class<?> aClass;
private static final Method request;

static {
    try {
        aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");
        request = ReflectionUtils.findMethod(aClass, "request");
        request.setAccessible(true);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

之后,在获取到 ClientResponse 之后记录断路器的逻辑中,需要加入上面提到的关于重试的改造,以及负载均衡器的记录:

ClientResponseCircuitBreakerSubscriber

protected void hookOnNext(ClientResponse clientResponse) {
    if (!isDisposed()) {
        if (singleProducer && successSignaled.compareAndSet(false, true)) {
            int rawStatusCode = clientResponse.rawStatusCode();
            HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);
            try {
                HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);
                //判断方法是否为 GET,以及是否在可重试路径配置中,从而得出是否可以重试
                if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {
                    //如果不能重试,则直接返回结果
                    circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
                } else {
                    if (httpStatus != null && httpStatus.is2xxSuccessful()) {
                        //如果成功,则直接返回结果
                        circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
                    } else {
                        /**
                         * 如果异常,参考 DefaultClientResponse 的代码进行异常封装
                         * @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException
                         */
                        Exception exception;
                        if (httpStatus != null) {
                            exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
                        } else {
                            exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
                        }
                        circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);
                        downstreamSubscriber.onError(exception);
                        return;
                    }
                }
            } catch (Exception e) {
                log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);
                circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
            }
        }
        eventWasEmitted.set(true);
        downstreamSubscriber.onNext(clientResponse);
    }
}

同样的,在原有的完成,取消还有失败的记录逻辑中,也加上记录负载均衡数据:

ClientResponseCircuitBreakerSubscriber

@Override
protected void hookOnComplete() {
    if (successSignaled.compareAndSet(false, true)) {
        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
        circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
    }

    downstreamSubscriber.onComplete();
}

@Override
public void hookOnCancel() {
    if (!successSignaled.get()) {
        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
        if (eventWasEmitted.get()) {
            circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
        } else {
            circuitBreaker.releasePermission();
        }
    }
}

@Override
protected void hookOnError(Throwable e) {
    serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
    circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
    downstreamSubscriber.onError(e);
}

粘合 WebClient 与 resilience4j 的同时覆盖重试逻辑

由于前面的断路器中,我们针对可以重试的非 2XX 响应封装成为 WebClientResponseException。所以在重试器中,我们需要加上针对这个异常的重试。

同时,需要将重试器放在负载均衡器之前,因为每次重试,都要从负载均衡器中获取一个新的实例。同时,断路器需要放在负载均衡器之后,因为只有在这个之后,才能获取到本次调用的实例,我们的的断路器是针对实例方法级别的:

WebClientDefaultConfiguration.java

@Bean
public WebClient getWebClient(
        ReactorLoadBalancerExchangeFilterFunction lbFunction,
        WebClientConfigurationProperties webClientConfigurationProperties,
        Environment environment,
        RetryRegistry retryRegistry,
        CircuitBreakerRegistry circuitBreakerRegistry,
        ServiceInstanceMetrics serviceInstanceMetrics
) {
    String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);
    Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();
    if (configs == null || configs.size() == 0) {
        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");
    }
    WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);
    if (webClientProperties == null) {
        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);
    }
    String serviceName = webClientProperties.getServiceName();
    //如果没填写微服务名称,就使用配置 key 作为微服务名称
    if (StringUtils.isBlank(serviceName)) {
        serviceName = name;
    }
    String baseUrl = webClientProperties.getBaseUrl();
    //如果没填写 baseUrl,就使用微服务名称填充
    if (StringUtils.isBlank(baseUrl)) {
        baseUrl = "http://" + serviceName;
    }

    Retry retry = null;
    try {
        retry = retryRegistry.retry(serviceName, serviceName);
    } catch (ConfigurationNotFoundException e) {
        retry = retryRegistry.retry(serviceName);
    }
    //覆盖其中的异常判断
    retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
        //WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对可以重试的请求封装了 WebClientResponseException
        //参考 ClientResponseCircuitBreakerSubscriber 的代码
        if (throwable instanceof WebClientResponseException) {
            log.info("should retry on {}", throwable.toString());
            return true;
        }
        //断路器异常重试,因为请求没有发出去
        if (throwable instanceof CallNotPermittedException) {
            log.info("should retry on {}", throwable.toString());
            return true;
        }
        if (throwable instanceof WebClientRequestException) {
            WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;
            HttpMethod method = webClientRequestException.getMethod();
            URI uri = webClientRequestException.getUri();
            //判断是否为响应超时,响应超时代表请求已经发出去了,对于非 GET 并且没有标注可以重试的请求则不能重试
            boolean isResponseTimeout = false;
            Throwable cause = throwable.getCause();
            //netty 的读取超时一般是 ReadTimeoutException
            if (cause instanceof ReadTimeoutException) {
                log.info("Cause is a ReadTimeoutException which indicates it is a response time out");
                isResponseTimeout = true;
            } else {
                //对于其他一些框架,使用了 java 底层 nio 的一般是 SocketTimeoutException,message 为 read time out
                //还有一些其他异常,但是 message 都会有 read time out 字段,所以通过 message 判断
                String message = throwable.getMessage();
                if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {
                    log.info("Throwable message contains readtimeout which indicates it is a response time out");
                    isResponseTimeout = true;
                }
            }
            //如果请求是 GET 或者标注了重试,则直接判断可以重试
            if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {
                log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
                return true;
            } else {
                //否则,只针对请求还没有发出去的异常进行重试
                if (isResponseTimeout) {
                    log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());
                } else {
                    log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
                    return true;
                }
            }
        }
        return false;
    }).build());


    HttpClient httpClient = HttpClient
            .create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())
            .doOnConnected(connection ->
                    connection
                            .addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
                            .addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
            );

    Retry finalRetry = retry;
    String finalServiceName = serviceName;
    return WebClient.builder()
            .exchangeStrategies(ExchangeStrategies.builder()
            .codecs(configurer -> configurer
                    .defaultCodecs()
                    //最大 body 占用 16m 内存
                    .maxInMemorySize(16 * 1024 * 1024))
            .build())
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            //Retry在负载均衡前
            .filter((clientRequest, exchangeFunction) -> {
                return exchangeFunction
                        .exchange(clientRequest)
                        .transform(ClientResponseRetryOperator.of(finalRetry));
            })
            //负载均衡器,改写url
            .filter(lbFunction)
            //实例级别的断路器需要在负载均衡获取真正地址之后
            .filter((clientRequest, exchangeFunction) -> {
                ServiceInstance serviceInstance = getServiceInstance(clientRequest);
                serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
                CircuitBreaker circuitBreaker;
                //这时候的url是经过负载均衡器的,是实例的url
                //需要注意的一点是,使用异步 client 的时候,最好不要带路径参数,否则这里的断路器效果不好
                //断路器是每个实例每个路径一个断路器
                String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();
                try {
                    //使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
                } catch (ConfigurationNotFoundException e) {
                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
                }
                log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));
                return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));
            }).baseUrl(baseUrl)
            .build();
}

private ServiceInstance getServiceInstance(ClientRequest clientRequest) {
    URI url = clientRequest.url();
    DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
    defaultServiceInstance.setHost(url.getHost());
    defaultServiceInstance.setPort(url.getPort());
    return defaultServiceInstance;
}

这样,我们就实现了我们封装的基于配置的 WebClient

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient

收藏
评论区

相关推荐

2021升级版微服务教程5—通过IDEA运行多个项目实例「模拟集群」
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/c90af336-21f6-4812-a448-cdce3e5d903a.png) **教程全目录「含视频」**:https://gitee.com/bingqilinpe
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
Spring Cloud Config Server迁移节点或容器化带来的问题 原因,解决
版本 == * springboot 2.0.1.RELEASE * springcloud Finchley.RC1 问题 == 看程序猿 dd 的博客 [http://blog.didispace.com/Spring-Cloud-Config-Server-ip-change-problem/](https://www.oschina.n
Spring5的WebClient使用详解
前言 -- Spring5带来了新的响应式web开发框架WebFlux,同时,也引入了新的HttpClient框架WebClient。WebClient是Spring5中引入的执行 HTTP 请求的非阻塞、反应式客户端。它对同步和异步以及流方案都有很好的支持,WebClient发布后,RestTemplate将在将来版本中弃用,并且不会向前添加主要新功能。
SpringBoot与SpringCloud的版本对应详细版
大版本对应: Spring Boot Spring Cloud 1.2.x Angel版本 1.3.x Brixton版本 1.4.x stripes Camden版本 1.5.x Dalston版本、Edgware版本 2.0.x Finchley版本 2.1.x Greenwich.SR2 在实际开发过程中,我们需要更**详
SpringCloud demo
1.首先创建一个maven项目 ===============   这个maven项目会包含springcloud相关的项目,目录结构如下图:     本项目所有的springcloud版本为Finchley.SR2,对应的springboot的版本为2.0.7.RELEASE。     ![](https://img2018.cnblogs.com/
SpringCloud的版本
Spring Cloud 项目目前仍然是快速迭代期,版本变化很快。这里整理一下版本相关的东西,备忘一下。 大版本 --- ### 版本号规则 Spring Cloud并没有熟悉的数字版本号,而是对应一个开发代号。 Cloud代号 Boot版本(train) Boot版本(tested) lifecycle Angle 1.2.x inco
springCloud Finchley升级记录
最近开发新项目顺便升级 Springcloud Dalston.SR5 到当前最新版 Finchley.SR1 由于 springboot1.5.10到当前最新版 spring2.0.1版本 升级修改比较大,记录一下 首先修改一下springboot cloud 版本号其他不用变 部分配置文件名称修改了 添加一下以下依赖会提示如何迁移 <
springcloud知识点总结
一.SpringCloud面试题口述 1.SpringCloud和Dubbo SpringCloud和Dubbo都是现在主流的微服务架构 SpringCloud是Apache旗下的Spring体系下的微服务解决方案 Dubbo是阿里系的分布式服务治理框架 从技术维度上,其实SpringCloud远远的超过Dubbo,Dubbo本身只是实现
SpringCloud升级之路2020.0.x版-37. 实现异步的客户端封装配置管理的意义与设计
本系列代码地址:https://github.com/JoJoTec/springcloudparent 为何需要封装异步 HTTP 客户端 WebClient对于同步的请求,我们使用 springcloudopenfeign 封装的 FeignClient,并做了额外的定制。对于异步的请求,使用的是异步 Http 客户端即 WebClient。WebCli
SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory
本系列代码地址:https://github.com/JoJoTec/springcloudparent 实现 WeClient 的 NamedContextFactory我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFa
SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient
本系列代码地址:https://github.com/JoJoTec/springcloudparent要想实现我们上一节中提到的: 需要在重试以及断路中加一些日志,便于日后的优化 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常 需要在断路器相关的 Operator 中增加类似于 FeignClient
SpringCloud升级之路2020.0.x版-40. spock 单元测试封装的 WebClient(上)
本系列代码地址:https://github.com/JoJoTec/springcloudparent我们来测试下前面封装好的 WebClient,这里开始,我们使用 spock 编写 groovy 单元测试,这种编写出来的单元测试,代码更加简洁,同时更加灵活,我们在接下来的单元测试代码中就能看出来。 编写基于 spock 的 springboot con
SpringCloud升级之路2020.0.x版-40. spock 单元测试封装的 WebClient(下)
本系列代码地址:https://github.com/JoJoTec/springcloudparent我们继续上一节,继续使用 spock 测试我们自己封装的 WebClient 测试针对 readTimeout 重试针对响应超时,我们需要验证重试仅针对可以重试的方法(包括 GET 方法以及配置的可重试方法),针对不可重试的方法没有重试。我们可以通过 sp