Dubbo + Zipkin + Brave实现全链路追踪

Stella981
• 阅读 809

Dubbo + Zipkin + Brave实现全链路追踪

最近写了一个链路追踪Demo分享下,实现了链路追踪过程中数据的记录,还有能扩展的地方,后期再继续补充。

原理参考上面文章 《Dubbo链路追踪——生成全局ID(traceId)》

源码地址

实现链路追踪的目的

  • 服务调用的流程信息,定位服务调用链
  • 记录调用入参及返回值信息,方便问题重现
  • 记录调用时间线,代码重构及调优处理
  • 调用信息统计

分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。 本次主要利用Dubbo数据传播特性扩展Filter接口来实现链路追踪的目的

重点主要是zipkin及brave使用及特性,当前brave版本为 5.2.0 为 2018年8月份发布的release版本 , zipkin版本为2.2.1 所需JDK为1.8

快速启动zipkin

下载最新的zipkin并启动

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
java -jar zipkin.jar

输入 http://localhost:9411/zipkin/ 进入WebUI界面如下 Dubbo + Zipkin + Brave实现全链路追踪


核心源码

代码的初步版本:方便描述

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.*;
import brave.sampler.Sampler;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.json.JSON;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.*;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 *
 * @author: bakerZhu
 * @description:
 * @modifytime:
 */
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class TracingFilter  implements Filter {

    private static final Logger log = LoggerFactory.getLogger(TracingFilter.class);

    private static Tracing tracing;
    private static Tracer tracer;
    private static TraceContext.Extractor<Map<String, String>> extractor;
    private static TraceContext.Injector<Map<String, String>> injector;

    static final Propagation.Getter<Map<String, String>, String> GETTER =
            new Propagation.Getter<Map<String, String>, String>() {
                @Override
                public String get(Map<String, String> carrier, String key) {
                    return carrier.get(key);
                }

                @Override
                public String toString() {
                    return "Map::get";
                }
            };

    static final Propagation.Setter<Map<String, String>, String> SETTER =
            new Propagation.Setter<Map<String, String>, String>() {
                @Override
                public void put(Map<String, String> carrier, String key, String value) {
                    carrier.put(key, value);
                }

                @Override
                public String toString() {
                    return "Map::set";
                }
            };

    static {
        // 1
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        // 2
        AsyncReporter asyncReporter = AsyncReporter.builder(sender)
                .closeTimeout(500, TimeUnit.MILLISECONDS)
                .build(SpanBytesEncoder.JSON_V2);
        // 3
        tracing = Tracing.newBuilder()
                .localServiceName("tracer-client")
                .spanReporter(asyncReporter)
                .sampler(Sampler.ALWAYS_SAMPLE)
                .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
                .build();
        tracer = tracing.tracer();
        // 4
        // 4.1
        extractor = tracing.propagation().extractor(GETTER);
        // 4.2
        injector = tracing.propagation().injector(SETTER);
    }



    public TracingFilter() {
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {


        RpcContext rpcContext = RpcContext.getContext();
        // 5
        Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;
        final Span span;
        if (kind.equals(Span.Kind.CLIENT)) {
            //6
            span = tracer.nextSpan();
            //7
            injector.inject(span.context(), invocation.getAttachments());
        } else {
            //8
            TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());
            //9
            span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted);
        }

        if (!span.isNoop()) {
            span.kind(kind).start();
            //10
            String service = invoker.getInterface().getSimpleName();
            String method = RpcUtils.getMethodName(invocation);
            span.kind(kind);
            span.name(service + "/" + method);
            InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();
            span.remoteIpAndPort(
                    remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName(),remoteAddress.getPort());
        }

        boolean isOneway = false, deferFinish = false;
        try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)){
            //11
            collectArguments(invocation, span, kind);
            Result result = invoker.invoke(invocation);

            if (result.hasException()) {
                onError(result.getException(), span);
            }
            // 12
            isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
            // 13
            Future<Object> future = rpcContext.getFuture();

            if (future instanceof FutureAdapter) {
                deferFinish = true;
                ((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span));// 14
            }
            return result;
        } catch (Error | RuntimeException e) {
            onError(e, span);
            throw e;
        } finally {
            if (isOneway) { // 15
                span.flush();
            } else if (!deferFinish) { // 16
                span.finish();
            }
        }
    }

    static void onError(Throwable error, Span span) {
        span.error(error);
        if (error instanceof RpcException) {
            span.tag("dubbo.error_msg", RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode()));
        }
    }

    static void collectArguments(Invocation invocation, Span span, Span.Kind kind) {
        if (kind == Span.Kind.CLIENT) {
            StringBuilder fqcn = new StringBuilder();
            Object[] args = invocation.getArguments();
            if (args != null && args.length > 0) {
                try {
                    fqcn.append(JSON.json(args));
                } catch (IOException e) {
                    log.warn(e.getMessage(), e);
                }
            }
            span.tag("args", fqcn.toString());
        }
    }



    static final class FinishSpanCallback implements ResponseCallback {
        final Span span;

        FinishSpanCallback(Span span) {
            this.span = span;
        }

        @Override
        public void done(Object response) {
            span.finish();
        }

        @Override
        public void caught(Throwable exception) {
            onError(exception, span);
            span.finish();
        }
    }
    // 17
    private enum RpcExceptionEnum {
        UNKNOWN_EXCEPTION(0, "unknown exception"),
        NETWORK_EXCEPTION(1, "network exception"),
        TIMEOUT_EXCEPTION(2, "timeout exception"),
        BIZ_EXCEPTION(3, "biz exception"),
        FORBIDDEN_EXCEPTION(4, "forbidden exception"),
        SERIALIZATION_EXCEPTION(5, "serialization exception"),;

        private int code;

        private String msg;

        RpcExceptionEnum(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public static String getMsgByCode(int code) {
            for (RpcExceptionEnum error : RpcExceptionEnum.values()) {
                if (code == error.code) {
                    return error.msg;
                }
            }
            return null;
        }
    }
}
  1. 构建客户端发送工具
  2. 构建异步reporter
  3. 构建tracing上下文
  4. 初始化injector 和 Extractor [tab]4.1 extractor 指数据提取对象,用于在carrier中提取TraceContext相关信息或者采样标记信息到TraceContextOrSamplingFlags 中 -4.2 injector 用于将TraceContext中的各种数据注入到carrier中,其中carrier一半是指数据传输中的载体,类似于Dubbo中Invocation中的attachment(附件集合)
  5. 判断此次调用是作为服务端还是客户端
  6. rpc客户端调用会从ThreadLocal中获取parent的 TraceContext ,为新生成的Span指定traceId及 parentId如果没有parent traceContext 则生成的Span为 root span
  7. 将Span绑定的TraceContext中 属性信息 Copy 到 Invocation中达到远程参数传递的作用
  8. rpc服务提供端 , 从invocation中提取TraceContext相关信息及采样数据信息
  9. 生成span , 兼容初次服务端调用
  10. 记录接口信息及远程IP Port
  11. 将创建的Span 作为当前Span (可以通过Tracer.currentSpan 访问到它) 并设置查询范围
  12. oneway调用即只请求不接受结果
  13. 如果future不为空则为 async 调用 在回调中finish span
  14. 设置异步回调,回调代码执行span finish() .
  15. oneway调用 因为不需等待返回值 即没有 cr (Client Receive) 需手动flush()
  16. 同步调用 业务代码执行完毕后需手动finish()
  17. 设置枚举类 与 Dubbo中RpcException保持对应

