专栏目录
17.Eureka的实例配置 18.Eureka的客户端核心设计和配置 1. 背景 2.微服务框架需要考虑的问题 3.Eureka Server 与 API 网关要考虑的问题 4.maven依赖回顾以及项目框架结构 5.所有项目的parent与spring-framework-common说明 6.微服务特性相关的依赖说明 19.Eureka的服务端设计与配置 7.从Bean到SpringCloud 8.理解 NamedContextFactory 9.如何理解并定制一个Spring Cloud组件 10.使用Log4j2以及一些核心配置 11.Log4j2 监控相关 12.UnderTow 简介与内部原理 13.UnderTow 核心配置 14.UnderTow AccessLog 配置介绍 15.UnderTow 订制 16.Eureka架构和核心概念 20. 启动一个 Eureka Server 集群 21.Spring Cloud LoadBalancer简介 22.Spring Cloud LoadBalancer核心源码 23.订制Spring Cloud LoadBalancer 24.测试Spring Cloud LoadBalancer 25.OpenFeign简介与使用 26.OpenFeign的组件 27.OpenFeign的生命周期-创建代理 28.OpenFeign的生命周期-进行调用 29.Spring Cloud OpenFeign 的解析(1) 30. FeignClient 实现重试 31. FeignClient 实现断路器以及线程隔离限流的思路 32. 改进负载均衡算法 33. 实现重试、断路器以及线程隔离源码 34.验证重试配置正确性 35. 验证线程隔离正确性 36. 验证断路器正确性 37. 实现异步的客户端封装配置管理的意义与设计 38. 实现自定义 WebClient 的 NamedContextFactory 39. 改造 resilience4j 粘合 WebClient 40. spock 单元测试封装的 WebClient(上) 40. spock 单元测试封装的 WebClient(下) 41. SpringCloudGateway 基本流程讲解(1) 41. SpringCloudGateway 基本流程讲解(2) 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 43.为何 SpringCloudGateway 中会有链路信息丢失 44.避免链路信息丢失做的设计(1) 44.避免链路信息丢失做的设计(2) 45. 实现公共日志记录

33. 实现重试、断路器以及线程隔离源码

干货满满张哈希
• 阅读 247

33. 实现重试、断路器以及线程隔离源码

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

在前面两节,我们梳理了实现 Feign 断路器以及线程隔离的思路,并说明了如何优化目前的负载均衡算法。但是如何更新负载均衡的数据缓存,以及实现重试、断路器以及线程隔离的源码还没提,这一节我们会详细分析。

首先,从 spring.factories 引入,增加我们自定义 OpenFeign 配置的加载:

spring.factories

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webmvc.auto.OpenFeignAutoConfiguration

自动配置类是 OpenFeignAutoConfiguration,其内容是:

OpenFeignAutoConfiguration.java

//设置 `@Configuration(proxyBeanMethods=false)`,因为没有 @Bean 的方法互相调用需要每次返回同一个 Bean,没必要代理,关闭增加启动速度
@Configuration(proxyBeanMethods = false)
//加载配置,CommonOpenFeignConfiguration
@Import(CommonOpenFeignConfiguration.class)
//启用 OpenFeign 注解扫描和配置,默认配置为 DefaultOpenFeignConfiguration,其实就是 Feign 的 NamedContextFactory(即 FeignContext)的默认配置类是 DefaultOpenFeignConfiguration
@EnableFeignClients(value = "com.github.jojotech", defaultConfiguration = DefaultOpenFeignConfiguration.class)
public class OpenFeignAutoConfiguration {
}

为何要加这一层而不是直接使用 Import 的 CommonOpenFeignConfiguration?使用 @AutoConfigurationBefore@AutoConfigurationAfter 配置和其他 AutoConfiguration 加载的前后顺序。 @AutoConfigurationBefore@AutoConfigurationAfter 是 spring-boot 的注解,只对于 spring.factories 加载的 AutoConfiguration 生效。所以在设计上要加上这一层,防止我们未来可能会用到这些注解

