SpringCloud升级之路2020.0.x版-35. 验证线程隔离正确性

干货满满张哈希 等级 61 0 0
标签: 线程

SpringCloud升级之路2020.0.x版-35. 验证线程隔离正确性

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

上一节我们通过单元测试验证了重试的正确性,这一节我们来验证我们线程隔离的正确性,主要包括:

  1. 验证配置正确加载:即我们在 Spring 配置(例如 application.yml)中的加入的 Resilience4j 的配置被正确加载应用了。
  2. 相同微服务调用不同实例的时候,使用的是不同的线程(池)。

验证配置正确加载

与之前验证重试类似,我们可以定义不同的 FeignClient,之后检查 resilience4j 加载的线程隔离配置来验证线程隔离配置的正确加载。

并且,与重试配置不同的是,通过系列前面的源码分析,我们知道 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以我们实现的线程隔离也是懒加载的,需要先调用,之后才会初始化线程池。所以这里我们需要先进行调用之后,再验证线程池配置。

首先定义两个 FeignClient,微服务分别是 testService1 和 testService2,contextId 分别是 testService1Client 和 testService2Client

@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
    @GetMapping("/anything")
    HttpBinAnythingResponse anything();
}
@FeignClient(name = "testService2", contextId = "testService2Client")
    public interface TestService2Client {
        @GetMapping("/anything")
        HttpBinAnythingResponse anything();
}

然后,我们增加 Spring 配置,并且给两个微服务都添加一个实例,使用 SpringExtension 编写单元测试类:

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3",
        // testService2Client 里面的所有方法请求重试次数为 2
        "resilience4j.retry.configs.testService2Client.maxAttempts=2",
        //默认线程池配置
        "resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
        "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
        "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
        //testService2Client 的线程池配置
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            //模拟两个服务实例
            ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
            ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            when(service1Instance1.getMetadata()).thenReturn(zone1);
            when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
            when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
            when(service1Instance1.getPort()).thenReturn(80);
            when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
            when(service2Instance2.getHost()).thenReturn("httpbin.org");
            when(service2Instance2.getPort()).thenReturn(80);
            DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
            Mockito.when(spy.getInstances("testService1"))
                    .thenReturn(List.of(service1Instance1));
            Mockito.when(spy.getInstances("testService2"))
                    .thenReturn(List.of(service2Instance2));
            return spy;
        }
    }
}

编写测试代码,验证配置正确:

@Test
public void testConfigureThreadPool() {
    //防止断路器影响
    circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
    //调用下这两个 FeignClient 确保对应的 NamedContext 被初始化
    testService1Client.anything();
    testService2Client.anything();
    //验证线程隔离的实际配置,符合我们的填入的配置
    ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
            .stream().filter(t -> t.getName().contains("service1Instance1")).findFirst().get();
    Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 10);
    Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 10);
    threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
            .stream().filter(t -> t.getName().contains("service1Instance2")).findFirst().get();
    Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 5);
    Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 5);
}

相同微服务调用不同实例的时候,使用的是不同的线程(池)。

我们需要确保,最后调用(也就是发送 http 请求)的执行的线程池,必须是对应的 ThreadPoolBulkHead 中的线程池。这个需要我们对 ApacheHttpClient 做切面实现,添加注解 @EnableAspectJAutoProxy(proxyTargetClass = true)

//SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        //默认请求重试次数为 3
        "resilience4j.retry.configs.default.maxAttempts=3",
        // testService2Client 里面的所有方法请求重试次数为 2
        "resilience4j.retry.configs.testService2Client.maxAttempts=2",
        //默认线程池配置
        "resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
        "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
        "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
        //testService2Client 的线程池配置
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
        "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
})
@Log4j2
public class OpenFeignClientTest {
    @SpringBootApplication
    @Configuration
    @EnableAspectJAutoProxy(proxyTargetClass = true)
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {
            //模拟两个服务实例
            ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
            ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(
                    Map.entry("zone", "zone1")
            );
            when(service1Instance1.getMetadata()).thenReturn(zone1);
            when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
            when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
            when(service1Instance1.getPort()).thenReturn(80);
            when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
            when(service2Instance2.getHost()).thenReturn("httpbin.org");
            when(service2Instance2.getPort()).thenReturn(80);
            DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
            Mockito.when(spy.getInstances("testService1"))
                    .thenReturn(List.of(service1Instance1));
            Mockito.when(spy.getInstances("testService2"))
                    .thenReturn(List.of(service2Instance2));
            return spy;
        }
    }
}

