聊聊dubbo的CacheFilter

哈希流星
• 阅读 2313

本文主要研究一下dubbo的CacheFilter

CacheFilter

dubbo-2.7.2/dubbo-filter/dubbo-filter-cache/src/main/java/org/apache/dubbo/cache/filter/CacheFilter.java

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {

    private CacheFactory cacheFactory;

    /**
     * Dubbo will populate and set the cache factory instance based on service/method/consumer/provider configured
     * cache attribute value. Dubbo will search for the class name implementing configured <b>cache</b> in file org.apache.dubbo.cache.CacheFactory
     * under META-INF sub folders.
     *
     * @param cacheFactory instance of CacheFactory based on <b>cache</b> type
     */
    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    /**
     * If cache is configured, dubbo will invoke method on each method call. If cache value is returned by cache store
     * then it will return otherwise call the remote method and return value. If remote method's return valeu has error
     * then it will not cache the value.
     * @param invoker    service
     * @param invocation invocation.
     * @return Cache returned value if found by the underlying cache store. If cache miss it will call target method.
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), CACHE_KEY))) {
            Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);
            if (cache != null) {
                String key = StringUtils.toArgumentString(invocation.getArguments());
                Object value = cache.get(key);
                if (value != null) {
                    if (value instanceof ValueWrapper) {
                        return AsyncRpcResult.newDefaultAsyncResult(((ValueWrapper) value).get(), invocation);
                    } else {
                        return AsyncRpcResult.newDefaultAsyncResult(value, invocation);
                    }
                }
                Result result = invoker.invoke(invocation);
                if (!result.hasException()) {
                    cache.put(key, new ValueWrapper(result.getValue()));
                }
                return result;
            }
        }
        return invoker.invoke(invocation);
    }

    /**
     * Cache value wrapper.
     */
    static class ValueWrapper implements Serializable {

        private static final long serialVersionUID = -1777337318019193256L;

        private final Object value;

        public ValueWrapper (Object value) {
            this.value = value;
        }

        public Object get() {
            return this.value;
        }
    }
}
  • CacheFilter实现了Filter接口,它的invoke方法会先判断cacheFactory是否不为null且invoker的URL中包含cache参数,如果该条件成立则从cacheFactory获取cache,然后在从cache中获取value,如果value不为null则返回AsyncRpcResult.newDefaultAsyncResult,如果value为null则执行invoke成功之后将该value缓存到cache中

实例

dubbo-2.7.2/dubbo-filter/dubbo-filter-cache/src/test/java/org/apache/dubbo/cache/filter/CacheFilterTest.java

public class CacheFilterTest {
    private RpcInvocation invocation;
    private CacheFilter cacheFilter = new CacheFilter();
    private Invoker<?> invoker = mock(Invoker.class);
    private Invoker<?> invoker1 = mock(Invoker.class);
    private Invoker<?> invoker2 = mock(Invoker.class);
    private Invoker<?> invoker3 = mock(Invoker.class);
    private Invoker<?> invoker4 = mock(Invoker.class);

    static Stream<Arguments> cacheFactories() {
        return Stream.of(
                Arguments.of("lru", new LruCacheFactory()),
                Arguments.of("jcache", new JCacheFactory()),
                Arguments.of("threadlocal", new ThreadLocalCacheFactory()),
                Arguments.of("expiring", new ExpiringCacheFactory())
        );
    }

