k8s与监控--改造telegraf的buffer实现

数字寻梦人
• 阅读 3824

改造telegraf的buffer实现

前言

最近在使用telegraf的场景中,要求数据在程序意外终止的时候不丢失。按照telegraf最初的原始实现,在running_output内部维护了两个buffer,分别是metrics和failMetrics。这两个buffer是基于go中channel实现的。由于没有持久化机制,在意外退出的时候,存在丢失数据的风险。所以这篇文章主要讲述之前telegraf保证数据安全的一些措施和我们对代码的一些优化。

telegraf关于数据安全的处理办法

关于两个buffer,定义在running_output.go的struct中。

// RunningOutput contains the output configuration
type RunningOutput struct {
    Name              string
    Output            telegraf.Output
    Config            *OutputConfig
    MetricBufferLimit int
    MetricBatchSize   int

    MetricsFiltered selfstat.Stat
    MetricsWritten  selfstat.Stat
    BufferSize      selfstat.Stat
    BufferLimit     selfstat.Stat
    WriteTime       selfstat.Stat

    metrics     *buffer.Buffer
    failMetrics *buffer.Buffer

    // Guards against concurrent calls to the Output as described in #3009
    sync.Mutex
}

这个两个buffer的大小提供了配置参数可以设置。

metrics:           buffer.NewBuffer(batchSize),
failMetrics:       buffer.NewBuffer(bufferLimit),

顾名思义。metrics存放要发送到指定output的metric,而failMetrics存放发送失败的metric。当然失败的metrics会在telegraf重发机制下再次发送。

    if ro.metrics.Len() == ro.MetricBatchSize {
        batch := ro.metrics.Batch(ro.MetricBatchSize)
        err := ro.write(batch)
        if err != nil {
            ro.failMetrics.Add(batch...)
        }
    }

在向metrics增加metrics的时候,做是否达到批量发送的数量,如果达到就调用发送方法。当然还有定时的解决方案,如果一直没有达到MetricBatchSize,也会在一定时间后发送数据。具体实现代码在agent.go中

    ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
    semaphore := make(chan struct{}, 1)
    for {
        select {
        case <-shutdown:
            log.Println("I! Hang on, flushing any cached metrics before shutdown")
            // wait for outMetricC to get flushed before flushing outputs
            wg.Wait()
            a.flush()
            return nil
        case <-ticker.C:
            go func() {
                select {
                case semaphore <- struct{}{}:
                    internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
                    a.flush()
                    <-semaphore
                default:
                    // skipping this flush because one is already happening
                    log.Println("W! Skipping a scheduled flush because there is" +
                        " already a flush ongoing.")
                }
            }()

在程序接受到停止信号后,程序会首先flush剩下的数据到output中,然后退出进程。这样可以保证一定的数据安全。

基于redis实现buffer的持久化

在持久化机制的选型中,优先实现redis。本身redis性能高,而且具备完善的持久化。
具体的实现架构如下:
k8s与监控--改造telegraf的buffer实现
将原buffer中功能抽象出buffer.go接口。
具体代码:

package buffer

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/internal/buffer/memory"
    "github.com/influxdata/telegraf/internal/buffer/redis"
)

const (
    BufferTypeForMemory = "memory"
    BufferTypeForRedis  = "redis"
)

type Buffer interface {
    IsEmpty() bool
    Len() int
    Add(metrics ...telegraf.Metric)
    Batch(batchSize int) []telegraf.Metric
}

func NewBuffer(mod string, size int, key, addr string) Buffer {
    switch mod {
    case BufferTypeForRedis:
        return redis.NewBuffer(size, key, addr)
    default:
        return memory.NewBuffer(size)
    }
}

然后分别内存和redis实现了Buffer接口。
其中NewBuffer相当于一个工厂方法。
当然在后期可以实现基于file和db等buffer实现,来满足不同的场景和要求。

redis实现buffer的要点

由于要满足先进先出的要求,选择了redis的list数据结构。redis中的list是一个字符串list,所以telegraf中metric数据接口要符合序列化的要求。比如属性需要可导出,即public。所以这点需要改动telegraf对于metric struct的定义。另外可以选择json或是msgpack等序列化方式。我们这边是采用的json序列化的方式。

结语

改造以后,可以根据自己的需求通过配置文件来决定使用channel或是redis来实现buffer。各有优劣,内存实现的话,性能高,受到的依赖少。而redis这种分布式存储,决定了数据安全,但是性能会有一定的损耗,毕竟有大量的序列化和反序列化以及网络传输,当然依赖也增加了,取决于redis的可靠性,建议redis集群部署。

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Mysql设置sort_buffer_size
sort\_buffer\_sizemysqlshowvariableslike‘%sort\_buffer\_size%’; ——————————— |Variable\_name|Value| ——————————— |innodb\_sort\_buffer\_size|1048576
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究