拦截 ApacheHttpClientexecute 方法,这样可以拿到真正负责 http 调用的线程池,将线程其放入请求的 Header:

@Aspect
public static class ApacheHttpClientAop {
    //在最后一步 ApacheHttpClient 切面
    @Pointcut("execution(* com.github.jojotech.spring.cloud.webmvc.feign.ApacheHttpClient.execute(..))")
    public void annotationPointcut() {
    }

    @Around("annotationPointcut()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        //设置 Header,不能通过 Feign 的 RequestInterceptor,因为我们要拿到最后调用 ApacheHttpClient 的线程上下文
        Request request = (Request) pjp.getArgs()[0];
        Field headers = ReflectionUtils.findField(Request.class, "headers");
        ReflectionUtils.makeAccessible(headers);
        Map<String, Collection<String>> map = (Map<String, Collection<String>>) ReflectionUtils.getField(headers, request);
        HashMap<String, Collection<String>> stringCollectionHashMap = new HashMap<>(map);
        stringCollectionHashMap.put(THREAD_ID_HEADER, List.of(String.valueOf(Thread.currentThread().getName())));
        ReflectionUtils.setField(headers, request, stringCollectionHashMap);
        return pjp.proceed();
    }
}

这样,我们就能拿到具体承载请求的线程的名称,从名称中可以看出他所处于的线程池(格式为“bulkhead-线程隔离名称-n”,例如 bulkhead-testService1Client:www.httpbin.org:80-1),接下来我们就来看下不同的实例是否用了不同的线程池进行调用:

@Test
public void testDifferentThreadPoolForDifferentInstance() throws InterruptedException {
    //防止断路器影响
    circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
    Set<String> threadIds = Sets.newConcurrentHashSet();
    Thread[] threads = new Thread[100];
    //循环100次
    for (int i = 0; i < 100; i++) {
        threads[i] = new Thread(() -> {
            Span span = tracer.nextSpan();
            try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                HttpBinAnythingResponse response = testService1Client.anything();
                //因为 anything 会返回我们发送的请求实体的所有内容,所以我们能获取到请求的线程名称 header
                String threadId = response.getHeaders().get(THREAD_ID_HEADER);
                threadIds.add(threadId);
            }
        });
        threads[i].start();
    }
    for (int i = 0; i < 100; i++) {
        threads[i].join();
    }
    //确认实例 testService1Client:httpbin.org:80 线程池的线程存在
    Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:httpbin.org:80")));
    //确认实例 testService1Client:httpbin.org:80 线程池的线程存在
    Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:www.httpbin.org:80")));
}

这样,我们就成功验证了,实例调用的线程池隔离。

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

SpringCloud升级之路2020.0.x版-35. 验证线程隔离正确性

收藏
评论区

相关推荐

SpringCloud升级之路2020.0.x版-7.从Bean到SpringCloud
本系列为之前系列的整理重启版,随着项目的发展以及项目中的使用,之前系列里面很多东西发生了变化,并且还有一些东西之前系列并没有提到,所以重启这个系列重新整理下,欢迎各位留言交流,谢谢!在理解 Spring Cloud 之前,我们先了解下 Spring 框架、Spring Boot、Spring Cloud 这三者的关系,从一个简单的 Bean,是如何发展出一个
2021升级版微服务教程5—通过IDEA运行多个项目实例「模拟集群」
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/c90af336-21f6-4812-a448-cdce3e5d903a.png) **教程全目录「含视频」**:https://gitee.com/bingqilinpe
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」 ![](https://oscimg.oschina.net/oscnet/f2a7c1f4-d28b-48a9-b156-11d0a33ad613.png) 默认文件1610014380163 **教程全目录「含视频」**:https://gi
Spring Cloud Config Server迁移节点或容器化带来的问题 原因,解决
版本 == * springboot 2.0.1.RELEASE * springcloud Finchley.RC1 问题 == 看程序猿 dd 的博客 [http://blog.didispace.com/Spring-Cloud-Config-Server-ip-change-problem/](https://www.oschina.n
Spring Cloud系列教程(十四):服务追踪SpringCloud Sleuth集成Zipkin持久化数据存储Mysql(Finchley版本)
一、前言 ---- 在上一篇文章中: **[Spring Cloud系列教程(十三):服务追踪Spring Cloud Sleuth+Zipkin(Finchley版本)](https://www.oschina.net/action/GoToLink?url=https%3A%2F%2Fthinkingcao.blog.csdn.net%2Farticl
SpringBoot与SpringCloud的版本对应详细版
大版本对应: Spring Boot Spring Cloud 1.2.x Angel版本 1.3.x Brixton版本 1.4.x stripes Camden版本 1.5.x Dalston版本、Edgware版本 2.0.x Finchley版本 2.1.x Greenwich.SR2 在实际开发过程中,我们需要更**详
SpringBoot与SpringCloud的版本对应详细版
大版本对应: Spring Boot Spring Cloud 1.2.x Angel版本 1.3.x Brixton版本 1.4.x stripes Camden版本 1.5.x Dalston版本、Edgware版本 2.0.x Finchley版本 2.1.x Greenwich.SR2 在实际开发过程中,我们需要更**详
SpringCloud demo
1.首先创建一个maven项目 ===============   这个maven项目会包含springcloud相关的项目,目录结构如下图:     本项目所有的springcloud版本为Finchley.SR2,对应的springboot的版本为2.0.7.RELEASE。     ![](https://img2018.cnblogs.com/
SpringCloud的版本
Spring Cloud 项目目前仍然是快速迭代期,版本变化很快。这里整理一下版本相关的东西,备忘一下。 大版本 --- ### 版本号规则 Spring Cloud并没有熟悉的数字版本号,而是对应一个开发代号。 Cloud代号 Boot版本(train) Boot版本(tested) lifecycle Angle 1.2.x inco
springCloud Finchley升级记录
最近开发新项目顺便升级 Springcloud Dalston.SR5 到当前最新版 Finchley.SR1 由于 springboot1.5.10到当前最新版 spring2.0.1版本 升级修改比较大,记录一下 首先修改一下springboot cloud 版本号其他不用变 部分配置文件名称修改了 添加一下以下依赖会提示如何迁移 <
springcloud知识点总结
一.SpringCloud面试题口述 1.SpringCloud和Dubbo SpringCloud和Dubbo都是现在主流的微服务架构 SpringCloud是Apache旗下的Spring体系下的微服务解决方案 Dubbo是阿里系的分布式服务治理框架 从技术维度上,其实SpringCloud远远的超过Dubbo,Dubbo本身只是实现
springcloud线上发布超时之feign(ribbon饥饿加载)
springcloud线上发布超时系列文章: [springcloud线上发布超时之feign(ribbon饥饿加载)](https://my.oschina.net/u/560547/blog/3275636) [springcloud线上发布超时之grpc](https://my.oschina.net/u/560547/blog/3275637)
springcloud线上发布超时方案之终极杀招:预热(测试用例)
springcloud线上发布超时系列文章: [springcloud线上发布超时之feign(ribbon饥饿加载)](https://my.oschina.net/u/560547/blog/3275636) [springcloud线上发布超时之grpc](https://my.oschina.net/u/560547/blog/3275637)
太刺激了,太刺激了,记录一下新版SpringCloud集成Feign和hystrix做服务熔断遇到的坑
太刺激了,太刺激了,熬了一个晚上,终于发现了服务熔断无效不是我的错,而是在SpringCloud的新版中,对断路器配置上有了变动;作者此处所使用的SpringCloud 和Spring Boot版本为Release Train Version: 2020.0.3;Supported Boot Version: 2.4.6。如果比较急,可以跳过前文的扯淡环节,