linux

ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台

发布时间:6年前热度: 7563 ℃评论数:

一、说明

1.Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)

2.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据

3.Logstash是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景
4.ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口

5.Kibana是ElasticSearch的用户界面

6.在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了

二、系统环境

软件版本:filebeat6.5 logstash6.5 Centos7  yjava-1.8.0-openjdk 

数据流向:filebeat --->> kafka--->> logstash

三、安装

1、Filebeat安装

[root@test ~]# yum install https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.4-x86_64.rpm

2.Logstash的安装

[root@test ~]# yum install https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.rpm

3.kafka的安装

[root@localhost ~]#  wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz

[root@localhost ~]# wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

[root@localhost ~]# tar xf zookeeper-3.4.10.tar.gz -C /usr/local/

[root@localhost ~]# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/

[root@localhost ~]# mv /usr/local/zookeeper-3.4.10/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.10/conf/zoo.cfg

[root@localhost ~]# /usr/local/zookeeper-3.4.10/bin/zkServer.sh start

[root@localhost ~]# vim  /usr/local/kafka_2.12-2.1.0/config/server.properties 

broker.id=0

listeners=PLAINTEXT://192.168.30.160:9092  # 默认监听所有地址,这个需要指定一个IP,否则在filebeat那往kafka中发送数据会失败。

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=24

log.segment.bytes=1073741824

log.cleaner.enable=true

log.retention.check.interval.ms=300000

zookeeper.connect=192.168.30.160:2181

zookeeper.connection.timeout.ms=600

group.initial.rebalance.delay.ms=0

[root@localhost ~]#  /usr/local/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.1.0/config/server.properties

[root@localhost ~]# ss -tunl|grep 9092

tcp    LISTEN     0      50     ::ffff:192.168.30.160:9092                 :::*     

备注:一定得注意filebeat版本所兼容的kafka版本,详细查看:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html#_configuration_options_9

四、filebeat配置

[root@test ~]# cat /etc/filebeat/filebeat.yml

filebeat.inputs:

- type: log

  enabled: true

  paths:

    - /var/log/httpd/access_log

  fields:

    log_topics: httpd

  tags: ["httpd"]


- type: log

  enabled: true

  paths:

  - /var/log/messages

  fields:

    log_topics: messages

  tags: ["messages"]

  include_lines: ['^Dec'] # 匹配需要行

  tail_files: true # 从文件尾开始监控文件新增内容

processors:

- drop_fields:  

     fields: ["beat","input","source","offset"] # 过滤不需要的字段

name api-server # 添加字段,标记主机的日志来源

output.kafka:

  enabled: true  

  codec.format:                # 默认json格式

    string: '%{[message]}'   # 只输原始日志格式

  hosts: ["192.168.30.160:9092"]

  topic: '%{[fields][log_topics]}'

  partition.round_robin: # 集群是否开启kafka的partition分区

    reachable_only: false

  worker: 2

  required_acks: 1

  compression: gzip # 压缩格式

  max_message_bytes: 10000000 # 压缩格式字节大小


配置文件测试

filebeat test config -c filebeat.yml 

[root@test ~]# systemctl start filebeat && systemctl enable filebeat

五、logstash配置文件


[root@test ~]# cat /etc/logstash/conf.d/logstash.conf

input {

  kafka {

        bootstrap_servers => "192.168.30.160:9092"

        topics => ["httpd","messages"]

        codec => "json" 

        consumer_threads => 4                                   # 消费进程数

        enable_auto_commit => true                              # 自动提交offset

        auto_commit_interval_ms => "1000"                       # consumer 隔多久提交 

                                                                # offsets --消费指针

        auto_offset_reset => "latest"                           # earliest 

                                                                # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

                                                                # latest 

                                                                # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

                                                                # none 

                                                                # topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

  }

}

output {


     if [tags] == "httpd" {

         elasticsearch {

         hosts => ["192.168.30.160:9200"]

         index => "logstash-httpd-%{+YYYY.MM.dd}"

              manage_template => true

         }

      }

     if [tags] == "messages" {

         elasticsearch {

         hosts => ["192.168.30.160:9200"]

         index => "logstash-messages-%{+YYYY.MM.dd}"

              manage_template => true

        }

     }

}


  #stdout { codec => rubydebug }

  #path => "/tmp/logstash.log"


}


配置文件测试

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf -t

[root@test ~]# systemctl start logstash && systemctl enable logstash

六、kafka常用命令

创建kafka topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1  --replication-factor 1


删除kafka-topic

bin/kafka-topics.sh --zookeeper localhost:2181  --delete  --topic test


查看所有topic列表

bin/kafka-topics.sh --zookeeper localhost:2181 --list


查看指定topic信息

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test


控制台向topic生产数据

bin/kafka-console-producer.sh --broker-list 192.168.30.160:9092 --topic test


控制台消费topic的数据

bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.160:9092 --topic test --from-beginning


Filebeat,Kafka

手机扫码访问