CommonOpenFeignConfiguration 中包含所有 OpenFeign 的共用的一些 Bean,这些 Bean 是单例被所有 FeignClient 公用的,包括:

  1. FeignClient 要用的 Client 的底层 HTTP Client,我们这里使用 Apache HttpClient
  2. 将 Apache HttpClient 封装成 FeignClient 要用的 Client 的 ApacheHttpClient
  3. spring-cloud-openfeign 的 FeignClient 用的 Client 的负载均衡实现核心类是 FeignBlockingLoadBalancerClient,我们需要将其封装代理从而实现断路器和线程隔离以及负载均衡数据采集,封装类是我们自己实现的 FeignBlockingLoadBalancerClientDelegate。核心实现断路器和线程隔离逻辑的类是 Resilience4jFeignClient。

CommonOpenFeignConfiguration.java

@Configuration(proxyBeanMethods = false)
public class CommonOpenFeignConfiguration {
    //创建 Apache HttpClient,自定义一些配置
    @Bean
    public HttpClient getHttpClient() {
        // 长连接保持5分钟
        PoolingHttpClientConnectionManager pollingConnectionManager = new PoolingHttpClientConnectionManager(5, TimeUnit.MINUTES);
        // 总连接数
        pollingConnectionManager.setMaxTotal(1000);
        // 同路由的并发数
        pollingConnectionManager.setDefaultMaxPerRoute(1000);
        HttpClientBuilder httpClientBuilder = HttpClients.custom();
        httpClientBuilder.setConnectionManager(pollingConnectionManager);
        // 保持长连接配置,需要在头添加Keep-Alive
        httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
        return httpClientBuilder.build();
    }

    //创建使用 HttpClient 实现的 OpenFeign 的 Client 接口的 Bean
    @Bean
    public ApacheHttpClient apacheHttpClient(HttpClient httpClient) {
        return new ApacheHttpClient(httpClient);
    }

    //FeignBlockingLoadBalancerClient 的代理类,也是实现 OpenFeign 的 Client 接口的 Bean
    @Bean
    //使用 Primary 让 FeignBlockingLoadBalancerClientDelegate 成为所有 FeignClient 实际使用的 Bean
    @Primary
    public FeignBlockingLoadBalancerClientDelegate feignBlockingLoadBalancerCircuitBreakableClient(
            ServiceInstanceMetrics serviceInstanceMetrics,
            //我们上面创建的 ApacheHttpClient Bean
            ApacheHttpClient apacheHttpClient,
            //为何使用 ObjectProvider 请参考 FeignBlockingLoadBalancerClientDelegate 源码的注释
            ObjectProvider<LoadBalancerClient> loadBalancerClientProvider,
            //resilience4j 的线程隔离
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            //resilience4j 的断路器
            CircuitBreakerRegistry circuitBreakerRegistry,
            //Sleuth 的 Tracer,用于获取请求上下文
            Tracer tracer,
            //负载均衡属性
            LoadBalancerProperties properties,
            //为何使用这个不直接用 FeignBlockingLoadBalancerClient 请参考 FeignBlockingLoadBalancerClientDelegate 的注释
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        return new FeignBlockingLoadBalancerClientDelegate(
                //我们自己封装的核心 Client 实现,加入了断路器,线程隔离以及负载均衡数据采集
                new Resilience4jFeignClient(
                        serviceInstanceMetrics, apacheHttpClient,
                        threadPoolBulkheadRegistry,
                        circuitBreakerRegistry,
                        tracer
                ),
                loadBalancerClientProvider,
                properties,
                loadBalancerClientFactory
        );
    }
}

其中,Resilience4jFeignClient 粘合断路器,线程隔离的核心代码,同时也记录了负载均衡的实际调用数据

Resilience4jFeignClient.java

public class Resilience4jFeignClient implements Client {
    private final ServiceInstanceMetrics serviceInstanceMetrics;
    private final ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final Tracer tracer;
    private ApacheHttpClient apacheHttpClient;


