39. Adapting resilience4j's Glue Code for WebClient

干货满满张哈希

Code for this series: https://github.com/JoJoTec/spring-cloud-parent

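To implement what we described in the previous section:

  • Add logging to retries and circuit breaking so that they can be tuned later
  • Define which exceptions trigger a retry, tie this to the circuit breaker, and wrap non-2xx responses in a specific exception
  • Update load-balancing data inside the circuit-breaker Operator, as we did for FeignClient, so that load balancing becomes smarter

we need to modify the glue library that resilience4j ships, which mainly means modifying the Project Reactor Operators that resilience4j implements.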

Modifying the circuit breaker

First, a WebClient exchange can only return a ClientResponse, so the Operator we build does not need a type parameter; it only has to handle ClientResponse:

public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {
    ...
}

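In the original circuit-breaker logic we need to add the retry rules: a request may be retried if it is a GET or if its path matches the retryable-path configuration defined earlier. This requires the URL of the original request. ClientResponse exposes no interface for that, but its default implementation DefaultClientResponse (which is what we get as long as we have not added any special customization to WebClient) has a request() method that returns the HttpRequest, which contains the URL. Both the class and the method are package-private, so we have to reach them through reflection: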

ClientResponseCircuitBreakerSubscriber

private static final Class<?> aClass;
private static final Method request;

static {
    try {
        aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");
        request = ReflectionUtils.findMethod(aClass, "request");
        request.setAccessible(true);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
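The same lookup-once, invoke-later pattern can be tried with only the JDK. The sketch below is a minimal stand-in and not Spring code: `HiddenResponse`, `HiddenRequestReader`, and `readUrl` are hypothetical names invented for illustration, with a private `request()` method playing the role of the non-public accessor.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a class whose accessor is not public.
final class HiddenResponse {
    private final String url;

    HiddenResponse(String url) {
        this.url = url;
    }

    // Not public, like the request() accessor discussed above.
    private String request() {
        return url;
    }
}

final class HiddenRequestReader {
    // Resolved once; a lookup failure surfaces when the class is initialized.
    private static final Method REQUEST;

    static {
        try {
            REQUEST = HiddenResponse.class.getDeclaredMethod("request");
            REQUEST.setAccessible(true);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Invokes the hidden accessor on a concrete instance.
    static String readUrl(HiddenResponse response) {
        try {
            return (String) REQUEST.invoke(response);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readUrl(new HiddenResponse("http://example/test")));
    }
}
```

Resolving the `Method` in a static initializer keeps the per-call cost to a single `invoke`, at the price of failing early if the target class ever changes shape.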

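Then, in the logic that records the ClientResponse in the circuit breaker, we add the retry-related changes described above, along with the recording for the load balancer: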

ClientResponseCircuitBreakerSubscriber

protected void hookOnNext(ClientResponse clientResponse) {
    if (!isDisposed()) {
        if (singleProducer && successSignaled.compareAndSet(false, true)) {
            int rawStatusCode = clientResponse.rawStatusCode();
            HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);
            try {
                HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);
                //Check whether the method is GET or the path is in the retryable-path configuration, to decide whether the request can be retried
                if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {
                    //Not retryable: hand the result to the circuit breaker as-is
                    circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
                } else {
                    if (httpStatus != null && httpStatus.is2xxSuccessful()) {
                        //Success: hand the result to the circuit breaker as-is
                        circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
                    } else {
                        /**
                         * On failure, wrap the response in an exception, following the code in DefaultClientResponse
                         * @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException
                         */
                        Exception exception;
                        if (httpStatus != null) {
                            exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
                        } else {
                            exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
                        }
                        circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);
                        downstreamSubscriber.onError(exception);
                        return;
                    }
                }
            } catch (Exception e) {
                log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);
                circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
            }
        }
        eventWasEmitted.set(true);
        downstreamSubscriber.onNext(clientResponse);
    }
}
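The branching in hookOnNext can be condensed into a small pure function. The sketch below is only an illustration under assumptions: `BreakerOutcome`, `Outcome`, and `classify` are hypothetical names, the HTTP method is passed as a plain string, and `pathMarkedRetryable` stands in for the result of `retryablePathsMatch`.

```java
final class BreakerOutcome {
    // RECORD_RESULT: pass the response to the breaker as a normal result.
    // RECORD_ERROR: wrap the response in an exception and record a failure.
    enum Outcome { RECORD_RESULT, RECORD_ERROR }

    static Outcome classify(String method, boolean pathMarkedRetryable, int status) {
        boolean retryable = "GET".equals(method) || pathMarkedRetryable;
        if (!retryable) {
            // Non-retryable requests are never turned into exceptions here.
            return Outcome.RECORD_RESULT;
        }
        boolean is2xx = status >= 200 && status < 300;
        return is2xx ? Outcome.RECORD_RESULT : Outcome.RECORD_ERROR;
    }

    public static void main(String[] args) {
        System.out.println(classify("GET", false, 500));
        System.out.println(classify("POST", false, 500));
    }
}
```

Only retryable requests with a non-2xx status end up as errors, which is why a WebClientResponseException that reaches the retry predicate can safely be retried.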

Likewise, the existing recording logic for completion, cancellation, and failure now also records load-balancing data:

ClientResponseCircuitBreakerSubscriber

@Override
protected void hookOnComplete() {
    if (successSignaled.compareAndSet(false, true)) {
        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
        circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
    }

    downstreamSubscriber.onComplete();
}

@Override
public void hookOnCancel() {
    if (!successSignaled.get()) {
        serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
        if (eventWasEmitted.get()) {
            circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
        } else {
            circuitBreaker.releasePermission();
        }
    }
}

@Override
protected void hookOnError(Throwable e) {
    serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
    circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
    downstreamSubscriber.onError(e);
}
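The `successSignaled.compareAndSet(false, true)` guard used in these hooks is what keeps a single call from being counted twice when both an element and a completion signal arrive. A minimal JDK-only sketch of that guard follows; `OnceRecorder` and its methods are hypothetical names, and the counter merely stands in for the circuit-breaker and metrics calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OnceRecorder {
    private final AtomicBoolean successSignaled = new AtomicBoolean(false);
    // Stand-in for "record a success in the breaker and the metrics".
    private final AtomicInteger recorded = new AtomicInteger();

    void onNext() {
        recordOnce();
    }

    void onComplete() {
        recordOnce();
    }

    private void recordOnce() {
        // Only the first caller flips the flag, so only it records.
        if (successSignaled.compareAndSet(false, true)) {
            recorded.incrementAndGet();
        }
    }

    int recordedCount() {
        return recorded.get();
    }

    public static void main(String[] args) {
        OnceRecorder recorder = new OnceRecorder();
        recorder.onNext();
        recorder.onComplete();
        System.out.println(recorder.recordedCount());
    }
}
```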

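Gluing WebClient to resilience4j and overriding the retry logic

Because the circuit breaker above wraps retryable non-2xx responses in a WebClientResponseException, the retry component must retry on that exception.

The retry must also sit before the load balancer, because every retry has to obtain a new instance from the load balancer. The circuit breaker, in turn, must sit after the load balancer, because only then do we know which instance this call goes to; our circuit breakers are scoped per instance and per method: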

WebClientDefaultConfiguration.java

@Bean
public WebClient getWebClient(
        ReactorLoadBalancerExchangeFilterFunction lbFunction,
        WebClientConfigurationProperties webClientConfigurationProperties,
        Environment environment,
        RetryRegistry retryRegistry,
        CircuitBreakerRegistry circuitBreakerRegistry,
        ServiceInstanceMetrics serviceInstanceMetrics
) {
    String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);
    Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();
    if (configs == null || configs.size() == 0) {
        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");
    }
    WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);
    if (webClientProperties == null) {
        throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);
    }
    String serviceName = webClientProperties.getServiceName();
    //If no service name is configured, use the configuration key as the service name
    if (StringUtils.isBlank(serviceName)) {
        serviceName = name;
    }
    String baseUrl = webClientProperties.getBaseUrl();
    //If no baseUrl is configured, build it from the service name
    if (StringUtils.isBlank(baseUrl)) {
        baseUrl = "http://" + serviceName;
    }

    Retry retry = null;
    try {
        retry = retryRegistry.retry(serviceName, serviceName);
    } catch (ConfigurationNotFoundException e) {
        retry = retryRegistry.retry(serviceName);
    }
    //Override the exception predicate
    retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
        //Retry on WebClientResponseException: the only ones that can be caught here were created for retryable requests
        //See the code in ClientResponseCircuitBreakerSubscriber
        if (throwable instanceof WebClientResponseException) {
            log.info("should retry on {}", throwable.toString());
            return true;
        }
        //Retry when the circuit breaker rejects the call, because the request was never sent
        if (throwable instanceof CallNotPermittedException) {
            log.info("should retry on {}", throwable.toString());
            return true;
        }
        if (throwable instanceof WebClientRequestException) {
            WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;
            HttpMethod method = webClientRequestException.getMethod();
            URI uri = webClientRequestException.getUri();
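            //Determine whether this is a response timeout. A response timeout means the request has already been sent, so a non-GET request that is not marked retryable must not be retried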
            boolean isResponseTimeout = false;
            Throwable cause = throwable.getCause();
            //Netty's read timeout is usually a ReadTimeoutException
            if (cause instanceof ReadTimeoutException) {
                log.info("Cause is a ReadTimeoutException which indicates it is a response time out");
                isResponseTimeout = true;
            } else {
                //Other frameworks built on Java NIO usually throw a SocketTimeoutException whose message is "read time out"
                //Some other exceptions also carry "read time out" in the message, so we decide by the message
                String message = throwable.getMessage();
                if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {
                    log.info("Throwable message contains readtimeout which indicates it is a response time out");
                    isResponseTimeout = true;
                }
            }
            //A GET request or a request marked retryable can always be retried
            if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {
                log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
                return true;
            } else {
                //Otherwise, retry only on exceptions where the request was never sent
                if (isResponseTimeout) {
                    log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());
                } else {
                    log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
                    return true;
                }
            }
        }
        return false;
    }).build());


    HttpClient httpClient = HttpClient
            .create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())
            .doOnConnected(connection ->
                    connection
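                            //Use milliseconds: truncating a sub-second Duration to 0 seconds would disable the handler
                            .addHandlerLast(new ReadTimeoutHandler(webClientProperties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS))
                            .addHandlerLast(new WriteTimeoutHandler(webClientProperties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS))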
            );

    Retry finalRetry = retry;
    String finalServiceName = serviceName;
    return WebClient.builder()
            .exchangeStrategies(ExchangeStrategies.builder()
            .codecs(configurer -> configurer
                    .defaultCodecs()
                    //Buffer at most 16 MB of body in memory
                    .maxInMemorySize(16 * 1024 * 1024))
            .build())
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            //Retry comes before the load balancer
            .filter((clientRequest, exchangeFunction) -> {
                return exchangeFunction
                        .exchange(clientRequest)
                        .transform(ClientResponseRetryOperator.of(finalRetry));
            })
            //Load balancer: rewrites the URL
            .filter(lbFunction)
            //The instance-level circuit breaker must come after the load balancer has resolved the real address
            .filter((clientRequest, exchangeFunction) -> {
                ServiceInstance serviceInstance = getServiceInstance(clientRequest);
                serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
                CircuitBreaker circuitBreaker;
                //At this point the URL has been through the load balancer, so it is the instance URL
                //Note: with the asynchronous client, avoid path parameters where possible, otherwise the circuit breaker works poorly
                //There is one circuit breaker per instance per path
                String instanceId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();
                try {
                    //Create or look up the CircuitBreaker by instance id, using serviceName to find the configuration
                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId, finalServiceName);
                } catch (ConfigurationNotFoundException e) {
                    circuitBreaker = circuitBreakerRegistry.circuitBreaker(instanceId);
                }
                log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instanceId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));
                return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));
            }).baseUrl(baseUrl)
            .build();
}

private ServiceInstance getServiceInstance(ClientRequest clientRequest) {
    URI url = clientRequest.url();
    DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
    defaultServiceInstance.setHost(url.getHost());
    defaultServiceInstance.setPort(url.getPort());
    return defaultServiceInstance;
}
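Why the filter order matters can be seen in a toy model that uses plain functions instead of WebClient filters. This is a sketch under stated assumptions, not the real filter API: `FilterOrderSketch` and `run` are hypothetical names, the load balancer is a simple round-robin, and the innermost layer always fails so that every retry attempt is exercised.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class FilterOrderSketch {
    // Returns the instance addresses seen by the innermost (circuit-breaker) layer.
    static List<String> run(List<String> instances, int maxAttempts) {
        List<String> seen = new ArrayList<>();
        AtomicInteger next = new AtomicInteger();

        // Innermost layer: sees the resolved instance; always fails to force retries.
        Function<String, String> breakerLayer = instanceUrl -> {
            seen.add(instanceUrl);
            throw new IllegalStateException("simulated failure from " + instanceUrl);
        };
        // Middle layer: round-robin load balancer resolves the service to an instance.
        Function<String, String> loadBalancer = serviceUrl -> {
            String instance = instances.get(next.getAndIncrement() % instances.size());
            return breakerLayer.apply(instance);
        };
        // Outermost layer: each attempt re-enters the load balancer.
        Function<String, String> retry = serviceUrl -> {
            RuntimeException last = new IllegalStateException("no attempt made");
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    return loadBalancer.apply(serviceUrl);
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };

        try {
            retry.apply("http://service");
        } catch (RuntimeException ignored) {
            // All attempts failed by construction; only the trace matters here.
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("10.0.0.1:8080", "10.0.0.2:8080"), 3));
    }
}
```

Because the retry wraps the load balancer, consecutive attempts land on different instances. If the retry were placed inside the load balancer instead, every attempt would hit the instance chosen on the first call.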

This completes our configuration-driven WebClient wrapper.
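The WebClientRequestException branch of the retry predicate reduces to a small decision rule, sketched below with only the JDK. `RetryDecision`, `shouldRetry`, and `looksLikeReadTimeout` are hypothetical names, the HTTP method is a plain string, and `pathMarkedRetryable` stands in for the result of `retryablePathsMatch`.

```java
import java.util.Locale;

final class RetryDecision {
    // Mirrors the message check: strip spaces, then look for "readtimeout" ignoring case.
    static boolean looksLikeReadTimeout(String message) {
        if (message == null || message.trim().isEmpty()) {
            return false;
        }
        return message.replace(" ", "").toLowerCase(Locale.ROOT).contains("readtimeout");
    }

    // GET or explicitly retryable paths: always retry.
    // Anything else: retry only if the request cannot have reached the server.
    static boolean shouldRetry(String method, boolean pathMarkedRetryable, boolean responseTimeout) {
        if ("GET".equals(method) || pathMarkedRetryable) {
            return true;
        }
        return !responseTimeout;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry("POST", false, looksLikeReadTimeout("read time out")));
        System.out.println(shouldRetry("POST", false, looksLikeReadTimeout("connection refused")));
    }
}
```

One thing to watch with a message-based check like this: a message such as `Read timed out` normalizes to `readtimedout`, which does not contain `readtimeout`, so it would not be classified as a response timeout by this rule.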
