Hystrix——让你的服务更稳一点

Stella981
• 阅读 512

摘要: 1、为什么要用Hystrix在分布式服务环境下,服务之间的调用关系变得错综复杂,你是否担心依赖的服务延迟导致自己的服务也被拖跨呢?是否在苦苦思考如何优雅的对依赖服务进行异步调用呢?是否希望当流量高峰时自动进行 ...

1、为什么要用Hystrix

在分布式服务环境下,服务之间的调用关系变得错综复杂,你是否担心依赖的服务延迟导致自己的服务也被拖跨呢?是否在苦苦思考如何优雅的对依赖服务进行异步调用呢?是否希望当流量高峰时自动进行服务降级避免把自己线程耗尽而宕机?以上的问题,Hystrix能有效的解决。

2、 什么是Hystrix

Hystrix——让你的服务更稳一点

Hystrix由Netflix于2011年创立的项目,最初是用于解决系统快速恢复的需求,其特性能保证在分布式服务中,防止服务失败引起的级联失败,项目的熔断机制能够使依赖服务的调用快速失败并且自动恢复,有效避免对失败的服务进行请求。同时对每个接口进行线程池隔离(或按信号量隔离),避免因为高峰流量或服务延迟导致线程耗尽而宕机。

Hystrix——让你的服务更稳一点

3、 Hystrix入门介绍

Hystrix使用HystrixCommand和HystrixObservableCommand进行对调用接口的封装,从而使接口的调用自动实现了线程隔离以及调用熔断机制,通过执行execute() 或者queue() 就能完成接口的同步或者异步调用。使用上手相当轻松,那么我们就先以一个简单程序来认识Hystrix:

a) 项目加入Hystrix依赖

Hystrix——让你的服务更稳一点

b) 创建一个用来被调用的方法, HelloWorldService

Hystrix——让你的服务更稳一点

c) 创建一个使用HystrixCommand包装的调用, HelloWorldInvoker

Hystrix——让你的服务更稳一点

d) 进行调用测试

  Hystrix——让你的服务更稳一点

4、 Hystrix核心框架(HystrixCommand)

HystrixCommand是框架的流程核心类,主要承担封装接口的调用执行,将接口调用进行命令分组,调用执行进行线程池隔离,请求的结果缓存命中,判断熔断,以及执行错误或熔断后的结果FallBack机制流程,主要工作流程如下图:

Hystrix——让你的服务更稳一点

4.1 HystrixCommand创建

HystrixCommand创建需要两个必填参数:HystrixCommandGroupKey和HystrixThreadPoolKey,HystrixCommandGroupKey用于进行command分组,便于调用统计。HystrixThreadPoolKey用于线程池隔离,相同的线程池key的接口调用,将会使用相同的线程池,线程池大小默认为10个线程,其余参数将会一并初始化。

Hystrix——让你的服务更稳一点

4.2 HystrixCommand执行

分别可使用execute(),queue(),observe(),toObservable()完成接口调用并包装上述工作全流程, 四种执行方式区别如下:

a) execute:command执行后进行同步等待,直到结果返回

b) queue:command执行后返回一个Future对象,Future对象可以进行异步的结果获取

c) observe:command立即执行,进行接口调用后并返回observable,外部subscriber进行数据读取

d) toObservable:command不会立即执行,接口调用仅当有外部subscriber进行订阅后,接口才会被调用

前三种执行方式都是toObservable的变种,command底层均是执行toObservable方法得到一个Observable对象,然后该对象被订阅的Subscriber进行结果获取。过程如下:

Hystrix——让你的服务更稳一点

5、 Hystrix核心框架(HystrixCircuitBreaker)

5.1 熔断器HystrixCircuitBreaker,是保证调用接口延迟或失败情况下自动熔断,保证服务不被外部调用的失败而拖跨。熔断器和每个HystrixCommand绑定,为每个独立的command进行失败计数和熔断状态控制。在创建command时对熔断器进行初始化

Hystrix——让你的服务更稳一点

5.2 HystrixCircuitBreaker 通过HealthCountsStream维护一个command调用的健康计数器,如果计数器的线程堆积数大于允许的阙值或者调用失败比例大于允许的百分比,则进行熔断处理,后续接口调用均会被短路并降级调用fallBack()返回。

Hystrix——让你的服务更稳一点

5.3 HystrixCircuitBreaker将在熔断后的一段时间内,允许部分请求进行接口调用,若返回接口正确,则熔断器将关闭,服务进行正常请求,若此时接口调用仍旧失败,则熔断器保持熔断,并重新进行半熔断状态倒计时。

6、 Hystrix核心框架(RequestCache)

接口调用中,如果不是高频次修改的数据查询结果,可以使用请求缓存来减少服务调用的网络开销,Hystrix会基于调用的key进行结果命中,当能匹配到结果是则直接返回结果而避免进行接口调用。

a) 请求缓存是基于command中的getCacheKey()方法判断是否是相同请求,所以需重写该方法

Hystrix——让你的服务更稳一点

b) 确保在调用之前开启HystrixRequestContext,可以使用统一的拦截器来进行拦截开启。

  Hystrix——让你的服务更稳一点

7、 Hystrix核心框架(FallBack)

当接口调用超时或者直接出现异常,框架将对接口调用进行降级处理,调用fallBack进行结果返回。接口调用的降级只需要在HystrixCommand中重写getFallBack()方法,方法同接口调用一样的返回,用于直接书写接口返回,或者在fallBack中继续调用HystrixCommand进行接口的降级调用

Hystrix——让你的服务更稳一点

8、项目中的应用实现

Hystrix的服务隔离主要有两种,常用的就是线程池隔离的方式,对热点接口建立单独的线程池避免对主程序的影响。另一种是信号量的方式,用的场景不是太多。两者的区别其实就是一个增大系统的开销,一个则直接限制了线程总的并发数,开销更小一些。

 在传统Spring项目中的应用

     <!-- hystrix -->
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.5.9</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.5.9</version>
        </dependency>

在Spring的配置文件中配置Hystrix的切面信息

<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"></bean>
    <aop:aspectj-autoproxy />

主要是开启注解的AOP扫描

@Configuration
public class HystrixCircuitBreakerConfiguration {
    public HystrixCircuitBreakerConfiguration() {
    }
    //重要的入口类
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }
}
@Aspect
public class HystrixCommandAspect {
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    // 请求的合并
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
 
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
            //如果是@HystrixCommand,最终调用CommandMetaHolderFactory.create创建metaHolder
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // 创建HystrixInvokable,只是一个空接口,没有任何方法,只是用来标记具备可执行的能力
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
 
            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    // 利用工具CommandExecutor来执行
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }
 
                return result;
            } catch (HystrixBadRequestException var9) {
                throw var9.getCause();
            } catch (HystrixRuntimeException var10) {
                throw this.getCauseOrDefault(var10, var10);
            }
        }
    }
    
}
//创建Command
public class HystrixCommandFactory {
    private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
    private HystrixCommandFactory() {
    }
    public static HystrixCommandFactory getInstance() {
        return INSTANCE;
    }
    public HystrixInvokable create(MetaHolder metaHolder) {
        Object executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {//如果切入的方法是Observable类型
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {//比如:public String hello()方法
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
 
        return (HystrixInvokable)executable;
    }
}
//新建GenericCommand:有点熟悉了吧
public class GenericCommand extends AbstractHystrixCommand<Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
 
    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }
    // 执行具体的方法,如:serviceName的hello
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", this.getCommandKey().name());
        return this.process(new AbstractHystrixCommand<Object>.Action() {
            Object execute() {
                return GenericCommand.this.getCommandAction().execute(GenericCommand.this.getExecutionType());
            }
        });
    }
    // 执行fallback方法,如:serviceName的error()
    protected Object getFallback() {
        final CommandAction commandAction = this.getFallbackAction();
        if (commandAction != null) {
            try {
                return this.process(new AbstractHystrixCommand<Object>.Action() {
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable var3) {
                LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
                throw new FallbackInvocationException(var3.getCause());
            }
        } else {
            return super.getFallback();
        }
    }
    //实现了HystrixExecutable的方法
    public R execute() {
        try {
            return this.queue().get();
        } catch (Exception var2) {
            throw Exceptions.sneakyThrow(this.decomposeException(var2));
        }
    }
}
//HystrixInvokable(GenericCommand)委托CommandExecutor 来执行
public class CommandExecutor {
    // 全文的关键方法
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        switch(executionType) {
        case SYNCHRONOUS:
            // 首先将 invokable 转换为 HystrixExecutable,再执行 HystrixCommand的execute() 方法
            return castToExecutable(invokable, executionType).execute();
        case ASYNCHRONOUS://如果切入的目标方法是Future返回类型时
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
 
            return executable.queue();
        case OBSERVABLE:
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
    // HystrixExecutable 的 execute() 方法由 HystrixCommand.execute() 实现
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable)invokable;
        } else {
            throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
        }
    }
}
public interface HystrixExecutable<R> extends HystrixInvokable<R> {
    R execute();
    Future<R> queue();
    Observable<R> observe();
}

 可以看到我们主要是通过这个类切面扫描Hystrix的相关注解,以达到接口处理前,提前执行Hystrix相关逻辑的代码。

