Hystrix熔断器执行机制

字节玄铁师
• 阅读 4741

本篇假设大家对Hystrix的执行过程及源码有一定的了解,这里介绍Hystrix的熔断器执行机制。

1.Hystrix 熔断器类结构

Hystrix熔断器执行机制
HystrixCircuitBreaker作为接口定义,具体的实现有NoOpCircuitBreakerHystrixCircuitBreakerImpl,其中NoOpCircuitBreaker只是个空壳没有具体的实现,相当于不熔断。HystrixCircuitBreakerImpl是主要的熔断逻辑实现。

2.Hystrix 熔断器状态

熔断器有三个状态 CLOSED OPENHALF_OPEN 熔断器默认关闭状态,当触发熔断后状态变更为 OPEN,在等待到指定的时间,Hystrix会放请求检测服务是否开启,这期间熔断器会变为HALF_OPEN 半开启状态,熔断探测服务可用则继续变更为 CLOSED关闭熔断器。
Hystrix熔断器执行机制

3.代码视角

ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Hystrix为每个commandKey都维护了一个熔断器,保持着对应的熔断器,所以当new XXXHystrixCommand()的时候依然能够保持着原来熔断器的状态。

3.1 如何判定开启熔断
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;

    //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
    Subscription s = subscribeToStream();
    activeSubscription.set(s);
}

private Subscription subscribeToStream() {
    /*
     * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
     */
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                
                 //.....................省略干扰代码......................
                @Override
                public void onNext(HealthCounts hc) {
                    // check if we are past the statisticalWindowVolumeThreshold
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        
                    } else {
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            
                        } else {
                            
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}

这里面HystrixBreaker启动的时候会订阅HystrixCommandMetricsHealthCountsStream,每当HealthCountsStream搜集到数据,都会触发上面的 onNext方法,然后该方法做下面几个判断
1.当前请求量是否达到设定水位(请求量太小不做阀值控制)
2.当前的请求错误量是否达到阀值,达到后会将熔断器状态置为 OPEN, circuitOpened设置为当前时间戳表示开启的时间。

3.2 attemptExecution

先看下HystrixCommand 的执行Observable
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
··········省略代码··········

这里,每次HystrixCommand执行都会调用 circuitBreaker.attemptExecution()

public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }

这里代码判断逻辑
1.判断是否强制开启熔断器,是则return false,command不能执行
2.判断是否强制关闭熔断器,是则return true, command可执行
3.判断熔断器是否开启 circuitOpened.get() == -1表示没有开启,则return true,command可执行。
4.到这步证明已经开启了熔断器,那么判断是否可尝试请求,如果可以同时会把熔断器的状态改为HALF_OPEN

3.3 markSuccess&markNonSuccess

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    ......省略干扰代码.......

    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                ......省略干扰代码.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                ......省略干扰代码.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            ......省略干扰代码.......
        }
    };

    ......省略干扰代码.......

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

此处表示HystrixCommand执行的过程中对应的熔断器状态变更,上面代码不难看出,当error的时候会触发circuitBreaker.markNonSuccess();,执行成功或者执行完成触发 circuitBreaker.markSuccess();

markNonSuccess

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

如果能执行到markNonSuccess,说明此时熔断器是关闭状态,或者尝试放流阶段。关闭状态的话不做处理(未触发熔断),尝试放流时,发现依然执行失败,这里讲熔断器状态重新置为开启状态,并把circuitOpened设置为当前的时间戳。

markSuccess

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}

能走到markSuccess说明熔断器此时关闭或者放流阶段,尝试放流阶段则讲熔断器关闭,设置circuitOpened=-1,并重置指标统计。

4.结束了

到这里熔断器的介绍就结束了,回顾下主要有熔断器如何开启、如何关闭、几个状态的变更。一个完整的熔断器就此呈现在大家的面前。

