专栏目录
17.Eureka的实例配置 18.Eureka的客户端核心设计和配置 1. 背景 2.微服务框架需要考虑的问题 3.Eureka Server 与 API 网关要考虑的问题 4.maven依赖回顾以及项目框架结构 5.所有项目的parent与spring-framework-common说明 6.微服务特性相关的依赖说明 19.Eureka的服务端设计与配置 7.从Bean到SpringCloud 8.理解 NamedContextFactory 9.如何理解并定制一个Spring Cloud组件 10.使用Log4j2以及一些核心配置 11.Log4j2 监控相关 12.UnderTow 简介与内部原理 13.UnderTow 核心配置 14.UnderTow AccessLog 配置介绍 15.UnderTow 订制 16.Eureka架构和核心概念 20. 启动一个 Eureka Server 集群 21.Spring Cloud LoadBalancer简介 22.Spring Cloud LoadBalancer核心源码 23.订制Spring Cloud LoadBalancer 24.测试Spring Cloud LoadBalancer 25.OpenFeign简介与使用 26.OpenFeign的组件 27.OpenFeign的生命周期-创建代理 28.OpenFeign的生命周期-进行调用 29.Spring Cloud OpenFeign 的解析(1) 30. FeignClient 实现重试 31. FeignClient 实现断路器以及线程隔离限流的思路 32. 改进负载均衡算法 33. 实现重试、断路器以及线程隔离源码 34.验证重试配置正确性 35. 验证线程隔离正确性 36. 验证断路器正确性 37. 实现异步的客户端封装配置管理的意义与设计 38. 实现自定义 WebClient 的 NamedContextFactory 39. 改造 resilience4j 粘合 WebClient 40. spock 单元测试封装的 WebClient(上) 40. spock 单元测试封装的 WebClient(下) 41. SpringCloudGateway 基本流程讲解(1) 41. SpringCloudGateway 基本流程讲解(2) 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 43.为何 SpringCloudGateway 中会有链路信息丢失 44.避免链路信息丢失做的设计(1) 44.避免链路信息丢失做的设计(2) 45. 实现公共日志记录

34.验证重试配置正确性

干货满满张哈希
• 阅读 343

34.验证重试配置正确性

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

在前面一节,我们利用 resilience4j 粘合了 OpenFeign 实现了断路器、重试以及线程隔离,并使用了新的负载均衡算法优化了业务激增时的负载均衡算法表现。这一节,我们开始编写单元测试验证这些功能的正确性,以便于日后升级依赖,修改的时候能保证正确性。同时,通过单元测试,我们更能深入理解 Spring Cloud。

验证重试配置

对于我们实现的重试,我们需要验证:

  1. 验证配置正确加载:即我们在 Spring 配置(例如 application.yml)中的加入的 Resilience4j 的配置被正确加载应用了。
  2. 验证针对 ConnectTimeout 重试正确:FeignClient 可以配置 ConnectTimeout 连接超时时间,如果连接超时会有连接超时异常抛出,对于这种异常无论什么请求都应该重试,因为请求并没有发出。
  3. 验证针对断路器异常的重试正确:断路器是微服务实例方法级别的,如果抛出断路器打开异常,应该直接重试下一个实例。
  4. 验证针对限流器异常的重试正确:当某个实例线程隔离满了的时候,抛出线程限流异常应该直接重试下一个实例。
  5. 验证针对非 2xx 响应码可重试的方法重试正确
  6. 验证针对非 2xx 响应码不可重试的方法没有重试
  7. 验证针对可重试的方法响应超时异常重试正确:FeignClient 可以配置 ReadTimeout 即响应超时,如果方法可以重试,则需要重试。
  8. 验证针对不可重试的方法响应超时异常不能重试:FeignClient 可以配置 ReadTimeout 即响应超时,如果方法不可以重试,则不能重试。

验证配置正确加载

我们可以定义不同的 FeignClient,之后检查 resilience4j 加载的重试配置来验证重试配置的正确加载。

首先定义两个 FeignClient,微服务分别是 testService1 和 testService2,contextId 分别是 testService1Client 和 testService2Client

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}
@FeignClient(name = "testService2", contextId = "testService2Client")
    public interface TestService2Client {
        @GetMapping("/anything")
        HttpBinAnythingResponse anything();
}

然后,我们增加 Spring 配置,使用 SpringExtension 编写单元测试类:

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3",
        // testService2Client 里面的所有方法请求重试次数为 2
        "resilience4j.retry.configs.testService2Client.maxAttempts=2",
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    public static class App {
    }
}

编写测试代码,验证配置加载正确性:

