Hystrix核心熔断器

反编译极光
• 阅读 2090

在深入研究熔断器之前,我们需要先看一下Hystrix的几个重要的默认配置,这几个配置在HystrixCommandProperties

//时间窗(ms)
static final Integer default_metricsRollingStatisticalWindow = 10000;
//最少请求次数
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
//熔断器打开后开始尝试半开的时间间隔
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
//错误比例
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;

这几个属性共同组成了熔断器的核心逻辑,即:

  1. 每10秒的窗口期内,当请求次数超过20次,且出错比例超过50%,则触发熔断器打开
  2. 当熔断器5秒后,会尝试放过去一部分流量进行试探
熔断器初始化

熔断器的初始化是在HystrixCircuitBreaker.FactorygetInstance方法

        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // this should find it for all but the first time
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize

            // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
            // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
            // If 2 threads hit here only one will get added and the other will get a non-null response instead.
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                // this means a race occurred and while attempting to 'put' another one got there before
                // and we instead retrieved it and will now return it
                return cbForCommand;
            }
        }

由上方代码可知,每一个熔断器都是由HystrixCircuitBreakerImpl实现的,而所有的熔断器都维护在circuitBreakersByCommand这个ConcurrentHashMap

熔断器实现
构造方法
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}

先介绍一下几个比较基础的属性:

  1. HystrixCommandProperties:当前熔断器的配置
  2. HystrixCommandMetrics: 请求统计组件
  3. Status:熔断器状态枚举,一共包含三种,关闭、打开和半开
  4. status:当前熔断器的状态
  5. circuitOpened:当前熔断器的打开时间
  6. activeSubscription:订阅请求统计的处理函数
请求统计处理
private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }

直接看onNext方法里的处理方式:

  1. 时间窗内的请求数量是否达标,按默认配置就是10秒钟的请求数是否超过20次,如果不达标不能开启熔断器
  2. else中首先判断错误比例是否达到比例,按默认就是50%
  3. 满足打开条件,使用CAS修改状态为打开,并记录打开时间circuitOpened为当前时间

当记录了当前应用的统计数据之后,在每次请求的时候就可以根据这些数据来判断是否应该打开熔断器了

请求过滤

不知你是否还记得在系列文章第一篇中曾经提到了一个方法applyHystrixSemantics,在这个方法中就包含了判断是否应该熔断的逻辑,如果熔断器打开的情况下会直接进入降级逻辑。这个判断的方法如下:

        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
  1. 第一个if,如果配置强制熔断则返回false表示开启熔断器进入降级逻辑
  2. 第二个,如果配置强制关闭则返回正常不进行后续的判断
  3. 第三个,打开时间为空则肯定没打开过
  4. 第四个,判断是否满足尝试时间,默认是5秒钟。时间计算方式如下:
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
  1. 当满足尝试时则使用CAS方式修改熔断器为半开状态

而当请求成功的时候则会调用如下方法清除统计数据,更改熔断器状态为关闭

 public void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

请求失败则再次打开熔断器,并更新打开时间

        public void markNonSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }

Hystrix核心熔断器

点赞
收藏
评论区
推荐文章
Easter79 Easter79
4年前
springcloud(八) Hystrix监控
一、Feign项目Hystrix自带的监控  在feign项目pom.xml添加:<!1,使用Hystrix的模块hystrixmetricseventstream,就可将这些监控的指标信息以text/eventstream的格式暴露给外部系统。spring
Easter79 Easter79
4年前
springcloud(四):熔断器Hystrix
熔断器雪崩效应在微服务架构中通常会有多个服务层调用,基础服务的故障可能会导致级联故障,进而造成整个系统不可用的情况,这种现象被称为服务雪崩效应。服务雪崩效应是一种因“服务提供者”的不可用导致“服务消费者”的不可用,并将不可用逐渐放大的过程。如果下图所示:A作为服务提供者,B为A的服务消费者,C和D是B
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
4年前
Hystrix 停止开发。。。Spring Cloud 何去何从?
!(https://oscimg.oschina.net/oscnet/8f265631921542e3b74f72b2759d88a5.png)栈长得到消息,Hystrix停止开发了。。。大家如果有对Hystrix不清楚的,请看下这篇文章:分布式服务防雪崩熔断器,Hystrix理论实战(https://www.oschina
Easter79 Easter79
4年前
SpringCloud入门(七)
Hystrix提供了HystrixDashboard来实时监控HystrixCommand方法的执行情况。HystrixDashboard可以有效地反映出每个Hystrix实例的运行情况,帮助我们快速发现系统中的问题,从而采取对应措施。使用熔断器仪表盘监控在Ribbon和Feign项目增加Hystrix仪表盘功能,两个项目的改造方式相同。
Easter79 Easter79
4年前
SpringCloud Alibaba微服务实战二十
!(https://oscimg.oschina.net/oscnet/d8c498ec1a794ad3ac37338f40db0851.png)在之前的项目中我们已经实现了使用Feign调用远程接口,本章内容主要是借助sentinel实现Feign接口熔断器功能。概述首先我们看看不使用熔断器的情况下调用一个没有启动的服务会出现
Stella981 Stella981
4年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Stella981 Stella981
4年前
Sentinel 流量控制 熔断降级 初探
    还记得之前写过一篇防雪崩利器:熔断器Hystrix的原理与使用https://my.oschina.net/u/3266761/blog/2654470,讲述了服务降级和熔断的控制,今天带来另一个流量控制与服务降级阿里开源框架sentinel。  首先是两者的对比:    Hystrix的关注点在于以隔离和熔断为主的容错机制
Stella981 Stella981
4年前
Hystrix异常处理及线程池划分
异常处理异常传播在HystrixCommand实现的run()方法中抛出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来选择它。在使用注解配置实现Hystrix命令时,可以忽略指定的异常
SpringCloud-Hystrix服务熔断与降级工作原理&源码 | 京东物流技术团队
在生活中,如果电路的负载过高,保险箱会自动跳闸,以保护家里的各种电器,这就是熔断器的一个活生生例子。在Hystrix中也存在这样一个熔断器,当所依赖的服务不稳定时,能够自动熔断,并提供有损服务,保护服务的稳定性。在运行过程中,Hystrix会根据接口的执行状态(成功、失败、超时和拒绝),收集并统计这些数据,根据这些信息来实时决策是否进行熔断。