点赞
收藏
评论区
推荐文章
Easter79 Easter79
3年前
springcloud(四):熔断器Hystrix
熔断器雪崩效应在微服务架构中通常会有多个服务层调用,基础服务的故障可能会导致级联故障,进而造成整个系统不可用的情况,这种现象被称为服务雪崩效应。服务雪崩效应是一种因“服务提供者”的不可用导致“服务消费者”的不可用,并将不可用逐渐放大的过程。如果下图所示:A作为服务提供者,B为A的服务消费者,C和D是B
Stella981 Stella981
3年前
Hystrix 停止开发。。。Spring Cloud 何去何从?
!(https://oscimg.oschina.net/oscnet/8f265631921542e3b74f72b2759d88a5.png)栈长得到消息,Hystrix停止开发了。。。大家如果有对Hystrix不清楚的,请看下这篇文章:分布式服务防雪崩熔断器,Hystrix理论实战(https://www.oschina
Easter79 Easter79
3年前
SpringCloud入门(七)
Hystrix提供了HystrixDashboard来实时监控HystrixCommand方法的执行情况。HystrixDashboard可以有效地反映出每个Hystrix实例的运行情况,帮助我们快速发现系统中的问题,从而采取对应措施。使用熔断器仪表盘监控在Ribbon和Feign项目增加Hystrix仪表盘功能,两个项目的改造方式相同。
Stella981 Stella981
3年前
Hystrix实现ThreadLocal上下文的传递 转
springcloud微服务中,服务间传输全局类参数,如session信息等。一、问题背景Hystrix有2个隔离策略:THREAD以及SEMAPHORE,当隔离策略为THREAD时,是没办法拿到ThreadLocal中的值的。Hystrix提供了基于信号量和线程两种隔离模式,通过在Hystrix基础章节中已经验证过,通过
Easter79 Easter79
3年前
SpringCloud微服务(04):Turbine组件,实现微服务集群监控
写在前面,阅读本文前,你需要了解熔断器相关内容SpringCloud微服务:Hystrix组件,实现服务熔断(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fmp.weixin.qq.com%2Fs%3F__biz%3DMzU4Njg0MzYwNw%3D%3D%26mid%3D22
Stella981 Stella981
3年前
Hystrix的使用1
Hystrix的使用1简介以下文章照搬自Hystrix官网介绍(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fgithub.com%2FNetflix%2FHystrix%2Fwiki)1.什么是Hystrix?在
Stella981 Stella981
3年前
Sentinel 流量控制 熔断降级 初探
    还记得之前写过一篇防雪崩利器:熔断器Hystrix的原理与使用https://my.oschina.net/u/3266761/blog/2654470,讲述了服务降级和熔断的控制,今天带来另一个流量控制与服务降级阿里开源框架sentinel。  首先是两者的对比:    Hystrix的关注点在于以隔离和熔断为主的容错机制
Stella981 Stella981
3年前
Hystrix断路器是如何工作的
前言20181130,Hystrix已经不再维护,这里是学习记录。12月1日才完成,没有完成11月的诺言,捐款记录以上动弹。https://my.oschina.net/floor/tweet/19421296(https://my.oschina.net/floor/tweet/19421296)Hystrix是什么
Stella981 Stella981
3年前
Hystrix异常处理及线程池划分
异常处理异常传播在HystrixCommand实现的run()方法中抛出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来选择它。在使用注解配置实现Hystrix命令时,可以忽略指定的异常
SpringCloud-Hystrix服务熔断与降级工作原理&源码 | 京东物流技术团队
在生活中,如果电路的负载过高,保险箱会自动跳闸,以保护家里的各种电器,这就是熔断器的一个活生生例子。在Hystrix中也存在这样一个熔断器,当所依赖的服务不稳定时,能够自动熔断,并提供有损服务,保护服务的稳定性。在运行过程中,Hystrix会根据接口的执行状态(成功、失败、超时和拒绝),收集并统计这些数据,根据这些信息来实时决策是否进行熔断。