专栏目录
17.Eureka的实例配置 18.Eureka的客户端核心设计和配置 1. 背景 2.微服务框架需要考虑的问题 3.Eureka Server 与 API 网关要考虑的问题 4.maven依赖回顾以及项目框架结构 5.所有项目的parent与spring-framework-common说明 6.微服务特性相关的依赖说明 19.Eureka的服务端设计与配置 7.从Bean到SpringCloud 8.理解 NamedContextFactory 9.如何理解并定制一个Spring Cloud组件 10.使用Log4j2以及一些核心配置 11.Log4j2 监控相关 12.UnderTow 简介与内部原理 13.UnderTow 核心配置 14.UnderTow AccessLog 配置介绍 15.UnderTow 订制 16.Eureka架构和核心概念 20. 启动一个 Eureka Server 集群 21.Spring Cloud LoadBalancer简介 22.Spring Cloud LoadBalancer核心源码 23.订制Spring Cloud LoadBalancer 24.测试Spring Cloud LoadBalancer 25.OpenFeign简介与使用 26.OpenFeign的组件 27.OpenFeign的生命周期-创建代理 28.OpenFeign的生命周期-进行调用 29.Spring Cloud OpenFeign 的解析(1) 30. FeignClient 实现重试 31. FeignClient 实现断路器以及线程隔离限流的思路 32. 改进负载均衡算法 33. 实现重试、断路器以及线程隔离源码 34.验证重试配置正确性 35. 验证线程隔离正确性 36. 验证断路器正确性 37. 实现异步的客户端封装配置管理的意义与设计 38. 实现自定义 WebClient 的 NamedContextFactory 39. 改造 resilience4j 粘合 WebClient 40. spock 单元测试封装的 WebClient(上) 40. spock 单元测试封装的 WebClient(下) 41. SpringCloudGateway 基本流程讲解(1) 41. SpringCloudGateway 基本流程讲解(2) 42.SpringCloudGateway 现有的可供分析的请求日志以及缺陷 43.为何 SpringCloudGateway 中会有链路信息丢失 44.避免链路信息丢失做的设计(1) 44.避免链路信息丢失做的设计(2) 45. 实现公共日志记录

32. 改进负载均衡算法

干货满满张哈希
• 阅读 251

32. 改进负载均衡算法

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

在前面一节,我们梳理了实现 Feign 断路器以及线程隔离的思路,这一节,我们先不看如何源码实现(因为源码中会包含负载均衡算法的改进部分),先来讨论下如何优化目前的负载均衡算法。

之前的负载均衡算法

  1. 获取服务实例列表,将实例列表按照 ip 端口排序,如果不排序即使 position 是下一个可能也代表的是之前已经调用过的实例
  2. 根据请求中的 traceId,从本地缓存中以 traceId 为 key 获取一个初始值为随机数的原子变量 position,这样防止所有请求都从第一个实例开始调用,之后第二个、第三个这样。
  3. position 原子加一,之后对实例个数取余,返回对应下标的实例进行调用

其中请求包含 traceId 是来自于我们使用了 spring-cloud-sleuth 链路追踪,基于这种机制我们能保证请求不会重试到之前已经调用过的实例。源码是:

//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次请求算上重试不会超过1分钟
    //对于超过1分钟的,这种请求肯定比较重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //随机初始值,防止每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }

    //每次重试,其实都会调用这个 choose 方法重新获取一个实例
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
        //从 sleuth 的 Tracer 中获取当前请求的上下文
        Span currentSpan = tracer.currentSpan();
        //如果上下文不存在,则可能不是前端用户请求,而是其他某些机制触发,我们就创建一个新的上下文
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        //从请求上下文中获取请求的 traceId,用来唯一标识一个请求
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //实例返回列表顺序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

但是在这次请求突增很多的时候,这种负载均衡算法还是给我们带来了问题。

首先,本次突增,我们并没有采取扩容,导致本次的性能压力对于压力的均衡分布非常敏感。举个例子是,假设微服务 A 有 9 个实例,在业务高峰点来的时候,最理想的情况是保证无论何时这 9 个负载压力都完全均衡,但是由于我们使用了初始值为随机数的原子变量 position,虽然从一天的总量上来看,负责均衡压力肯定是均衡,但是在某一小段时间内,很可能压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,然后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。

然后,我们部署采用的是 k8s 部署,同一个虚拟机上面可能会跑很多微服务的 pod。在某些情况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个可以从pod 的 ip 网段上看出来:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。我们重试,需要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只要有一个有问题或者压力过大,其他的基本上也有问题或者压力过大。

最后,如果调用某个实例一直失败,那么这个实例的调用优先级需要排在其他正常的实例后面。这个对于减少快速刷新发布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区突然发生异常导致多个实例下线对用户的影响,以及业务压力已经过去,压力变小后,需要关掉不再需要的实例,导致大量实例发生迁移的时候对用户的影响,有很大的作用。

针对以上问题的优化方案