/**
 * 提供客户行为接口
 *
 */
@Controller
@RequestMapping(value = "/test")
public class BehaviorController {
    Logger                                logger    = Logger.getLogger(BehaviorController.class);
    @Autowired
    private BehaviorService behaviorService;

    @RequestMapping(value="/addBehavior",method = RequestMethod.POST,produces = "application/json;charset=UTF-8")
    @ResponseBody
    @HystrixCommand(fallbackMethod = "fallback", threadPoolProperties = {  
            @HystrixProperty(name = "coreSize", value = "20"), @HystrixProperty(name = "maxQueueSize", value = "100"),
            @HystrixProperty(name = "queueSizeRejectionThreshold", value = "20")},
            commandProperties = {  
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "30000"),  
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20")
  
    })
    public String addBehavior(@RequestBody String parms) {

            //业务逻辑实现
            return result;
    }

    public String fallback(@RequestBody String parms){
        logger.info("fallback");
        //失败的实现
        return result;
    }
}

注意:
请求的接口必须为public,fallback为降级的接口逻辑,可以为private,也可以为public。但是要特别注意fallback方法的返回值和参数必须和请求方法相同。另外需要说的是,当请求失败、被拒绝、超时或者断路器打开时,都会进入回退方法,但是进入回退方法并不意味着断路器已经被打开。

9、 总结

Hystrix对于接口调用具有很好的保护,能在多服务依赖的 分布式系统中,有效的提供应用的可用性,并且对失败应用进行熔断和恢复检查,让应用在复杂的环境中也能各种稳。

常用参数介绍

参数

描述

默认值

execution.isolation.strategy

隔离策略,有THREAD和SEMAPHORE

THREAD - 它在单独的线程上执行,并发请求受线程池中的线程数量的限制
SEMAPHORE - 它在调用线程上执行,并发请求受到信号量计数的限制

默认使用THREAD模式,以下几种场景可以使用SEMAPHORE模式:

只想控制并发度

外部的方法已经做了线程隔离

调用的是本地方法或者可靠度非常高、耗时特别小的方法(如medis)

execution.isolation.thread.timeoutInMilliseconds

超时时间

默认值:1000

在THREAD模式下,达到超时时间,可以中断

在SEMAPHORE模式下,会等待执行完成后,再去判断是否超时

设置标准:

有retry,99meantime+avg meantime

没有retry,99.5meantime

execution.timeout.enabled

HystrixCommand.run()执行是否应该有超时。

默认值:true

fallback.isolation.semaphore.maxConcurrentRequests

设置在使用时允许执行fallback方法的最大并发请求数

默认值:10

circuitBreaker.requestVolumeThreshold