    public Resilience4jFeignClient(
            ServiceInstanceMetrics serviceInstanceMetrics, ApacheHttpClient apacheHttpClient,
            ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry,
            CircuitBreakerRegistry circuitBreakerRegistry,
            Tracer tracer
    ) {
        this.serviceInstanceMetrics = serviceInstanceMetrics;
        this.apacheHttpClient = apacheHttpClient;
        this.threadPoolBulkheadRegistry = threadPoolBulkheadRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
        this.tracer = tracer;
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        //获取定义 FeignClient 的接口的 FeignClient 注解
        FeignClient annotation = request.requestTemplate().methodMetadata().method().getDeclaringClass().getAnnotation(FeignClient.class);
        //和 Retry 保持一致,使用 contextId,而不是微服务名称
        //contextId 会作为我们后面读取断路器以及线程隔离配置的 key
        String contextId = annotation.contextId();
        //获取实例唯一id
        String serviceInstanceId = getServiceInstanceId(contextId, request);
        //获取实例+方法唯一id
        String serviceInstanceMethodId = getServiceInstanceMethodId(contextId, request);

        ThreadPoolBulkhead threadPoolBulkhead;
        CircuitBreaker circuitBreaker;
        try {
            //每个实例一个线程池
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId, contextId);
        } catch (ConfigurationNotFoundException e) {
            threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(serviceInstanceId);
        }
        try {
            //每个服务实例具体方法一个resilience4j熔断记录器,在服务实例具体方法维度做熔断,所有这个服务的实例具体方法共享这个服务的resilience4j熔断配置
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId, contextId);
        } catch (ConfigurationNotFoundException e) {
            circuitBreaker = circuitBreakerRegistry.circuitBreaker(serviceInstanceMethodId);
        }
        //保持traceId
        Span span = tracer.currentSpan();
        ThreadPoolBulkhead finalThreadPoolBulkhead = threadPoolBulkhead;
        CircuitBreaker finalCircuitBreaker = circuitBreaker;
        Supplier<CompletionStage<Response>> completionStageSupplier = ThreadPoolBulkhead.decorateSupplier(threadPoolBulkhead,
                OpenfeignUtil.decorateSupplier(circuitBreaker, () -> {
                    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                        log.info("call url: {} -> {}, ThreadPoolStats({}): {}, CircuitBreakStats({}): {}",
                                request.httpMethod(),
                                request.url(),
                                serviceInstanceId,
                                JSON.toJSONString(finalThreadPoolBulkhead.getMetrics()),
                                serviceInstanceMethodId,
                                JSON.toJSONString(finalCircuitBreaker.getMetrics())
                        );
                        Response execute = apacheHttpClient.execute(request, options);
                        log.info("response: {} - {}", execute.status(), execute.reason());
                        return execute;
                    } catch (IOException e) {
                        throw new CompletionException(e);
                    }
                })
        );
        ServiceInstance serviceInstance = getServiceInstance(request);
        try {
            serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
            Response response = Try.ofSupplier(completionStageSupplier).get().toCompletableFuture().join();
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
            return response;
        } catch (BulkheadFullException e) {
            //线程池限流异常
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            return Response.builder()
                    .request(request)
                    .status(SpecialHttpStatus.BULKHEAD_FULL.getValue())
                    .reason(e.getLocalizedMessage())
                    .requestTemplate(request.requestTemplate()).build();
        } catch (CompletionException e) {
            serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
            //内部抛出的所有异常都被封装了一层 CompletionException,所以这里需要取出里面的 Exception
            Throwable cause = e.getCause();
            //对于断路器打开,返回对应特殊的错误码
            if (cause instanceof CallNotPermittedException) {
                return Response.builder()
                        .request(request)
                        .status(SpecialHttpStatus.CIRCUIT_BREAKER_ON.getValue())
                        .reason(cause.getLocalizedMessage())
                        .requestTemplate(request.requestTemplate()).build();
            }
            //对于 IOException,需要判断是否请求已经发送出去了
            //对于 connect time out 的异常,则可以重试,因为请求没发出去,但是例如 read time out 则不行,因为请求已经发出去了
            if (cause instanceof IOException) {
                boolean containsRead = cause.getMessage().toLowerCase().contains("read");
                if (containsRead) {
                    log.info("{}-{} exception contains read, which indicates the request has been sent", e.getMessage(), cause.getMessage());
                    //如果是 read 异常,则代表请求已经发了出去,则不能重试(除非是 GET 请求或者有 RetryableMethod 注解,这个在 DefaultErrorDecoder 判断)
                    return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.NOT_RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();
                } else {
                    return Response.builder()
                            .request(request)
                            .status(SpecialHttpStatus.RETRYABLE_IO_EXCEPTION.getValue())
                            .reason(cause.getLocalizedMessage())
                            .requestTemplate(request.requestTemplate()).build();
                }
            }
            throw e;
        }
    }

    private ServiceInstance getServiceInstance(Request request) throws MalformedURLException {
        URL url = new URL(request.url());
        DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
        defaultServiceInstance.setHost(url.getHost());
        defaultServiceInstance.setPort(url.getPort());
        return defaultServiceInstance;
    }

    //获取微服务实例id,格式为:FeignClient 的 contextId:host:port,例如: test1Client:10.238.45.78:8251
    private String getServiceInstanceId(String contextId, Request request) throws MalformedURLException {
        //解析 URL
        URL url = new URL(request.url());
        //拼接微服务实例id
        return contextId + ":" + url.getHost() + ":" + url.getPort();
    }

    //获取微服务实例方法id,格式为:FeignClient 的 contextId:host:port:methodName,例如:test1Client:10.238.45.78:8251:
    private String getServiceInstanceMethodId(String contextId, Request request) throws MalformedURLException {
        URL url = new URL(request.url());
        //通过微服务名称 + 实例 + 方法的方式,获取唯一id
        String methodName = request.requestTemplate().methodMetadata().method().toGenericString();
        return contextId + ":" + url.getHost() + ":" + url.getPort() + ":" + methodName;
    }
}