测试项

  • Dubbo sync async oneway 测试
  • RPC异常测试
  • 普通业务异常测试
  • 并发测试

配置方式

POM依赖添加

<dependency>
    <groupId>com.github.baker</groupId>
    <artifactId>Tracing</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

资源目录根路径下添加tracing.properties文件 Dubbo + Zipkin + Brave实现全链路追踪 一次调用信息 Dubbo + Zipkin + Brave实现全链路追踪 调用链 Dubbo + Zipkin + Brave实现全链路追踪 调用成功失败汇总 Dubbo + Zipkin + Brave实现全链路追踪 zipkinHost 指定zipkin服务器IP:PORT 默认为localhost:9411 serviceName 指定应用名称 默认为trace-default

调用链: Dubbo + Zipkin + Brave实现全链路追踪

待扩展项

  • 抽象数据传输(扩展Kafka数据传输)
  • 调用返回值数据打印
  • 更灵活的配置方式

源码地址

赞赏支持

Dubbo + Zipkin + Brave实现全链路追踪

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Java修道之路,问鼎巅峰,我辈代码修仙法力齐天
<center<fontcolor00FF7Fsize5face"黑体"代码尽头谁为峰,一见秃头道成空。</font<center<fontcolor00FF00size5face"黑体"编程修真路破折,一步一劫渡飞升。</font众所周知,编程修真有八大境界:1.Javase练气筑基2.数据库结丹3.web前端元婴4.Jav
Stella981 Stella981
2年前
Dubbo日志链路追踪TraceId选型
!mark(https://oscimg.oschina.net/oscnet/updd1ad9729fb807ee6dc473bdc283b1a4481.png)一、目的开发排查系统问题用得最多的手段就是查看系统日志,但是在分布式环境下使用日志定位问题还是比较麻烦,需要借助全链路追踪ID把上下文串联起来,本文主要分享基于
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
Dubbo链路追踪——生成全局ID(traceId)
全局traceId关于链路追踪,在微服务的趋势下,一次调用的日志信息分布在不同的机器上或目录下,当需要看一条链路调用所有的日志信息时,这是个比较困难的地方,我们虽然有ELK,Sentry等日志异常收集分析工具,但是如何把信息串起来也是一个关键的问题。我们一般的做法是在系统调用开始时生成一个traceId,并且它伴随着一
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
分布式系统中的分布式链路追踪与分布式调用链路
在分布式系统中,由于服务间的调用关系复杂,需要实现分布式链路追踪来跟踪请求在各个服务中的调用路径和时间消耗。这对问题排查和性能监控都很重要。常用的分布式链路追踪实现有基于日志的和基于分布式追踪系统的两种方式: