go 全链路追踪之 jaeger

异步蝉翼
• 阅读 2219

微服务中进行链路追踪,包括两部分: 进程内追踪, 跨进程追踪

docker下进安装jaeger

docker run -d -p 9411:9411 openzipkin/zipkin

docker run -d -p 5775:5775/udp -p 16686:16686 -p 14250:14250 -p 14268:14268 jaegertracing/all-in-one:latest

14250 gRPC jaeger-agent通过该端口将收集的 span以 model.proto 格式发送到 collector
14268 HTTP 客户端可以通过该端口直接将 span发送到 collector。
16686 HTTP 默认url localhost:16686

访问地址:
http://127.0.0.1:16686/search
http://127.0.0.1:9411

同一个进程中进行追踪

package main

import (
    "context"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "io"
    "time"
)

func main() {
    tracer, closer := initJaeger("jaeger-demo") //初始化
    defer closer.Close()
    opentracing.SetGlobalTracer(tracer) //StartspanFromContext创建新span时会用到

    span := tracer.StartSpan("span_root") //开始监控

    //ContextWithSpan来创建一个新的ctx,将span的信息与context关联,传到foo3中时,需要创建一个子span,父span是ctx中的span。
    //用StartSpanFromContext来模拟从context中启动一个子span,但是StartSpanFromContext或者SpanFromContext只能在同一个服务内使用,
    //grpc中client的context和server的context并不是同一个context,无法使用这两个函数。
    ctx := opentracing.ContextWithSpan(context.Background(), span)
    r1 := foo3("Hello foo3", ctx)
    r2 := foo4("Hello foo4", ctx)
    fmt.Println(r1, r2)
    span.Finish() //结束监控
}

func foo3(req string, ctx context.Context) (reply string) {
    //1.创建子span
    span, _ := opentracing.StartSpanFromContext(ctx, "span_foo3")
    defer func() {
        //4.接口调用完,在tag中设置request和reply
        span.SetTag("request", req)
        span.SetTag("reply", reply)
        span.Finish()
    }()

    println(req)
    //2.模拟处理耗时
    time.Sleep(time.Second / 2)
    //3.返回reply
    reply = "foo3Reply"
    return
}

//跟foo3一样逻辑
func foo4(req string, ctx context.Context) (reply string) {
    span, _ := opentracing.StartSpanFromContext(ctx, "span_foo4")
    defer func() {
        span.SetTag("request", req)
        span.SetTag("reply", reply)
        span.Finish()
    }()

    println(req)
    time.Sleep(time.Second / 2)
    reply = "foo4Reply"
    return
}

//初始化jaeger tracer的initJaeger方法
func initJaeger(serviceName string) (opentracing.Tracer, io.Closer) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1, //设置采样率 1
        },
        // reporter中配置jaeger Agent的ip与端口,以便将tracer的信息发布到agent中。
        // LocalAgentHostPort参数为127.0.0.1:6381,6381接口是接受压缩格式的thrift协议数据。
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            //LocalAgentHostPort: "127.0.0.1:6831",
CollectorEndpoint: "http://127.0.0.1:14268/api/traces",
        },
        ServiceName: serviceName,
    }
    tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {
        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}

跨进程追踪:
Client:

package main

import (
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    "io"
    pb "micro/proto/hello"
    "time"
)

const (
    // Address gRPC服务地址
    Address = "127.0.0.1:50052"
)

//grpc提供了拦截器,我们可以在dial函数里设置拦截器,这样每次请求都会经过拦截器,我们不需要在每个接口中去编写重复的代码。
func main() {
    //init jaeger
    jaegerAgentHost := "127.0.0.1:6831"
    tracer, closer, err := initJaeger("client", jaegerAgentHost)
    if err != nil {
        fmt.Println(err)
    }
    defer closer.Close()
    //dial
    conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()), clientDialOption(tracer))
    if err != nil {
        fmt.Printf("dial fail, %+v\n\n", err)
    }

    //// 连接
    ////conn, err := grpc.Dial(Address, grpc.WithInsecure())
    //conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
    //if err != nil {
    //    grpclog.Fatalln(err)
    //}
    //defer conn.Close()

    // 初始化客户端
    client := pb.NewHelloClient(conn)
    ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(10)*time.Second)
    defer cancel()
    // 调用方法
    res, err := client.SayHello(ctx, &pb.HelloRequest{Name: "gRPC1212"})

    if err != nil {
        grpclog.Fatalln(err)
    }

    fmt.Println(res.Message)
}

func clientDialOption(tracer opentracing.Tracer) grpc.DialOption {
    return grpc.WithUnaryInterceptor(jaegerGrpcClientInterceptor)
}

type TextMapWriter struct {
    metadata.MD
}

// Set 重写TextMapWriter的Set方法,我们需要将carrier中的数据写入到metadata中,这样grpc才会携带。
func (t TextMapWriter) Set(key, val string) {
    //key = strings.ToLower(key)
    t.MD[key] = append(t.MD[key], val)
}

