聊聊dubbo的ExecuteLimitFilter

字节映星客
• 阅读 2206

本文主要研究一下dubbo的ExecuteLimitFilter

ExecuteLimitFilter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java

public class ExecuteLimitFilter extends ListenableFilter {

    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";

    public ExecuteLimitFilter() {
        super.listener = new ExecuteLimitListener();
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
                    url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                    "\" /> limited.");
        }

        invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }

    static class ExecuteLimitListener implements Listener {
        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
            return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
        }
    }
}
  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

RpcStatus

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java

public class RpcStatus {

    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();

    //......

    public static void beginCount(URL url, String methodName) {
        beginCount(url, methodName, Integer.MAX_VALUE);
    }

    /**
     * @param url
     */
    public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.incrementAndGet() > max) {
            methodStatus.active.decrementAndGet();
            return false;
        } else {
            appStatus.active.incrementAndGet();
            return true;
        }
    }

    /**
     * @param url
     * @param elapsed
     * @param succeeded
     */
    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }

    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }

    //......
}
  • RpcStatus的beginCount方法会递增methodStatus.active,然后判断是否大于max值,超出则返回false并递减methodStatus.active;小于等于则递增appStatus.active;endCount方法会递减status.active,递增status.total,然后根据成功与否更新status.succeededMaxElapsed或status.failed、status.failedElapsed、status.failedMaxElapsed

小结

  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • ExecuteLimitFilter的invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

doc

点赞
收藏
评论区
推荐文章
捉虫大师 捉虫大师
4年前
dubbo 2.7应用级服务发现踩坑小记
本文已收录https://github.com/lkxiaolou/lkxiaolou欢迎star。背景本文记录最近一位读者反馈的dubbo2.7.x中应用级服务发现的问题,关于dubbo应用级服务发现的相关介绍可以参考之前的文章,这里不再赘述。读者反馈他们在基于dubbo2.7应用级服务发现开发dubbo网关,根据文章《dubbo应用级服务发现初
Easter79 Easter79
3年前
springcloud知识点总结
一.SpringCloud面试题口述1.SpringCloud和DubboSpringCloud和Dubbo都是现在主流的微服务架构SpringCloud是Apache旗下的Spring体系下的微服务解决方案Dubbo是阿里系的分布式服务治理框架从技术维度上,其实SpringCloud远远的超过Dubbo,Dubbo本身只是实现
Wesley13 Wesley13
3年前
eclipse中不能找到dubbo.xsd解决方法
使用dubbo时遇到问题:org.xml.sax.SAXParseException: schema\_reference.4: Failed to read schema document 'http://code.alibabatech.com/schema/dubbo/dubbo.xsd', because 1) could not f
Wesley13 Wesley13
3年前
RPC接口测试(三) RPC接口测试
RPC接口测试接口测试主要分HTTP和RPC两类,RPC类型里面以Dubbo较为知名。互联网微服务架构,两种接口都需要做接口测试的,不管是业务测试还是回归测试;Dubbo:Java栈的互联网公司比如阿里、美团、58、滴滴、京东等等都是差不多的服务端架构,所以这些公司,两类接口测试也是必不可少的工作部分;Dubbo是一
Stella981 Stella981
3年前
Go 版本入 Dubbo 生态一周年:已和 Spring Cloud、gRPC 互通
本文作者:o\\\\0去年5月,阿里开源的高性能RPC框架Dubbo从ASF毕业并晋升顶级项目,同时,还宣布Go语言版本的Dubbogo正式加入Dubbo官方生态。经过一年的发展,Dubbogo在技术和社区运营方面都已经有了不错的成绩。Dubbogo是Dubbo的完整Go语言实现,在功能实现和技术路
Stella981 Stella981
3年前
Dubbo Filter机制概述
微信公众号:\中间件兴趣圈\作者简介:《RocketMQ技术内幕》作者从上文可知,在服务的调用或消费端发送请求命令中,Dubbo引入过滤器链机制来实现功能的包装(或扩展)。Dubbo很多功能,例如泛化调用、并发控制等都是基于Filter机制实现的,系统默认的Filter在/dubborpcapi/src/main/resou
Stella981 Stella981
3年前
Dubbo Demo调试
下载dubbo的示例demogitclonehttps://github.com/apache/incubatordubbo.gitcdincubatordubbo运行dubbodemoprovider中的com.alibaba.dubbo.demo.provider.Provid
Stella981 Stella981
3年前
GoFramework框架简介(四)dubbo篇
框架中dubbo配置说明:Provider端配置如下:<dubbo:protocolname"dubbo"host"${dubbo.host}"port"${dubbo.port}"/<!服务提供者filter,在Provider上尽量多配置Consumer端属性,配置的覆盖规则:1)
Stella981 Stella981
3年前
Dubbo错误No provider available for the service
  最近要开发dubbo服务,因为以前没用过,其实dubbo服务很简单,网上有很多例子,还有官方文档http://dubbo.io/Homezh.htm,由于新手上路难免遇到各种各样的问题,我就遇到一个问题让我很是费解,百度也没有多少可用的博客,浪费了1天时间,最后找我们技术总监才搞定,这个错误如下:com.alibaba.dubbo.rpc.Rp
Stella981 Stella981
3年前
Dubbo注册到发布执行流程(原理)
前言:本文章为个人笔记,参考Dubbo官方文档(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fdubbo.apache.org%2F),加上自己的理解,所总结的Dubbo注册到发布的执行流程(也可以说Dubbo原理),中间涉及到的技术,如果感兴趣,请自行搜索;
Stella981 Stella981
3年前
Dubbo(五) Dubbo入门demo——helloworld
前言前面我已经介绍了dubbo的一些基本工具和知识,让大家简单的了解了下RPC框架和Dubbo。接下来就是重点了,Dubbo的helloworld项目。!(http://www.droptb.com/rec/article/images/201801/png/20180107135041_472.png)一、搭建项目首先我们新