@Test
public void testConfigureRetry() {
    //读取所有的 Retry
    List<Retry> retries = retryRegistry.getAllRetries().asJava();
    //验证其中的配置是否符合我们填写的配置
    Map<String, Retry> retryMap = retries.stream().collect(Collectors.toMap(Retry::getName, v -> v));
    //我们初始化 Retry 的时候,使用 FeignClient 的 ContextId 作为了 Retry 的 Name
    Retry retry = retryMap.get("testService1Client");
    //验证 Retry 配置存在
    Assertions.assertNotNull(retry);
    //验证 Retry 配置符合我们的配置
    Assertions.assertEquals(retry.getRetryConfig().getMaxAttempts(), 3);
    retry = retryMap.get("testService2Client");
    //验证 Retry 配置存在
    Assertions.assertNotNull(retry);
    //验证 Retry 配置符合我们的配置
    Assertions.assertEquals(retry.getRetryConfig().getMaxAttempts(), 2);
}

验证针对 ConnectTimeout 重试正确

我们可以通过针对一个微服务注册两个实例,一个实例是连接不上的,另一个实例是可以正常连接的,无论怎么调用 FeignClient,请求都不会失败,来验证重试是否生效。我们使用 HTTP 测试网站来测试,即 http://httpbin.org 。这个网站的 api 可以用来模拟各种调用。其中 /status/{status} 就是将发送的请求原封不动的在响应中返回。在单元测试中,我们不会单独部署一个注册中心,而是直接 Mock spring cloud 中服务发现的核心接口 DiscoveryClient,并且将我们 Eureka 的服务发现以及注册通过配置都关闭,即:

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //关闭 eureka client
        "eureka.client.enabled=false",
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3"
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            //模拟两个服务实例
            ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
            ServiceInstance service1Instance4 = Mockito.spy(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            when(service1Instance1.getMetadata()).thenReturn(zone1);
            when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
            when(service1Instance1.getHost()).thenReturn("httpbin.org");
            when(service1Instance1.getPort()).thenReturn(80);
            when(service1Instance4.getInstanceId()).thenReturn("service1Instance4");
            when(service1Instance4.getHost()).thenReturn("www.httpbin.org");
            //这个port连不上,测试 IOException
            when(service1Instance4.getPort()).thenReturn(18080);
            DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
            //微服务 testService3 有两个实例即 service1Instance1 和 service1Instance4
            Mockito.when(spy.getInstances("testService3"))
                    .thenReturn(List.of(service1Instance1, service1Instance4));
            return spy;
        }
    }
}

编写 FeignClient:

@FeignClient(name = "testService3", contextId = "testService3Client")
public interface TestService3Client {
    @PostMapping("/anything")
    HttpBinAnythingResponse anything();
}

调用 TestService3Client 的 anything 方法,验证是否有重试:

@SpyBean
private TestService3Client testService3Client;

/**
 * 验证对于有不正常实例(正在关闭的实例,会 connect timeout)请求是否正常重试
 */
@Test
public void testIOExceptionRetry() {
    //防止断路器影响
    circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
    for (int i = 0; i < 5; i++) {
        Span span = tracer.nextSpan();
        try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
            //不抛出异常,则正常重试了
            testService3Client.anything();
            testService3Client.anything();
        }
    }
}

这里强调一点,由于我们在这个类中还会测试其他异常,以及断路器,我们需要避免这些测试一起执行的时候,断路器打开了,所以我们在所有测试调用 FeignClient 的方法开头,清空所有断路器的数据,通过:

circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);

并且通过日志中可以看出由于 connect timeout 进行重试:

call url: POST -> http://www.httpbin.org:18080/anything, ThreadPoolStats(testService3Client:www.httpbin.org:18080): {"coreThreadPoolSize":10,"maximumThreadPoolSize":10,"queueCapacity":100,"queueDepth":0,"remainingQueueCapacity":100,"threadPoolSize":1}, CircuitBreakStats(testService3Client:www.httpbin.org:18080:public abstract com.github.jojotech.spring.cloud.webmvc.test.feign.HttpBinAnythingResponse com.github.jojotech.spring.cloud.webmvc.test.feign.OpenFeignClientTest$TestService3Client.anything()): {"failureRate":-1.0,"numberOfBufferedCalls":0,"numberOfFailedCalls":0,"numberOfNotPermittedCalls":0,"numberOfSlowCalls":0,"numberOfSlowFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":0,"slowCallRate":-1.0}
TestService3Client#anything() response: 582-Connect to www.httpbin.org:18080 [www.httpbin.org/34.192.79.103, www.httpbin.org/18.232.227.86, www.httpbin.org/3.216.167.140, www.httpbin.org/54.156.165.4] failed: Connect timed out, should retry: true
call url: POST -> http://httpbin.org:80/anything, ThreadPoolStats(testService3Client:httpbin.org:80): {"coreThreadPoolSize":10,"maximumThreadPoolSize":10,"queueCapacity":100,"queueDepth":0,"remainingQueueCapacity":100,"threadPoolSize":1}, CircuitBreakStats(testService3Client:httpbin.org:80:public abstract com.github.jojotech.spring.cloud.webmvc.test.feign.HttpBinAnythingResponse com.github.jojotech.spring.cloud.webmvc.test.feign.OpenFeignClientTest$TestService3Client.anything()): {"failureRate":-1.0,"numberOfBufferedCalls":0,"numberOfFailedCalls":0,"numberOfNotPermittedCalls":0,"numberOfSlowCalls":0,"numberOfSlowFailedCalls":0,"numberOfSlowSuccessfulCalls":0,"numberOfSuccessfulCalls":0,"slowCallRate":-1.0}
response: 200 - OK