//span finish之前利用SetTag添加一些额外的信息,例如request和reply,
//以及error信息,但是这些信息是不会在client和server中传递的,我们可以在UI中每个span中显示出他们的tag。
func jaegerGrpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
    var parentContext opentracing.SpanContext
    //先从context中获取原始的span
    parentSpan := opentracing.SpanFromContext(ctx)
    if parentSpan != nil {
        parentContext = parentSpan.Context()
    }
    tracer := opentracing.GlobalTracer()
    span := tracer.StartSpan(method, opentracing.ChildOf(parentContext))
    defer span.Finish()
    //从context中获取metadata。md.(type) == map[string][]string
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        //md = metadata.New(nil)
        m := map[string]string{
            "x-request-id":      "X-Request-Id",
            "x-b3-traceid":      "X-B3-Traceid",
            "x-b3-spanid":       "X-B3-Spanid",
            "x-b3-parentspanid": "X-B3-Parentspanid",
            "x-b3-sampled":      "X-B3-Sampled",
            "x-b3-flags":        "X-B3-Flags",
            "x-ot-span-context": "X-Ot-Span-Context",
        }
        md = metadata.New(m)
    } else {
        //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释)
        md = md.Copy()
    }
    //定义一个carrier,下面的Inject注入数据需要用到。carrier.(type) == map[string]string
    //carrier := opentracing.TextMapCarrier{}
    carrier := TextMapWriter{md}
    //将span的context信息注入到carrier中
    e := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
    if e != nil {
        fmt.Println("tracer Inject err,", e)
    }
    //创建一个新的context,把metadata附带上
    ctx = metadata.NewOutgoingContext(ctx, md)

    return invoker(ctx, method, req, reply, cc, opts...)
}

func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: jaegerAgentHost,
        },
        ServiceName: serviceName,
    }
    tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, err
}

Server 端

package main

import (
    "context"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "google.golang.org/grpc"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    "io"
    pb "micro/proto/hello" // 引入proto包
    "net"
)

const (
    // Address gRPC服务地址
    Address = "127.0.0.1:50052"
)

// 定义helloService并实现约定的接口
type helloService struct{}

// HelloService Hello服务
var HelloService = helloService{}

// SayHello 实现Hello服务接口
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
    resp := new(pb.HelloResponse)
    resp.Message = fmt.Sprintf("Hello %s.", in.Name)
    return resp, nil
}

//我们使用Extract函数将carrier从metadata中提取出来,
//这样client端与server端就能建立span信息的关联。我们在server端同样只是修改拦截器,
//在grpc.NewServer时将我们的拦截器传进去。
func main() {
    jaegerAgentHost := "127.0.0.1:6831"
    tracer, closer, err := initJaeger("client", jaegerAgentHost)
    if err != nil {
        fmt.Println(err)
    }
    defer closer.Close()
    var serviceOpts []grpc.ServerOption

    listen, err := net.Listen("tcp", Address)
    if err != nil {
        grpclog.Fatalf("Failed to listen: %v", err)
    }

    if tracer != nil {
        serviceOpts = append(serviceOpts, serverOption(tracer))
    }

    // 实例化grpc Server
    s := grpc.NewServer(serviceOpts...)

    // 注册HelloService
    pb.RegisterHelloServer(s, HelloService)

    grpclog.Println("Listen on " + Address)
    s.Serve(listen)
}

func serverOption(tracer opentracing.Tracer) grpc.ServerOption {
    return grpc.UnaryInterceptor(jaegerGrpcServerInterceptor)
}

type TextMapReader struct {
    metadata.MD
}

// ForeachKey 读取metadata中的span信息
func (t TextMapReader) ForeachKey(handler func(key, val string) error) error { //不能是指针
    for key, val := range t.MD {
        for _, v := range val {
            if err := handler(key, v); err != nil {
                return err
            }
        }
    }
    return nil
}

func jaegerGrpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    //从context中获取metadata。md.(type) == map[string][]string
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        md = metadata.New(nil)
    } else {
        //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释)
        md = md.Copy()
    }
    fmt.Println(md)
    carrier := TextMapReader{md}
    tracer := opentracing.GlobalTracer()
    spanContext, e := tracer.Extract(opentracing.TextMap, carrier)
    if e != nil {
        fmt.Println("Extract err:", e)
    }

    span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext))
    defer span.Finish()
    ctx = opentracing.ContextWithSpan(ctx, span)

    return handler(ctx, req)
}

func initJaeger(serviceName string, jaegerAgentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
    cfg := &config.Configuration{
        Sampler: &config.SamplerConfig{
            Type:  "const",
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: jaegerAgentHost,
        },
        ServiceName: serviceName,
    }
    tracer, closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
    opentracing.SetGlobalTracer(tracer)
    return tracer, closer, err
}