我们针对上面三个问题,提出了一种优化后的解决方案:

  1. 针对每次请求,记录:
    1. 本次请求已经调用过哪些实例 -> 请求调用过的实例缓存
    2. 调用的实例,当前有多少请求在处理中 -> 实例运行请求数
    3. 调用的实例,最近请求错误率 -> 实例请求错误率
  2. 随机将实例列表打乱,防止在以上三个指标都相同时,总是将请求发给同一个实例。
  3. 按照 当前请求没有调用过靠前 -> 错误率越小越靠前 的顺序排序 -> 实例运行请求数越小越靠前
  4. 取排好序之后的列表第一个实例作为本次负载均衡的实例

具体实现是:以下的代码来自于:https://github.com/JoJoTec/spring-cloud-parent

我们使用了依赖:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>

记录实例数据的缓存类:

@Log4j2
public class ServiceInstanceMetrics {
    private static final String CALLING = "-Calling";
    private static final String FAILED = "-Failed";

    private MetricRegistry metricRegistry;

    ServiceInstanceMetrics() {
    }

    public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
        this.metricRegistry = metricRegistry;
    }

    /**
     * 记录调用实例
     * @param serviceInstance
     */
    public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
        String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        metricRegistry.counter(key + CALLING).inc();
    }
    /**
     * 记录调用实例结束
     * @param serviceInstance
     * @param isSuccess 是否成功
     */
    public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
        String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        metricRegistry.counter(key + CALLING).dec();
        if (!isSuccess) {
            //不成功则记录失败
            metricRegistry.meter(key + FAILED).mark();
        }
    }

    /**
     * 获取正在运行的调用次数
     * @param serviceInstance
     * @return
     */
    public long getCalling(ServiceInstance serviceInstance) {
        String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        long count = metricRegistry.counter(key + CALLING).getCount();
        log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
        return count;
    }

    /**
     * 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
     * @param serviceInstance
     * @return
     */
    public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
        String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
        log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
        return rate;
    }
}

负载均衡核心代码:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

//每次重试,其实都会调用这个 choose 方法重新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                //保持 span 和调用 choose 的 span 一样
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //读取 spring-cloud-sleuth 的对于当前请求的链路追踪上下文,获取对应的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    //首先随机打乱列表中实例的顺序
    Collections.shuffle(serviceInstances);
    //需要先将所有参数缓存起来,否则 comparator 会调用多次,并且可能在排序过程中参数发生改变(针对实例的请求统计数据一直在并发改变)
    Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    //之前已经调用过的网段,这里排后面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {
                            return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
                                return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    //当前错误率最少的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
                            double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            //由于使用的是移动平均值(EMA),需要忽略过小的差异(保留两位小数,不是四舍五入,而是直接舍弃)
                            return ((int) (value * 100)) / 100.0;
                        });
                    })
                    //当前负载请求最少的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    ServiceInstance serviceInstance = serviceInstances.get(0);
    //记录本次返回的网段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    //目前记录这个只为了兼容之前的单元测试(调用次数测试)
    positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}

对于记录实例数据的缓存何时更新,是在 FeignClient 粘合重试,断路以及线程隔离的代码中的,这个我们下一节就会看到。

一些组内关于方案设计的取舍 Q&A

1. 为何没有使用所有微服务共享的缓存来保存调用数据,来让这些数据更加准确

共享缓存的可选方案包括将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。但是有两个问题:

  1. 如果数据记录放入 Redis 这样的额外存储,如果 Redis 不可用会导致所有的负载均衡都无法执行。如果放入 Apache Ignite,如果对应的节点下线,那么对应的负载均衡也无法执行。这些都是不能接受的。
  2. 假设微服务 A 需要调用微服务 B,可能 A 的某个实例调用 B 的某个实例有问题,但是 A 的其他实例调用 B 的这个实例却没有问题,例如当某个可用区与另一个可用区网络拥塞的时候。如果用同一个缓存 Key 记录 A 所有的实例调用 B 这个实例的数据,显然是不准确的。

每个微服务使用本地缓存,记录自己调用其他实例的数据,在我们这里看来,不仅是更容易实现,也是更准确的做法。

2. 采用 EMA 的方式而不是请求窗口的方式统计最近错误率

采用请求窗口的方式统计,肯定是最准确的,例如我们统计最近一分钟的错误率,就将最近一分钟的请求缓存起来,读取的时候,将缓存起来的请求数据加在一起取平均数即可。但是这种方式在请求突增的时候,可能会占用很多很多内存来缓存这些请求。同时计算错误率的时候,随着缓存请求数的增多也会消耗更大量的 CPU 进行计算。这样做很不值得。

EMA 这种滑动平均值的计算方式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动态计算,G1 GC Region 大小的伸缩以及其他很多 JVM 需要动态得出合适值的地方,都用这种计算方式。他不用将请求缓存起来,而是直接用最新值乘以一个比例之后加上老值乘以 (1 - 这个比例),这个比例一般高于 0.5,表示 EMA 和当前最新值更加相关。

但是 EMA 也带来另一个问题,我们会发现随着程序运行小数点位数会非常多,会看到类似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了忽略过于细致差异的影响(其实这些影响也来自于很久之前的错误请求),我们只保留两位小数进行排序

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

32. 改进负载均衡算法

评论区
推荐文章

暂无数据