设置滚动时间窗中,断路器熔断的最小请求数

 默认值:20

滚动窗口默认10s,即10s内失败请求达到20个,熔断器即打开

 coreSize

设置执行命令线程池的核心线程数。

 默认值:10

maxQueueSize

设置执行命令线程池的核心线程数。

  默认值:-1

当设置为-1时,线程池使用SynchronousQueue实现的队列,否则将使用LinkedBlockingQueue实现的队列

queueSizeRejectionThreshold

为队列设置拒绝阈值

默认值:5

当设置该参数后,即使队列没有达到最大值也能拒绝请求。

注意:当maxQueueSize属性为-1的时候,该属性不会生效

点赞
收藏
评论区
推荐文章
Jacquelyn38 Jacquelyn38
1年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。 1、使用解构获取json数据let jsonData   id: 1, status: "OK", data: ['a', 'b'] ; let  id, status, data: number   jsonData; console.log(id, status, number )
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录 问题 用navicat导入数据时,报错: 原因这是因为当前的MySQL不支持datetime为0的情况。 解决修改sql\mode: sql\mode:SQL Mode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。 全局s
Stella981 Stella981
11个月前
KVM调整cpu和内存
一.修改kvm虚拟机的配置 1、virsh edit centos7 找到“memory”和“vcpu”标签,将 <name>centos7</name> <uuid>2220a6d1-a36a-4fbb-8523-e078b3dfe795</uuid>
Easter79 Easter79
11个月前
Twitter的分布式自增ID算法snowflake (Java版)
概述 == 分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。 有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。 而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Easter79 Easter79
11个月前
SpringCloud之Hystrix服务降级(七)
### Hystrix设计原则 1.防止单个服务的故障,耗尽整个系统服务的容器(比如tomcat)的线程资源,避免分布式环境里大量级联失败。通过第三方客户端访问(通常是通过网络)依赖服务出现失败、拒绝、超时或短路时执行回退逻辑 2.用快速失败代替排队(每个依赖服务维护一个小的线程池或信号量,当线程池满或信号量满,会立即拒绝服务而不会排队等待)和优雅的服
Stella981 Stella981
11个月前
Hystrix详述(一)
Hystrix详述(一) 博客分类: 架构   **一、hystrix的作用** * 控制被依赖服务的延时和失败 * 防止在复杂系统中的级联失败 * 可以进行快速失败(不需要等待)和**快速恢复**(当依赖服务失效后又恢复正常,其对应的线程池会被清理干净,即剩下的都是未使用的线程,相对于整个 Tomcat 容器的线程池被占满需要耗费更长时间以
Wesley13 Wesley13
11个月前
MySQL查询按照指定规则排序
1.按照指定(单个)字段排序 select * from table_name order id desc; 2.按照指定(多个)字段排序 select * from table_name order id desc,status desc; 3.按照指定字段和规则排序 selec
helloworld_70591445 helloworld_70591445
11个月前
AWS国庆双重礼,仅限7天
自2021年10月1日00:00起至2021年10月7日24:00,新注册并激活(需全部完成账号注册的五个步骤,否则账号状态并未激活)AWS海外区域账户,填写页面下方表单,即可申领价值$200美元的 AWS 海外区域账户服务抵扣券直充到您的账户,用以抵扣服务消费,助您轻松体验多个云迁移应用场景。同时,您还可获赠。国庆双重礼,仅限7天$200美元 AWS服务抵
Wesley13 Wesley13
11个月前
PHP中的NOW()函数
是否有一个PHP函数以与MySQL函数`NOW()`相同的格式返回日期和时间? 我知道如何使用`date()`做到这一点,但是我问是否有一个仅用于此的函数。 例如,返回: 2009-12-01 00:00:00 * * * ### #1楼 使用此功能: function getDatetimeNow() {
helloworld_34035044 helloworld_34035044
2个月前
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。 uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid() 或 uuid(sep)参数说明:sep 布尔值,生成的uuid中是否包含分隔符'',缺省为