在上面,我们定义了几种特殊的 HTTP 返回码,主要目的是想将一些异常封装成响应返回,然后通过我们后面 Feign 错误解码器解码成统一的 RetryableException,这样在 resilience4j 的重试配置中,我们就不用配置很复杂的异常重试,仅针对 RetryableException 进行重试即可

我们想让 spring-cloud-openfeign 的核心负载均衡 Client, 在完成调用 LoadBalancer 选择实例并替换 url 之后,调用的 client 直接是 ApacheHttpClient 而是我们上面这个类,所以加入了 FeignBlockingLoadBalancerClientDelegate 封装:

/**
 * 由于初始化 FeignBlockingLoadBalancerClient 需要 LoadBalancerClient
 * 但是由于 Spring Cloud 2020 之后,Spring Cloud LoadBalancer BlockingClient 的加载,强制加入了顺序
 * @see org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration
 * 这个自动配置加入了 @AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
 * 导致我们在初始化的 FeignClient 的时候,无法拿到 BlockingClient
 * 所以,需要通过 ObjectProvider 封装 LoadBalancerClient,在真正调用 FeignClient 的时候通过 ObjectProvider 拿到 LoadBalancerClient 来创建 FeignBlockingLoadBalancerClient
 */
public class FeignBlockingLoadBalancerClientDelegate implements Client {
    private FeignBlockingLoadBalancerClient feignBlockingLoadBalancerClient;

    private final Client delegate;
    private final ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider;
    private final LoadBalancerProperties properties;
    private final LoadBalancerClientFactory loadBalancerClientFactory;

