logstash tcp multihost output(多目标主机输出,保证TCP输出链路的稳定性)

Wesley13
• 阅读 437

在清洗日志时,有一个应用场景,就是TCP输出时,需要在一个主机挂了的情况下,自已切换到下一个可用入口,而原tcp output仅支持单个目标主机设定。故本人在原tcp的基础上,开发出tcp_multihost输出插件,来满足此场景。

插件在一开始的时候会随机选择一个链路,而在链路出错连续超过3(默认)次后会尝试数组中下一个主机

github: http://github.com/xiaohelong2005

Logstash版本:1.4.2

文件位置:

# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "thread"
#@auhor:xiaohelong 
#@date:2014-10-24
#@email:xiaohelong2005@gmail.com
# Write events over a TCP socket.
#Auto select the host from iplist to tolerate the link
# Each event json is separated by a newline.
#
# Can  connect to a server,
class LogStash::Outputs::Tcp_Multihost < LogStash::Outputs::Base
  config_name "tcp_multihost"
  milestone  0
  default :codec, "json"
  # the address to connect to.
  #hosts config example
  config :hosts, :validate => :array, :required => true,:default=>{}
    config :reconnect_times, :validate => :number,:default=>3
  # When connect failed,retry interval in sec.
  config :reconnect_interval, :validate => :number, :default => 10
 #last available host
 @hosts_size=0
 #last available host ip
@host="0.0.0.0"
#last available port
@port=9200
#retry action count,if retry_count<=0,we need to update the host and port , also include retry_count itself
@retry_count=0
#get the desgined index data
@initloc=0

  public
  def register
    require "stud/try"
  #here we use the recorded host,if recorded host is not available, we need update it
  @retry_count=@reconnect_times
   @hosts_size=@hosts.length
    @logger.info("length:#@hosts_size; hosts:#@hosts")
   @initloc=Random.rand(@hosts_size)#generate 0-(hosts_size-1) int
    @logger.info("initloc:#@initloc")
    icount=0;
      @hosts.each do |hosthash|               
            @logger.info("hosthash info",hosthash)
       end#do
        @host=@hosts[@initloc].keys[0]        
       @port=@hosts[@initloc][@host]
       
      client_socket = nil
      @codec.on_event do |payload|
        begin
            @retry_count=@reconnect_times#here we need to init retry mark
          client_socket = connect unless client_socket
          r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)          
          # don't expect any reads, but a readable socket might
          # mean the remote end closed, so read it and throw it away.
          # we'll get an EOFError if it happens.
          client_socket.sysread(16384) if r.any?
          # Now send the payload
          client_socket.syswrite(payload) if w.any?
           @logger.info("tcp output info:", :host => @host, :port => @port,
                       :exception => e, :backtrace => e.backtrace)
        rescue => e
          @logger.warn("tcp output exception", :host => @host, :port => @port,
                       :exception => e, :backtrace => e.backtrace)
          client_socket.close rescue nil
          client_socket = nil
          @retry_count-=1
            @logger.info("retry_count:#@retry_count")
          if    @retry_count<=0
                                        @initloc+=1
                              @initloc=@initloc%@hosts_size #update  init location
                                           @host=@hosts[@initloc].keys[0]        
                                            @port=@hosts[@initloc][@host]
                   @retry_count=@reconnect_times #update retry_count
                      @logger.info("retry_count <=0,initloc:#@initloc,retry_count=#@retry_count:", :host => @host, :port => @port, :exception => e, :backtrace => e.backtrace)
          end
          sleep @reconnect_interval
          retry
        end
      end
  end # def register

  private
  def connect
    Stud::try do
      return TCPSocket.new(@host,@port)
    end
  end # def connect
  public
  def receive(event)
    return unless output?(event)
    @codec.encode(event)
  end # def receive
end # class LogStash::Outputs::Tcp_multihost

配置说明(我放在LOGSTASH_HOME/config):

output{

tcp_multihost{

   hosts=>[
            {"127.0.0.1"=>"9202"},
       {"localhost"=>"9201"},
   {"127.0.0.1"=>"9203"},
            {"127.0.0.1"=>"9204"}
         ] #主机列表
    workers =>16 #线程,默认1

reconnect_times=>3 # 默认3次, 尝试多少次数后切换

reconnect_interval=>3 #默认10秒,失败重连间隔

}

}

调用执行:

 "LOGSTASH_HOME/bin/logstash" agent --debug -f "LOGSTASH_HOME/config/shipper.config"  --pluginpath "LOGSTASH_HOME"

NC接收端可以尝试:

nc -lkv 9201 之类的

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
34.TCP取样器
阅读文本大概需要3分钟。1、TCP取样器的作用   TCP取样器作用就是通过TCP/IP协议来连接服务器,然后发送数据和接收数据。2、TCP取样器详解!(https://oscimg.oschina.net/oscnet/32a9b19ba1db00f321d22a0f33bcfb68a0d.png)TCPClien
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这