Home > Archives > ELK+logback+kafka+nginx 搭建分布式日志分析平台

ELK+logback+kafka+nginx 搭建分布式日志分析平台

Publish:

概要图

用户通过java应用程序的Slf4j写入日志,SpringBoot默认使用的是logback。我们通过实现自定义的Appender将日志写入kafka,同时logstash通过input插件操作kafka订阅其对应的主题。当有日志输出后被kafka的客户端logstash所收集,经过相关过滤操作后将日志写入Elasticsearch,此时用户可以通过kibana获取elasticsearch中的日志信息

  1. 我们是通过logback打印日志,然后将日志通过kafka消息队列发送到Logstash,经过处理以后存储到Elasticsearch中,然后通过Kibana图形化界面进行分析和处理。
  2. 我们使用Logstash读取日志文件,经过处理以后存储到Elasticsearch中,然后通过Kibana图形化界面进行分析和处理。例如我们读取nginx的日志文件,可以统计访问用户的ip地域,请求地址等等。

实验环境

安装配置Java

安装Elasticsearch

adduser elsearch
su elsearch
cd ~
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.tar.gz
tar -zxvf elasticsearch-6.3.2.tar.gz
cd elasticsearch-6.3.2
vim config/elasticsearch.yml

#如果配置集群的话,只要name一样就可以自动集群了,不需要单独配置
cluster.name: nelson
network.host: 0.0.0.0 #4个0表示外网可以访问
http.port: 9200 #默认http端口
transport.tcp.port: 9300 #默认tcp端口

vim config/jvm.options

-Xms450M
-Xmx450M

./bin/elasticsearch &

启动过程中,如有错误,百度可以找到解决方案

访问 http://101.132.227.185:9200/

安装Nginx

yum install -y nginx
vim /etc/nginx/nginx.conf

修改nginx的日志默认输出格式