    public void setUp(String cacheType, CacheFactory cacheFactory) {
        invocation = new RpcInvocation();
        cacheFilter.setCacheFactory(cacheFactory);

        URL url = URL.valueOf("test://test:11/test?cache=" + cacheType);

        given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value", invocation));
        given(invoker.getUrl()).willReturn(url);

        given(invoker1.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value1", invocation));
        given(invoker1.getUrl()).willReturn(url);

        given(invoker2.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult("value2", invocation));
        given(invoker2.getUrl()).willReturn(url);

        given(invoker3.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(new RuntimeException(), invocation));
        given(invoker3.getUrl()).willReturn(url);

        given(invoker4.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(invocation));
        given(invoker4.getUrl()).willReturn(url);
    }

    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testNonArgsMethod(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo");
        invocation.setParameterTypes(new Class<?>[]{});
        invocation.setArguments(new Object[]{});

        cacheFilter.invoke(invoker, invocation);
        Result rpcResult1 = cacheFilter.invoke(invoker1, invocation);
        Result rpcResult2 = cacheFilter.invoke(invoker2, invocation);
        Assertions.assertEquals(rpcResult1.getValue(), rpcResult2.getValue());
        Assertions.assertEquals(rpcResult1.getValue(), "value");
    }

    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testMethodWithArgs(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg1"});

        cacheFilter.invoke(invoker, invocation);
        Result rpcResult1 = cacheFilter.invoke(invoker1, invocation);
        Result rpcResult2 = cacheFilter.invoke(invoker2, invocation);
        Assertions.assertEquals(rpcResult1.getValue(), rpcResult2.getValue());
        Assertions.assertEquals(rpcResult1.getValue(), "value");
    }

    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testException(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg2"});

        cacheFilter.invoke(invoker3, invocation);
        Result rpcResult = cacheFilter.invoke(invoker2, invocation);
        Assertions.assertEquals(rpcResult.getValue(), "value2");
    }

    @ParameterizedTest
    @MethodSource("cacheFactories")
    public void testNull(String cacheType, CacheFactory cacheFactory) {
        setUp(cacheType, cacheFactory);
        invocation.setMethodName("echo1");
        invocation.setParameterTypes(new Class<?>[]{String.class});
        invocation.setArguments(new Object[]{"arg3"});

        cacheFilter.invoke(invoker4, invocation);
        Result result1 = cacheFilter.invoke(invoker1, invocation);
        Result result2 = cacheFilter.invoke(invoker2, invocation);
        Assertions.assertEquals(result1.getValue(), null);
        Assertions.assertEquals(result2.getValue(), null);
    }
}
  • 这里分别验证了无参方法、带参数方法、抛异常的方法、value为null的场景

小结

CacheFilter实现了Filter接口,它的invoke方法会先判断cacheFactory是否不为null且invoker的URL中包含cache参数,如果该条件成立则从cacheFactory获取cache,然后在从cache中获取value,如果value不为null则返回AsyncRpcResult.newDefaultAsyncResult,如果value为null则执行invoke成功之后将该value缓存到cache中

doc

点赞
收藏
评论区
推荐文章
捉虫大师 捉虫大师
4年前
dubbo 2.7应用级服务发现踩坑小记
本文已收录https://github.com/lkxiaolou/lkxiaolou欢迎star。背景本文记录最近一位读者反馈的dubbo2.7.x中应用级服务发现的问题,关于dubbo应用级服务发现的相关介绍可以参考之前的文章,这里不再赘述。读者反馈他们在基于dubbo2.7应用级服务发现开发dubbo网关,根据文章《dubbo应用级服务发现初
Easter79 Easter79
4年前
springcloud知识点总结
一.SpringCloud面试题口述1.SpringCloud和DubboSpringCloud和Dubbo都是现在主流的微服务架构SpringCloud是Apache旗下的Spring体系下的微服务解决方案Dubbo是阿里系的分布式服务治理框架从技术维度上,其实SpringCloud远远的超过Dubbo,Dubbo本身只是实现
Wesley13 Wesley13
4年前
eclipse中不能找到dubbo.xsd解决方法
使用dubbo时遇到问题:org.xml.sax.SAXParseException: schema\_reference.4: Failed to read schema document 'http://code.alibabatech.com/schema/dubbo/dubbo.xsd', because 1) could not f
Stella981 Stella981
4年前
Dubbo 云原生之路
【Dubbo云原生之路】系列开篇作者:刘军花名陆龟,Github账号Chickenlj,ApacheDubboPMC,项目核心开发,见证了Dubbo重启开源,到从Apache基金会毕业的整个过程。现任职阿里云云原生应用平台团队,参与服务框架、微服务相关工作,目前主要
Stella981 Stella981
4年前
Dubbo 云原生之路:ASF 毕业一周年、3.0 可期
!1.png(https://ucc.alicdn.com/pic/developerecology/86f96cb46a5a4f2ab1e931a9ebe89e79.png)作者|刘军\\导读:\\今年是Dubbo从Apache基金会毕业的一周年,同时也是推进Dubbo3.0,即全面拥抱云原生的重要一年。Dubbo社
Stella981 Stella981
4年前
Dubbo Filter机制概述
微信公众号:\中间件兴趣圈\作者简介:《RocketMQ技术内幕》作者从上文可知,在服务的调用或消费端发送请求命令中,Dubbo引入过滤器链机制来实现功能的包装(或扩展)。Dubbo很多功能,例如泛化调用、并发控制等都是基于Filter机制实现的,系统默认的Filter在/dubborpcapi/src/main/resou
Stella981 Stella981
4年前
Dubbo协议及序列化
Dubbo是Alibaba开源的分布式服务框架远程调用框架,在网络间传输数据,就需要通信协议和序列化。一通信协议Dubbo支持dubbo、rmi、hessian、http、webservice、thrift、redis等多种协议,但是Dubbo官网是推荐我们使用Dubbo协议的,默认也是用的dubbo协议。先介绍几种常见的协议:1\.
Stella981 Stella981
4年前
Dubbo Demo调试
下载dubbo的示例demogitclonehttps://github.com/apache/incubatordubbo.gitcdincubatordubbo运行dubbodemoprovider中的com.alibaba.dubbo.demo.provider.Provid
Stella981 Stella981
4年前
GoFramework框架简介(四)dubbo篇
框架中dubbo配置说明:Provider端配置如下:<dubbo:protocolname"dubbo"host"${dubbo.host}"port"${dubbo.port}"/<!服务提供者filter,在Provider上尽量多配置Consumer端属性,配置的覆盖规则:1)
Stella981 Stella981
4年前
Dubbo注册到发布执行流程(原理)
前言:本文章为个人笔记,参考Dubbo官方文档(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fdubbo.apache.org%2F),加上自己的理解,所总结的Dubbo注册到发布的执行流程(也可以说Dubbo原理),中间涉及到的技术,如果感兴趣,请自行搜索;
Stella981 Stella981
4年前
Dubbo之服务暴露
!(https://oscimg.oschina.net/oscnet/up4596697d7918a914b39348df311c6366353.png)前言本文Dubbo使用版本2.7.5Dubbo通过使用dubbo:service配置或@service在解析完配置后进行服务暴露,供服务消费者消费。Dubbo的