最新Centos7.6 部署ELK日志分析系统

构建客
• 阅读 1565

下载elasticsearch

创建elk用户并授权
useradd elk
chown -R elk:elk /home/elk/elasticsearch
chown -R elk:elk /home/elk/elasticsearch1
chown -R elk:elk /home/elk/elasticsearch2
mkdir -p /home/eladata
mkdir -p /var/log/elk
chown -R elk:elk /home/eladata
chown -R elk:elk /var/log/elk

主节点master

elasticsearch解压,修改配置文件
/home/elk/elasticsearch/config
[root@localhost config]# grep -v  "^#" elasticsearch.yml 
cluster.name: my-application
node.name: node0
node.master: true
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9200
transport.tcp.port: 9301
discovery.zen.minimum_master_nodes: 1
cluster.initial_master_nodes: ["node0"]
手动启动命令
su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
启动文件 elasticsearch.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch.service 
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch
Environment=ES_PATH_CONF=/home/elk/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target

[root@localhost system]# 

Node1节点

/home/elk/elasticsearch1/config
[root@localhost config]# grep -v  "^#" elasticsearch.yml 
cluster.name: my-application
node.name: node1
node.master: false
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
transport.tcp.port: 9303
http.port: 9302
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
[root@localhost config]# 
手动启动命令
su elk -l -c '/home/elk/elasticsearch1/bin/elasticsearch -d'
启动文件 elasticsearch1.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch1.service 
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch1
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch1
Environment=ES_PATH_CONF=/home/elk/elasticsearch1/config
Environment=PID_DIR=/var/run/elasticsearch1
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch1
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch1/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target

[root@localhost system]# 

Node2节点

/home/elk/elasticsearch2/config
[root@localhost config]# grep -v  "^#" elasticsearch.yml 
cluster.name: my-application
node.name: node2
node.attr.rack: r1
node.master: false
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9203
transport.tcp.port: 9304
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
discovery.zen.minimum_master_nodes: 1
[root@localhost config]# 
手动启动命令
su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch -d'
启动文件 elasticsearch2.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch2.service 
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target
[Service]
RuntimeDirectory=elasticsearch2
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch2
Environment=ES_PATH_CONF=/home/elk/elasticsearch2/config
Environment=PID_DIR=/var/run/elasticsearch2
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch2
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target

[root@localhost system]# 

下载logstash

目录如下,默认配置即可
[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]#
手动启动命令
./logstash -f ../dev.conf 
nohup ./logstash -f ../dev.conf &

下载kibana