由于grpc 调用和服务端都声明了 UnaryInterceptor 和 StreamInterceptor 两回调函数,因此只需要重写这两个函数,在函数中调用 opentracing 的接口进行链路追踪,并初始化客户端或者服务端时候注册进去就可以。可以直接引用第三方包, 然后使用。

github项目地址 grpc-opentracing


go 全链路追踪之 jaeger


参考文章:
https://www.lixueduan.com/pos...

基于jaeger和grpc实现的rpc调用链跟踪模块
golang 链路追踪之 jaeger(http)

https://github.com/grpc-ecosy...

点赞
收藏
评论区
推荐文章
Johnny21 Johnny21
4年前
服务追踪工具 SkyWorking 搭建使用
服务追踪工具SkyWorking搭建使用是用于对微服务,CloudNative,容器等提供应用性能监控和分布式调用链追踪的工具截图环境SkyWalking5.0.0beat2MacOSElasticSearch5.6.
Stella981 Stella981
3年前
Dubbo日志链路追踪TraceId选型
!mark(https://oscimg.oschina.net/oscnet/updd1ad9729fb807ee6dc473bdc283b1a4481.png)一、目的开发排查系统问题用得最多的手段就是查看系统日志,但是在分布式环境下使用日志定位问题还是比较麻烦,需要借助全链路追踪ID把上下文串联起来,本文主要分享基于
Stella981 Stella981
3年前
Opentracing + Uber Jaeger 全链路灰度调用链,Nepxion Discovery
当网关和服务在实施全链路分布式灰度发布和路由时候,我们需要一款追踪系统来监控网关和服务走的是哪个灰度组,哪个灰度版本,哪个灰度区域,甚至监控从HttpHeader头部全程传递的灰度规则和路由策略。这个功能意义在于:不仅可以监控全链路中基本的调用信息,也可以监控额外的灰度信息,有助于我们判断灰度发布和路由是否执行准确,一旦有问题,也可以快速定位
Stella981 Stella981
3年前
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」!(https://oscimg.oschina.net/oscnet/f2a7c1f4d28b48a9b15611d0a33ad613.png)默认文件1610014380163教程全目录「含视频」:https://gi
可莉 可莉
3年前
2021升级版微服务教程4—Nacos 服务注册和发现
2021升级版SpringCloud教程从入门到实战精通「H版&alibaba&链路追踪&日志&事务&锁」!(https://oscimg.oschina.net/oscnet/f2a7c1f4d28b48a9b15611d0a33ad613.png)默认文件1610014380163教程全目录「含视频」:https://gi
Stella981 Stella981
3年前
Dubbo + Zipkin + Brave实现全链路追踪
DubboZipkinBrave实现全链路追踪最近写了一个链路追踪Demo分享下,实现了链路追踪过程中数据的记录,还有能扩展的地方,后期再继续补充。原理参考上面文章《Dubbo链路追踪——生成全局ID(traceId)》(https://my.oschina.net/Luc
Stella981 Stella981
3年前
Spring Cloud 系列之 Sleuth 链路追踪(一)
随着微服务架构的流行,服务按照不同的维度进行拆分,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心。因此,就需要一些可以帮助理解系统行为、用于分析性能问题的工具,以便发生故障的时候,能够快速定位和解决问题。在复杂的微服务架
Stella981 Stella981
3年前
Dubbo链路追踪——生成全局ID(traceId)
全局traceId关于链路追踪,在微服务的趋势下,一次调用的日志信息分布在不同的机器上或目录下,当需要看一条链路调用所有的日志信息时,这是个比较困难的地方,我们虽然有ELK,Sentry等日志异常收集分析工具,但是如何把信息串起来也是一个关键的问题。我们一般的做法是在系统调用开始时生成一个traceId,并且它伴随着一
Wesley13 Wesley13
3年前
Uber jaeger
JaegerUber开源的一个基于Go的分布式追踪系统最近因工作需要在研究traing系统,最后选了jaeger,下面是一些总结,同时摘抄了网上的一些资料,并结合自己实践过程中遇到的一些什么问题,欢迎指正,如你也在使用jaeger,或者想使用jaeger,途中遇到什么困难,可发邮件交流:hong
分布式系统中的分布式链路追踪与分布式调用链路
在分布式系统中,由于服务间的调用关系复杂,需要实现分布式链路追踪来跟踪请求在各个服务中的调用路径和时间消耗。这对问题排查和性能监控都很重要。常用的分布式链路追踪实现有基于日志的和基于分布式追踪系统的两种方式:
技术分享-日志链路追踪
1.背景简述在技术运维过程中,很难从某服务庞杂的日志中,单独找寻出某次API调用的全部日志。为提高排查问题的效率,在多个系统及应用内根据统一的TraceId查找同一次请求链路上的日志,根据日志快速定位问题,同时需对业务代码无侵入,特别是在高频请求下,也可以