验证针对断路器异常的重试正确

通过系列前面的源码分析,我们知道 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以我们实现的断路器也是懒加载的,需要先调用,之后才会初始化断路器。所以这里如果我们要模拟断路器打开的异常,需要先手动读取载入断路器,之后才能获取对应方法的断路器,修改状态。

我们先定义一个 FeignClient:

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}

使用前面同样的方式,给这个微服务添加实例:

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //关闭 eureka client
        "eureka.client.enabled=false",
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3",
        //增加断路器配置
        "resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
        "resilience4j.circuitbreaker.configs.default.slidingWindowType=COUNT_BASED",
        "resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
        "resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=2",
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            //模拟两个服务实例
            ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
            ServiceInstance service1Instance3 = Mockito.spy(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            when(service1Instance1.getMetadata()).thenReturn(zone1);
            when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
            when(service1Instance1.getHost()).thenReturn("httpbin.org");
            when(service1Instance1.getPort()).thenReturn(80);
            when(service1Instance3.getMetadata()).thenReturn(zone1);
            when(service1Instance3.getInstanceId()).thenReturn("service1Instance3");
            //这其实就是 httpbin.org ,为了和第一个实例进行区分加上 www
            when(service1Instance3.getHost()).thenReturn("www.httpbin.org");
            DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
            //微服务 testService3 有两个实例即 service1Instance1 和 service1Instance4
            Mockito.when(spy.getInstances("testService1"))
                    .thenReturn(List.of(service1Instance1, service1Instance3));
            return spy;
        }
    }
}

然后,编写测试代码:

@Test
public void testRetryOnCircuitBreakerException() {
    //防止断路器影响
    circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
    CircuitBreaker testService1ClientInstance1Anything;
    try {
        testService1ClientInstance1Anything = circuitBreakerRegistry
                .circuitBreaker("testService1Client:httpbin.org:80:public abstract com.github.jojotech.spring.cloud.webmvc.test.feign.HttpBinAnythingResponse com.github.jojotech.spring.cloud.webmvc.test.feign.OpenFeignClientTest$TestService1Client.anything()", "testService1Client");
    } catch (ConfigurationNotFoundException e) {
        //找不到就用默认配置
        testService1ClientInstance1Anything = circuitBreakerRegistry
                .circuitBreaker("testService1Client:httpbin.org:80:public abstract com.github.jojotech.spring.cloud.webmvc.test.feign.HttpBinAnythingResponse com.github.jojotech.spring.cloud.webmvc.test.feign.OpenFeignClientTest$TestService1Client.anything()");
    }
    //将断路器打开
    testService1ClientInstance1Anything.transitionToOpenState();
    //调用多次,调用成功即对断路器异常重试了
    for (int i = 0; i < 10; i++) {
        this.testService1Client.anything();
    }
}

运行测试,日志中可以看出,针对断路器打开的异常进行重试了:

2021-11-13 03:40:13.546  INFO [,,] 4388 --- [           main] c.g.j.s.c.w.f.DefaultErrorDecoder        : TestService1Client#anything() response: 581-CircuitBreaker 'testService1Client:httpbin.org:80:public abstract com.github.jojotech.spring.cloud.webmvc.test.feign.HttpBinAnythingResponse com.github.jojotech.spring.cloud.webmvc.test.feign.OpenFeignClientTest$TestService1Client.anything()' is OPEN and does not permit further calls, should retry: true

验证针对限流器异常的重试正确

通过系列前面的源码分析,我们知道 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以我们实现的断路器也是懒加载的,需要先调用,之后才会初始化线程隔离。所以这里如果我们要模拟线程隔离满的异常,需要先手动读取载入线程隔离,之后才能获取对应实例的线程隔离,将线程池填充满。

我们先定义一个 FeignClient:

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}