配置文件如下
[root@localhost config]# pwd
/home/elk/kibana/config
[root@localhost config]# grep -v  "^#" kibana.yml 
server.host: "192.168.1.70"
elasticsearch.hosts: ["http://192.168.1.70:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
手动启动命令
./kibana
nohup ./kibana &
kibana启动文件
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kibana.service 
[Unit]
Description=Kibana  Server Manager
[Service]
ExecStart=/home/elk/kibana/bin/kibana
[Install]
WantedBy=multi-user.target
[root@localhost system]# 

端口为:5601 访问:192.168.1.70:5601

安装 elasticsearch-head

yum install git npm
git clone https://github.com/mobz/elasticsearch-head.git 
[root@localhost elasticsearch-head]# pwd
/home/elk/elasticsearch-head
[root@localhost elasticsearch-head]#
启动
npm install 
npm run start
nohup npm run start & 

curl -XPUT '192.168.1.70:9200/book'

浏览器访问 192.168.1.70:9100 即可打开 elasticsearch-head 页面

下载kafka

修改配置文件如下
[root@localhost config]# pwd
/home/elk/kafka/config
[root@localhost config]# grep -v "^#" server.properties 
broker.id=0
listeners=PLAINTEXT://192.168.1.70:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
[root@localhost config]# 

kafka配置启动zookeeper

手动启动方式
[root@localhost bin]# pwd
/home/elk/kafka/bin
[root@localhost bin]#
./zookeeper-server-start.sh ../config/zookeeper.properties
systemctl 启动zookeeper
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat zookeeper.service 
[Service]
Type=forking
SyslogIdentifier=zookeeper
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.properties
ExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh
[root@localhost system]#

启动kafka服务

手动启动方式
./kafka-server-start.sh ../config/server.properties
systemctl 启动kafka
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kafka.service 
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/kafka-server-start.sh  /home/elk/kafka/config/server.properties
ExecStop=/home/elk/kafka/bin/kafka-server-stop.sh
[root@localhost system]# 

测试kafka

新建一个名字为test的topic
./kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test
查看kafka中的topic
./kafka-topics.sh --list  --zookeeper 192.168.1.70:2181
往kafka topic为test中 生产消息
./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test
在kafka topic为test中 消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning

生产端发送的消息,消费端能够接收到,即说明 kafka 工作正常

目标机器安装filebeat

安装6.5版本的

[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml 
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
     tag: 54_tcp_catalina_out
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
     tag: 54_web_catalina_out
name: 192.168.1.54
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

[root@localhost filebeat]# 

安装完成后去logstash编辑配置文件

logstash操作

[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf 
input {
  kafka{
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}
filter {
        if [fields][tag]=="jpwebmap" {
            json{
                source => "message"
                remove_field => "message"
            }
            geoip {
            source => "client"
            target => "geoip"
             add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
             add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
            }
             mutate {
                convert => [ "[geoip][coordinates]", "float"]
                }
        }
    if [fields][tag] == "54_tcp_catalina_out"{
            grok {
                match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
            }
            date {
                match => ["logdate", "ISO8601"]
            }
            mutate {
                 remove_field => [ "logdate" ]
            }
      }
    if [fields][tag] == "54_web_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
    if [fields][tag] == "55_tcp_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
        if [fields][tag] == "55_web_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
    if [fields][tag] == "51_nginx80_access_log" {
            mutate {
                add_field => { "spstr" => "%{[log][file][path]}" }
            }
               mutate {
                split => ["spstr" , "/"]
                # save the last element of the array as the api_method.
                add_field => ["src", "%{[spstr][-1]}" ]
            }
            mutate{
                remove_field => [ "friends", "ecs", "agent" , "spstr" ]
            }
            grok {
                match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
                remove_field => "message"
            }
            date {
                    match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
                    target => "@timestamp"
            }
            geoip {
                source => "x_forwarded_for"
                target => "geoip"
                database => "/home/elk/logstash/GeoLite2-City.mmdb"
                add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
            }
            mutate {
                convert => [ "[geoip][coordinates]", "float"]
            }
      }
}
output {
if [fields][tag] == "wori"{
  elasticsearch {
   hosts => ["192.168.1.70:9200"]
   index => "zabbix"
       }
   }
if [fields][tag] == "54_tcp_catalina_out"{
  elasticsearch {
   hosts => ["192.168.1.70:9200"]
   index => "54_tcp_catalina_out"
       } 
   }
if [fields][tag] == "54_web_catalina_out"{
  elasticsearch {
   hosts => ["192.168.1.70:9200"]
   index => "54_web_catalina_out"
       } 
   }
if [fields][tag] == "55_tcp_catalina_out"{
  elasticsearch {
   hosts => ["192.168.1.70:9200"]
   index => "55_tcp_catalina_out"
       } 
   }   
if [fields][tag] == "55_web_catalina_out"{
  elasticsearch {
   hosts => ["192.168.1.70:9200"]
   index => "55_web_catalina_out"
       } 
   }
if [fields][tag] == "51_nginx80_access_log" {
       stdout{}
      elasticsearch {
       hosts => ["192.168.1.70:9200"]
       index => "51_nginx80_access_log"
       }
   }
}

其他的配置文件

index.conf
filter {
    mutate {
        add_field => { "spstr" => "%{[log][file][path]}" }
    }
        mutate {
        split => ["spstr" , "/"]
        # save the last element of the array as the api_method.
        add_field => ["src", "%{[spstr][-1]}" ]
        }
        mutate{
    remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
}
java.conf
filter {
if [fields][tag] == "java"{
    grok {
        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
        match => ["logdate", "ISO8601"]
    }
    mutate {
         remove_field => [ "logdate" ]
    }
  } #End if
}
kafkainput.conf
input {
  kafka{
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }

}
#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }

#}
nginx.conf
filter {
if [fields][tag] == "nginx-access" {
        mutate {
        add_field => { "spstr" => "%{[log][file][path]}" }
        }
        mutate {
        split => ["spstr" , "/"]
        # save the last element of the array as the api_method.
        add_field => ["src", "%{[spstr][-1]}" ]
        }
        mutate{
        remove_field => [ "friends", "ecs", "agent" , "spstr" ]
        }

    grok {
        match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
        remove_field => "message"
    }
    date {
                match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
                target => "@timestamp"
        }
    geoip {
        source => "x_forwarded_for"
        target => "geoip"
        database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]

        }
    mutate {
        convert => [ "[geoip][coordinates]", "float"]
    }

  } #endif
}
output.conf
output{
  if [fields][tag] == "nginx-access" {
       stdout{}
      elasticsearch {
       user => elastic
       password => WR141bp2sveJuGFaD4oR
       hosts => ["172.16.11.67:9200"]
       index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
       }
   }
       #stdout{}
   if [fields][tag] == "java" {
        elasticsearch {
        user => elastic
        password => WR141bp2sveJuGFaD4oR
        hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
        index => "%{[host][name]}-%{[src]}"
        }
  }
}
点赞
收藏
评论区
推荐文章
Stella981 Stella981
4年前
CentOS 7安装部署ELK 6.2.4
一、ELK介绍ELK是三款开源软件的缩写,即:ElasticSearchLogstashKibana。这三个工具组合形成了一套实用、易用的监控架构,可抓取系统日志、apache日志、nginx日志、mysql日志等多种日志类型,目前很多公司用它来搭建可视化的集中式日志分析平台。ElasticSearch:是一个分布式的R
Wesley13 Wesley13
4年前
ELK最佳实践
1.ELK最佳实践解析!(https://oscimg.oschina.net/oscnet/50d3ea4fa3946e374b0a03fb0e5795f4cb2.png)a.用户通过nginx或haproxy访问ELK日志统计平台,IP地址为keepalived的vip地址;b.nginx将请求转发到kibana;c.kibana到e
Stella981 Stella981
4年前
Docker 搭建 ELK 集群步骤
前言本篇文章主要介绍在两台机器上使用Docker搭建ELK。正文环境CentOS7.7系统Dockerversion19.03.8dockercomposeversion1.23.2系统设置vim编辑/etc/secur
Stella981 Stella981
4年前
ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台
ELK平台介绍在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段:以下内容来自:http://baidu.blog.51cto.com/71938/1676798(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fbaidu.blog.51cto.com%2F
Wesley13 Wesley13
4年前
ELK7.4.2安装教程
ELK简介“ELK”是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch、Logstash和Kibana。Elasticsearch是一个搜索和分析引擎。Logstash是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如Elasticsearch等“存储库”中。Kibana
Wesley13 Wesley13
4年前
ELK初探
EKL核心组成1.ElasticSearch开源分布式搜索引擎,他的特点是分布式、零配置、自动发现、索引自动分片,索引副本机制,restful接口,多数据源,自动搜索负载。安装ElasticSearch  高可用,易扩展,支持集群(cluster),分片和复制(sharding和replicas)验证启动:curlXGETht
Wesley13 Wesley13
4年前
ELK学习
   大型网站遇到性能瓶颈或发生故障时,分析日志往往是发现问题根源最有效的手段。传统的日志分析手段不外乎以下几类:1\.运维人员用脚本grep,分析再汇总2\.通过流式计算引擎,storm/spark实时产生汇总数据,供监控分析3\.将数据堆放到HDFS,之后通过map/reduce定期做批量分析一个完整的集中式日志系统,需要包
Wesley13 Wesley13
4年前
ELK环境搭建完整说明
ELK环境搭建完整说明ELK:ElasticSerach、Logstash、Kibana三款产品名称的首字母集合,用于日志的搜集和搜索。简单地理解为我们可以把服务端的日志(nginx、tomcat等)直接web化展示查看,十分方便。本机环境说明:系统:centos7.5Elastic
Stella981 Stella981
4年前
Centos7下ELK+Redis日志分析平台的集群环境部署记录
转载于http://www.cnblogs.com/kevingrace/p/9104423.html之前的文档介绍了ELK架构的基础知识(推荐参考下http://blog.oldboyedu.com/elk/(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fblog.oldboyed
Wesley13 Wesley13
4年前
ELK
一、基本概念1Node与ClusterElastic本质上是一个分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个Elastic实例。单个Elastic实例称为一个节点(node)。一组节点构成一个集群(cluster)。2IndexElastic会索引所有字段,经过处理后写入一个反向索引
Stella981 Stella981
4年前
ELK项目es+kibana+jdk配置问题
简介ELK需求背景业务发展越来越庞大,服务器越来越多各种访问日志、应用日志、错误日志量越来越多开发人员排查问题,需要到服务器上查日志,不方便运营人员需要一些数据,需要我们运维到服务器上分析日志说白了就是日志那么多我要给你们搞牛逼的系统如果有机会再搞日志系统加消息列队eskafka消化来的数据ELK包含Elasti
构建客
构建客
Lv1
君不见高堂明镜悲白发,朝如青丝暮成雪。
文章
3
粉丝
0
获赞
0