A Look at promtail's Client

字节拾贝

This article examines promtail's Client.

Client

loki/pkg/promtail/client/client.go

// Client pushes entries to Loki and can be stopped
type Client interface {
    api.EntryHandler
    // Stop goroutine sending batch of entries.
    Stop()
}
The Client interface embeds the api.EntryHandler interface and adds a Stop method.

EntryHandler

loki/pkg/promtail/api/types.go

// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
    Handle(labels model.LabelSet, time time.Time, entry string) error
}
The EntryHandler interface defines a single Handle method.
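To make the relationship between the two interfaces concrete, here is a minimal sketch of a type that satisfies both. It is not promtail code: LabelSet is a simplified stand-in for model.LabelSet, and memoryClient and record are hypothetical names introduced only for this illustration. Returning an error from Handle after Stop is also a choice made for the sketch, not documented promtail behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// LabelSet is a simplified stand-in for model.LabelSet (assumption).
type LabelSet map[string]string

// EntryHandler mirrors the shape of promtail's api.EntryHandler.
type EntryHandler interface {
	Handle(labels LabelSet, t time.Time, entry string) error
}

// Client mirrors the shape of promtail's Client: an EntryHandler with Stop.
type Client interface {
	EntryHandler
	Stop()
}

// memoryClient is a hypothetical Client that keeps lines in memory
// instead of pushing them anywhere.
type memoryClient struct {
	lines   []string
	stopped bool
}

// Handle records the line, or fails once the client has been stopped.
func (m *memoryClient) Handle(_ LabelSet, _ time.Time, entry string) error {
	if m.stopped {
		return errors.New("client stopped")
	}
	m.lines = append(m.lines, entry)
	return nil
}

// Stop marks the client as stopped.
func (m *memoryClient) Stop() { m.stopped = true }

// record pushes one line through any Client, stamped with the current time.
func record(c Client, line string) error {
	return c.Handle(LabelSet{"job": "demo"}, time.Now(), line)
}

func main() {
	m := &memoryClient{}
	fmt.Println(record(m, "hello"), m.lines)
	m.Stop()
	fmt.Println(record(m, "late"))
}
```

Because Client embeds EntryHandler, anything accepting an EntryHandler also accepts a Client, which is what lets a test double like this stand in for the real one.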

client

loki/pkg/promtail/client/client.go

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
    logger  log.Logger
    cfg     Config
    client  *http.Client
    quit    chan struct{}
    once    sync.Once
    entries chan entry
    wg      sync.WaitGroup

    externalLabels model.LabelSet
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
    if len(c.externalLabels) > 0 {
        ls = c.externalLabels.Merge(ls)
    }

    // Get the tenant  ID in case it has been overridden while processing
    // the pipeline stages, then remove the special label
    tenantID := c.getTenantID(ls)
    if _, ok := ls[ReservedLabelTenantID]; ok {
        // Clone the label set to not manipulate the input one
        ls = ls.Clone()
        delete(ls, ReservedLabelTenantID)
    }

    c.entries <- entry{tenantID, ls, logproto.Entry{
        Timestamp: t,
        Line:      s,
    }}
    return nil
}

// Stop the client.
func (c *client) Stop() {
    c.once.Do(func() { close(c.quit) })
    c.wg.Wait()
}
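The client struct has the fields logger, cfg, client, quit, once, entries, wg, and externalLabels, and it implements the Handle and Stop methods of the Client interface. Handle first merges externalLabels with the incoming label set, if any are configured, and resolves the tenant ID via getTenantID. If the label set contains ReservedLabelTenantID, Handle clones the set with ls.Clone() and removes that label from the clone, so the caller's input is not modified. It then builds an entry and sends it to the c.entries channel. Stop closes c.quit exactly once through sync.Once and then waits on c.wg until the sending goroutine has finished.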
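The clone-then-delete step in Handle can be sketched in isolation. The following is a simplified illustration, not promtail's implementation: LabelSet is a stand-in for model.LabelSet, splitTenant is a hypothetical helper, the value of reservedTenantLabel is assumed, and the fallback argument stands for whatever getTenantID would return when the label is absent (the listing above does not show that function).

```go
package main

import "fmt"

// LabelSet is a simplified stand-in for model.LabelSet (assumption).
type LabelSet map[string]string

// reservedTenantLabel is an assumed value, used here for illustration only.
const reservedTenantLabel = "__tenant_id__"

// splitTenant returns the tenant ID carried in the reserved label, or
// fallback when the label is absent, together with a label set that no
// longer contains the reserved label. The input set is never modified.
func splitTenant(ls LabelSet, fallback string) (string, LabelSet) {
	tenant, ok := ls[reservedTenantLabel]
	if !ok {
		return fallback, ls
	}
	// Copy every label except the reserved one into a fresh map.
	out := make(LabelSet, len(ls))
	for k, v := range ls {
		if k != reservedTenantLabel {
			out[k] = v
		}
	}
	return tenant, out
}

func main() {
	in := LabelSet{"job": "demo", reservedTenantLabel: "team-a"}
	tenant, out := splitTenant(in, "")
	fmt.Println(tenant, len(out), len(in))
}
```

Copying before deleting matters because the same label set may still be referenced by the pipeline stage that produced it.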

run

loki/pkg/promtail/client/client.go

func (c *client) run() {
    batches := map[string]*batch{}

    // Given the client handles multiple batches (1 per tenant) and each batch
    // can be created at a different point in time, we look for batches whose
    // max wait time has been reached every 10 times per BatchWait, so that the
    // maximum delay we have sending batches is 10% of the max waiting time.
    // We apply a cap of 10ms to the ticker, to avoid too frequent checks in
    // case the BatchWait is very low.
    minWaitCheckFrequency := 10 * time.Millisecond
    maxWaitCheckFrequency := c.cfg.BatchWait / 10
    if maxWaitCheckFrequency < minWaitCheckFrequency {
        maxWaitCheckFrequency = minWaitCheckFrequency
    }

    maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

    defer func() {
        // Send all pending batches
        for tenantID, batch := range batches {
            c.sendBatch(tenantID, batch)
        }

        c.wg.Done()
    }()

    for {
        select {
        case <-c.quit:
            return

        case e := <-c.entries:
            batch, ok := batches[e.tenantID]

            // If the batch doesn't exist yet, we create a new one with the entry
            if !ok {
                batches[e.tenantID] = newBatch(e)
                break
            }

            // If adding the entry to the batch will increase the size over the max
            // size allowed, we do send the current batch and then create a new one
            if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
                c.sendBatch(e.tenantID, batch)

                batches[e.tenantID] = newBatch(e)
                break
            }

            // The max size of the batch isn't reached, so we can add the entry
            batch.add(e)

        case <-maxWaitCheck.C:
            // Send all batches whose max wait time has been reached
            for tenantID, batch := range batches {
                if batch.age() < c.cfg.BatchWait {
                    continue
                }

                c.sendBatch(tenantID, batch)
                delete(batches, tenantID)
            }
        }
    }
}
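The run method creates a ticker with time.NewTicker(maxWaitCheckFrequency), where the interval is BatchWait / 10 but never shorter than 10ms, and then enters a for loop. When an entry arrives on c.entries, there are three cases: if the tenant has no batch yet, a new batch is created from the entry; if adding the entry would push the batch over c.cfg.BatchSize, the current batch is sent with c.sendBatch and a new batch is started with the entry; otherwise batch.add(e) is called. When maxWaitCheck fires, run iterates over batches and, for each batch whose age has reached c.cfg.BatchWait, calls c.sendBatch(tenantID, batch) and deletes it from the map. When quit is closed the loop returns, and the deferred function sends every pending batch with c.sendBatch(tenantID, batch) before calling c.wg.Done().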
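The interplay of the quit channel, sync.Once, the WaitGroup, and the ticker can be tried out with a stripped-down batcher. This is a sketch under simplifying assumptions, not promtail's code: it handles a single tenant, measures batch size in entries rather than bytes, and appends flushed batches to a slice instead of sending them. The names batcher, newBatcher, and checkInterval are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// checkInterval returns how often to look for expired batches:
// a tenth of batchWait, but never more often than every 10ms.
func checkInterval(batchWait time.Duration) time.Duration {
	const minInterval = 10 * time.Millisecond
	if d := batchWait / 10; d > minInterval {
		return d
	}
	return minInterval
}

type batcher struct {
	maxEntries int
	maxWait    time.Duration
	entries    chan string
	quit       chan struct{}
	once       sync.Once
	wg         sync.WaitGroup
	sent       [][]string // flushed batches; read only after Stop returns
}

func newBatcher(maxEntries int, maxWait time.Duration) *batcher {
	b := &batcher{
		maxEntries: maxEntries,
		maxWait:    maxWait,
		entries:    make(chan string),
		quit:       make(chan struct{}),
	}
	b.wg.Add(1)
	go b.run()
	return b
}

// Handle hands a line to the run goroutine; it blocks until it is received.
func (b *batcher) Handle(line string) { b.entries <- line }

// Stop closes quit exactly once and waits for run to flush and exit.
func (b *batcher) Stop() {
	b.once.Do(func() { close(b.quit) })
	b.wg.Wait()
}

func (b *batcher) run() {
	var batch []string
	var created time.Time
	flush := func() {
		if len(batch) > 0 {
			b.sent = append(b.sent, batch)
			batch = nil
		}
	}
	ticker := time.NewTicker(checkInterval(b.maxWait))
	defer func() {
		ticker.Stop()
		flush() // send whatever is still pending
		b.wg.Done()
	}()
	for {
		select {
		case <-b.quit:
			return
		case line := <-b.entries:
			// Flush first if this entry would exceed the size limit.
			if len(batch)+1 > b.maxEntries {
				flush()
			}
			if len(batch) == 0 {
				created = time.Now()
			}
			batch = append(batch, line)
		case <-ticker.C:
			// Flush a batch that has waited long enough.
			if len(batch) > 0 && time.Since(created) >= b.maxWait {
				flush()
			}
		}
	}
}

func main() {
	b := newBatcher(2, time.Hour)
	for _, l := range []string{"a", "b", "c"} {
		b.Handle(l)
	}
	b.Stop()
	fmt.Println(b.sent, checkInterval(50*time.Millisecond))
}
```

With a limit of two entries and a one-hour wait, the third entry forces the first batch out and Stop flushes the remainder, so nothing is lost on shutdown. Checking ten times per wait period bounds the extra delay of a time-based flush to roughly a tenth of the wait time.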

sendBatch

loki/pkg/promtail/client/client.go

func (c *client) sendBatch(tenantID string, batch *batch) {
    buf, entriesCount, err := batch.encode()
    if err != nil {
        level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
        return
    }
    bufBytes := float64(len(buf))
    encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

    ctx := context.Background()
    backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
    var status int
    for backoff.Ongoing() {
        start := time.Now()
        status, err = c.send(ctx, tenantID, buf)
        requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

        if err == nil {
            sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
            sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
            for _, s := range batch.streams {
                lbls, err := parser.ParseMetric(s.Labels)
                if err != nil {
                    // is this possible?
                    level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
                    return
                }
                var lblSet model.LabelSet
                for i := range lbls {
                    if lbls[i].Name == LatencyLabel {
                        lblSet = model.LabelSet{
                            model.LabelName(HostLabel):    model.LabelValue(c.cfg.URL.Host),
                            model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
                        }
                    }
                }
                if lblSet != nil {
                    streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
                }
            }
            return
        }

        // Only retry 429s, 500s and connection-level errors.
        if status > 0 && status != 429 && status/100 != 5 {
            break
        }

        level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
        batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
        backoff.Wait()
    }

    if err != nil {
        level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
        droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
        droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
    }
}
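sendBatch first encodes the batch into buf with batch.encode(), then sends it with c.send(ctx, tenantID, buf) inside a backoff loop. On success it updates the sentBytes and sentEntries metrics and, for streams carrying LatencyLabel, streamLag, and then returns. Only 429 responses, 5xx responses, and connection-level errors are retried; any other status ends the loop immediately. If the last attempt still fails, the error is logged and the batch is counted in droppedBytes and droppedEntries.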
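The retry rule is easy to misread because the source expresses it as a negated break condition. The sketch below restates it positively and wraps it in a small loop. It is an illustration only: shouldRetry, pushWithRetry, and errPush are hypothetical names, the attempt limit replaces the backoff configuration, and no waiting happens between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// errPush is a placeholder error for this sketch.
var errPush = errors.New("push failed")

// shouldRetry reports whether a failed push is worth retrying.
// A status <= 0 stands for a connection-level error (no HTTP response).
func shouldRetry(status int) bool {
	return status <= 0 || status == 429 || status/100 == 5
}

// pushWithRetry calls send until it succeeds, returns a non-retryable
// status, or maxAttempts is used up. It returns the number of attempts.
func pushWithRetry(send func() (int, error), maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var status int
		status, err = send()
		if err == nil {
			return attempt, nil
		}
		if !shouldRetry(status) {
			return attempt, err
		}
		// A real client would wait with exponential backoff here.
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := pushWithRetry(func() (int, error) {
		calls++
		if calls < 3 {
			return 503, errPush
		}
		return 204, nil
	}, 5)
	fmt.Println(attempts, err)
}
```

Treating 4xx responses other than 429 as final avoids resending a batch that the server has already rejected as invalid.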

send

loki/pkg/promtail/client/client.go

func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
    ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
    defer cancel()
    req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
    if err != nil {
        return -1, err
    }
    req = req.WithContext(ctx)
    req.Header.Set("Content-Type", contentType)
    req.Header.Set("User-Agent", UserAgent)

    // If the tenant ID is not empty promtail is running in multi-tenant mode, so
    // we should send it to Loki
    if tenantID != "" {
        req.Header.Set("X-Scope-OrgID", tenantID)
    }

    resp, err := c.client.Do(req)
    if err != nil {
        return -1, err
    }
    defer helpers.LogError("closing response body", resp.Body.Close)

    if resp.StatusCode/100 != 2 {
        scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
        line := ""
        if scanner.Scan() {
            line = scanner.Text()
        }
        err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
    }
    return resp.StatusCode, err
}
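The send method issues an HTTP POST to c.cfg.URL.String() with a timeout of c.cfg.Timeout. It sets the Content-Type and User-Agent headers, plus X-Scope-OrgID when tenantID is non-empty. For a non-2xx response it reads the first line of the body (limited to maxErrMsgLen bytes) into the returned error. It returns the HTTP status code, or -1 when the request could not be built or sent.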
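The request-building half of send can be sketched with the standard library alone. This is an illustration rather than promtail's code: newPushRequest is a hypothetical helper, the contentType and userAgent values are assumed because the listing does not show the real constants, and the URL is a placeholder.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// Assumed values for this sketch; the listing above does not show
// the constants promtail actually uses.
const (
	contentType = "application/x-protobuf"
	userAgent   = "example-agent/0.1"
)

// newPushRequest builds a POST request carrying body. The tenant header
// is only set when tenantID is non-empty, i.e. in multi-tenant mode.
func newPushRequest(url, tenantID string, body []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", contentType)
	req.Header.Set("User-Agent", userAgent)
	if tenantID != "" {
		req.Header.Set("X-Scope-OrgID", tenantID)
	}
	return req, nil
}

func main() {
	req, err := newPushRequest("http://loki.example/push", "team-a", []byte("payload"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.Header.Get("X-Scope-OrgID"))
}
```

Omitting the header entirely for an empty tenant ID, instead of sending an empty value, leaves it to the server to decide how to treat single-tenant traffic.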

Summary

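promtail's client has the fields logger, cfg, client, quit, once, entries, wg, and externalLabels, and it implements the Handle and Stop methods of the Client interface. Handle builds an entry and sends it to the c.entries channel; Stop closes c.quit and waits for the run goroutine to finish. The run method adds entries to per-tenant batches and sends each batch to the configured URL with an HTTP POST.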