    public FeignBlockingLoadBalancerClientDelegate(
            Client delegate,
            ObjectProvider<LoadBalancerClient> loadBalancerClientObjectProvider,
            LoadBalancerProperties properties,
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        this.delegate = delegate;
        this.loadBalancerClientObjectProvider = loadBalancerClientObjectProvider;
        this.properties = properties;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        if (feignBlockingLoadBalancerClient == null) {
            synchronized (this) {
                if (feignBlockingLoadBalancerClient == null) {
                    feignBlockingLoadBalancerClient = new FeignBlockingLoadBalancerClient(
                            this.delegate,
                            this.loadBalancerClientObjectProvider.getIfAvailable(),
                            this.properties,
                            this.loadBalancerClientFactory
                    );
                }
            }
        }
        return feignBlockingLoadBalancerClient.execute(request, options);
    }
}

我们指定的 FeignClient 的 NamedContextFactory(即 FeignContext)的默认配置 DefaultOpenFeignConfiguration 中,主要粘合了重试逻辑,以及错误解码器:

@Configuration(proxyBeanMethods = false)
public class DefaultOpenFeignConfiguration {

    @Bean
    public ErrorDecoder errorDecoder() {
        return new DefaultErrorDecoder();
    }

    @Bean
    public Feign.Builder resilience4jFeignBuilder(
            List<FeignDecoratorBuilderInterceptor> feignDecoratorBuilderInterceptors,
            FeignDecorators.Builder builder
    ) {
        feignDecoratorBuilderInterceptors.forEach(feignDecoratorBuilderInterceptor -> feignDecoratorBuilderInterceptor.intercept(builder));
        return Resilience4jFeign.builder(builder.build());
    }

    @Bean
    public FeignDecorators.Builder defaultBuilder(Environment environment, RetryRegistry retryRegistry) {
        String name = environment.getProperty("feign.client.name");
        Retry retry = null;
        try {
            retry = retryRegistry.retry(name, name);
        } catch (ConfigurationNotFoundException e) {
            retry = retryRegistry.retry(name);
        }
        //覆盖其中的异常判断,只针对 feign.RetryableException 进行重试,所有需要重试的异常我们都在 DefaultErrorDecoder 以及 Resilience4jFeignClient 中封装成了 RetryableException
        retry = Retry.of(name, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
            return throwable instanceof feign.RetryableException;
        }).build());
        return FeignDecorators.builder().withRetry(
                retry
        );
    }
}

错误解码器即把上面可以重试的异常响应码,以及我们想重试的请求封装成 RetryableException,代码就不赘述了。这样我们就实现了自定义的实现重试、断路器以及线程隔离的 FeignClient。可以通过如下方式进行配置使用:

application.yml 配置:

################ feign配置 ################
feign:
  hystrix:
    enabled: false
  client:
    config:
      default:
        # 链接超时
        connectTimeout: 500
        # 读取超时
        readTimeout: 8000
      test1-client:
        # 链接超时
        connectTimeout: 500
        # 读取超时
        readTimeout: 60000
################ resilience配置 ################
resilience4j.circuitbreaker:
  configs:
    default:
      registerHealthIndicator: true
      slidingWindowSize: 10
      minimumNumberOfCalls: 5
      slidingWindowType: TIME_BASED
      permittedNumberOfCallsInHalfOpenState: 3
      automaticTransitionFromOpenToHalfOpenEnabled: true
      waitDurationInOpenState: 2s
      failureRateThreshold: 30
      eventConsumerBufferSize: 10
      recordExceptions:
        - java.lang.Exception
resilience4j.retry:
  configs:
    default:
      maxRetryAttempts: 2
    test1-client:
      maxRetryAttempts: 3
resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 64
      coreThreadPoolSize: 32
      queueCapacity: 32

定义 Feignclient:

//这个会用到所有 key 为 test1-client 的配置,如果对应的配置中没有 test1-client,就用 default
@FeignClient(name = "service1", contextId = "test1-client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}
//这个会用到所有 key 为 test2-client 的配置,由于我们这里没有 test2-client 的单独配置,所以用的全是 default 配置
@FeignClient(name = "service1", contextId = "test2-client")
public interface TestService1Client2 {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}

下一节开始,我们会对这里实现的 FeignClient 封装进行单元测试,验证我们的正确性。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

33. 实现重试、断路器以及线程隔离源码

评论区
推荐文章

暂无数据