使用前面同样的方式,给这个微服务添加实例:

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //关闭 eureka client
        "eureka.client.enabled=false",
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3",
        //增加断路器配置
        "resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
        "resilience4j.circuitbreaker.configs.default.slidingWindowType=COUNT_BASED",
        "resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
        "resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=2",
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            //模拟两个服务实例
            ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
            ServiceInstance service1Instance3 = Mockito.spy(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            when(service1Instance1.getMetadata()).thenReturn(zone1);
            when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
            when(service1Instance1.getHost()).thenReturn("httpbin.org");
            when(service1Instance1.getPort()).thenReturn(80);
            when(service1Instance3.getMetadata()).thenReturn(zone1);
            when(service1Instance3.getInstanceId()).thenReturn("service1Instance3");
            //这其实就是 httpbin.org ,为了和第一个实例进行区分加上 www
            when(service1Instance3.getHost()).thenReturn("www.httpbin.org");
            DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
            //微服务 testService3 有两个实例即 service1Instance1 和 service1Instance4
            Mockito.when(spy.getInstances("testService1"))
                    .thenReturn(List.of(service1Instance1, service1Instance3));
            return spy;
        }
    }
}

然后,编写测试代码:

@Test
public void testRetryOnBulkheadException() {
    //防止断路器影响
    circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
    this.testService1Client.anything();
    ThreadPoolBulkhead threadPoolBulkhead;
    try {
        threadPoolBulkhead = threadPoolBulkheadRegistry
                .bulkhead("testService1Client:httpbin.org:80", "testService1Client");
    } catch (ConfigurationNotFoundException e) {
        //找不到就用默认配置
        threadPoolBulkhead = threadPoolBulkheadRegistry
                .bulkhead("testService1Client:httpbin.org:80");
    }
    //线程队列我们配置的是 1,线程池大小是 10,这样会将线程池填充满
    for (int i = 0; i < 10 + 1; i++) {
        threadPoolBulkhead.submit(() -> {
            try {
                //这样任务永远不会结束了
                Thread.currentThread().join();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    //调用多次,调用成功即对断路器异常重试了
    for (int i = 0; i < 10; i++) {
        this.testService1Client.anything();
    }
}

运行测试,日志中可以看出,针对线程池满的异常进行重试了:

2021-11-13 03:35:16.371  INFO [,,] 3824 --- [           main] c.g.j.s.c.w.f.DefaultErrorDecoder        : TestService1Client#anything() response: 584-Bulkhead 'testService1Client:httpbin.org:80' is full and does not permit further calls, should retry: true

验证针对非 2xx 响应码可重试的方法重试正确

我们通过使用 http.bin 的 /status/{statusCode} 接口,这个接口会根据路径参数 statusCode 返回对应状态码的响应:

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @GetMapping("/status/500")
    String testGetRetryStatus500();
}

我们如何感知被重试三次呢?每次调用,就会从负载均衡器获取一个服务实例。在负载均衡器代码中,我们使用了根据当前 sleuth 的上下文的 traceId 的缓存,每次调用,traceId 对应的 position 值就会加 1。我们可以通过观察这个值的变化获取到究竟本次请求调用了几次负载均衡器,也就是做了几次调用。

编写测试:

@Test
public void testNon2xxRetry() {
    Span span = tracer.nextSpan();
    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
        //防止断路器影响
        circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
        long l = span.context().traceId();
        RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
                = (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
        AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
        int start = atomicInteger.get();
        try {
            //get 方法会重试
            testService1Client.testGetRetryStatus500();
        } catch (Exception e) {
        }
        //因为每次调用都会失败,所以会重试配置的 3 次
        Assertions.assertEquals(3, atomicInteger.get() - start);
    }
}

验证针对非 2xx 响应码不可重试的方法没有重试

我们通过使用 http.bin 的 /status/{statusCode} 接口,这个接口会根据路径参数 statusCode 返回对应状态码的响应,并且支持各种 HTTP 请求方式:

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @PostMapping("/status/500")
    String testPostRetryStatus500();
}

默认情况下,我们只会对 GET 方法重试,对于其他 HTTP 请求方法,是不会重试的:

@Test
public void testNon2xxRetry() {
    Span span = tracer.nextSpan();
    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
        //防止断路器影响
        circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
        long l = span.context().traceId();
        RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
                = (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
        AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
        int start = atomicInteger.get();
        try {
            //post 方法不会重试
            testService1Client.testPostRetryStatus500();
        } catch (Exception e) {
        }
        //不会重试,因此只会被调用 1 次
        Assertions.assertEquals(1, atomicInteger.get() - start);
    }
}

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

34.验证重试配置正确性

评论区
推荐文章

暂无数据