http {
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    sendfile            on;

改为

http {
    log_format  json '{"@timestamp":"$time_iso8601",'
             '"@version":"1",'
             '"client":"$remote_addr",'
             '"url":"$uri",'
             '"status":"$status",'
             '"domian":"$host",'
             '"host":"$server_addr",'
             '"size":"$body_bytes_sent",'
             '"responsetime":"$request_time",'
             '"referer":"$http_referer",'
             '"ua":"$http_user_agent"'
          '}';

    access_log  /var/log/nginx/access.log  json;

    sendfile            on;

启动nginx服务

service nginx start

访问 http://101.132.227.185:80/

安装Logstash

cd /usr/local/
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz
tar -zxvf logstash-6.3.2.tar.gz

vim config/logstash.yml

http.host: "0.0.0.0" # 确保外网也可以访问
http.port: 9600

vim config/jvm.options

-Xms400M
-Xmx400M

配置输入输出

在config目录下新建文件nginx.conf

  1. input表示输入源,他这里有好多插件,支持很多数据源包括文件,http等,我们这里首先收集nginx的日志。file表示读取文件;codec表示读取的文件格式,因为我们前边配置了nginx的日志格式为json,所以这里是json;start_position表示从那一行读取,他会记录上一次读取到那个位置,所以就不用担心遗漏日志了。type相当于一个tag一样,可能这里有很多输入源,后面会根据这个type进行过滤。
  2. filter表示处理输入数据,因为我们前边配置了nginx的日志里边记录了用户的ip,所以我们使用geoip组件,可以根据ip匹配位置信息,下面表示你将使用那些fields字段;source表示输入json的那个属性。
  3. output表示输出到哪里,可以文件、redis等,这里我们保存到es里。利用elasticsearch插件,然后配置一下es的地址,索引我们是通过日期自动生成,表示每天创建一个索引
input {
    file {
        path => ["/var/log/nginx/access.log"]
        type => "nginx"
        codec => "json"
        start_position => "beginning"
    }
}

filter {
    geoip {
        fields => ["city_name", "country_name", "latitude", "longitude", "region_name","region_code"]
        source => "client"
    }
}

output {
    if [type] == "nginx" {
        elasticsearch {
            hosts => ["localhost:9200"]
            index => "nelson-nginx-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
}

启动

./bin/logstash -f ./config/nginx.conf 

安装Kibana

cd /usr/local/
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-linux-x86_64.tar.gz
tar -zxvf kibana-6.3.2-linux-x86_64.tar.gz 

修改配置


server.host: 0.0.0.0

elasticsearch.url: "http://localhost:9200"
//我的配置
//elasticsearch.url: "http://101.132.227.185:9200"

启动

./bin/kibana &

访问

http://101.132.227.185:5601

安装Zookeeper

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar -zxvf zookeeper-3.4.12.tar.gz
#复制配置
cp zoo_sample.cfg zoo.cfg
修改配置
vi zoo.cfg
dataDir=/root/zk/data #改为你zk目录/data
./bin/zkServer.sh start

安装Kafka

wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz
cd  kafka_2.12-2.0.0
vim config/server.properties

listeners = PLAINTEXT://172.19.95.188:9092  #阿里云内网地址
advertised.listeners=PLAINTEXT://101.132.227.185:9092  #阿里云外网地址

启动

bin/kafka-server-start.sh config/server.properties
# 创建一个 名称为applog 的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic applog

# 查看所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 消息的生产者,启动以后,在控制台输入信息,然后回车发送
# 指定了地址之后,localhost可能不适用
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic applog
bin/kafka-console-producer.sh --broker-list 172.19.95.188:9092 --topic applog
bin/kafka-console-producer.sh --broker-list 101.132.227.185:9092 --topic applog

# 消息的消费者,如果生产者那里给applog这个top输入信息发送,消费者这边就会在收到,然后在控制台打印出来。
# 指定了地址之后,localhost可能不适用
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 172.19.95.188:9092 --topic applog --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 101.132.227.185:9092 --topic applog --from-beginning

程序中使用logback

修改logbstash的config目录吓得nginx.conf

input {
    #nginx日志的输入
    file {
        path => "/opt/access.log"
        type => "nginx"
        codec => "json"
        start_position => "beginning"
    }
    #kafka日志输入
     kafka {
        topics => "applog"
        type => "kafka"
        bootstrap_servers => "101.132.227.185:9092"
        codec => "json"
    }
}

filter {
    if [type] == "nginx" {
         geoip {
                fields => ["city_name", "country_name", "latitude", "longitude", "region_name","region_code"]
                source => "client"
         }
    }
}

output {
    #都输出到es中,但是索引不一样
    if [type] == "nginx" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "nelson-nginx-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
    if [type] == "kafka" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "nelson-applogs-%{+YYYY.MM.dd}"
        }
        stdout {}
    }
}

阿里云网络端口配置

本篇文章就结束了,elk+nginx 和 elk+logback+kafka都已经实现了


—————— 报错如下: ERROR: [2] bootstrap checks failed [1]: max file descriptors [65535] for elasticsearch process is too low, increase to at least [65536] [2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144] ————— 解决办法: sudo vi /etc/sysctl.conf

添加下面配置:

vm.max_map_count=655360 【修该项即可】

并执行命令:

sysctl -p

然后,重新启动elasticsearch,即可启动成功。

sudo vi /etc/security/limits.conf

添加如下内容:

* soft nofile 65536 #【修改该项即可】
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

需要系统重启。 ——————- elasticSearch启用匿名登录。直接在config/elasticsearch.conf添加该行即可。 xpack.security.enabled: false ####开启用户认证 ——————– elasticSearch数据持久化,默认情况不启用数据保存,故数据一般会几分钟就消失,按以下步骤保存索引数据。

#不同的集群名字不能相同。
cluster.name: es_vm_test
node.name: vmmaster、
#数据索引保存
path.data: /home/abc/elk-5.5.1/elkdata/data
path.logs: /home/abc/elk-5.5.1/elkdata/log
#关闭登录验证
xpack.security.enabled: false

参考

参考2

实战配置如下

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Ding Bao Guo