Logstash: Periodically Fetching Logs from ES

Problem

Log data is stored in Elasticsearch under daily indices named event_YYYYMMdd. Logstash fetches the data and forwards it to Kafka; the downstream consumer has its own deduplication mechanism, so overlapping fetch windows are tolerated.

Fetching logs on a schedule with Logstash

  • Use a time-range query to fetch the last six hours of log data (a curl sketch for trying this query by hand follows this list):
    {
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "parse_filter_name": "UEBA"
              }
            }
          ],
          "filter": {
            "range": {
              "receive_time": {
                "gt": "now-6h/h",
                "lt": "now/h",
                "format": "yyyy-MM-dd HH:mm:ss"
              }
            }
          }
        }
      }
    }
  • Logstash runs this query every six hours
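    To sanity-check the window before wiring it into Logstash, the same query can be run by hand with curl (host, credentials, and CA path are the ones used in the pipeline config below; adjust to your setup):

      curl --cacert /home/lukelee/Desktop/certs/ca.crt -u elastic:Transfar \
        -H 'Content-Type: application/json' \
        "https://192.168.30.110:9200/event_$(date '+%Y%m%d')/_search?pretty" \
        -d '{"query":{"bool":{"must":[{"match":{"parse_filter_name":"UEBA"}}],"filter":{"range":{"receive_time":{"gt":"now-6h/h","lt":"now/h"}}}}}}'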
  1. Set the environment variable that the daily index name depends on
    • Option 1 (recommended)
      Edit bin/logstash and add: export INDEXDATE=$(date --date="-1 second" '+%Y%m%d')
    • Option 2
      Linux loads environment variables from the following files, in this order:
    • /etc/environment
    • /etc/profile
    • /etc/bash.bashrc
    • /etc/profile.d/test.sh
    • ~/.profile
    • ~/.bashrc
      Add the variable to one of these files, then source the file for it to take effect: export INDEXDATE=$(date --date="-1 second" '+%Y%m%d')
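    A quick way to confirm the variable resolves as expected (GNU date assumed; Logstash substitutes ${INDEXDATE} in the pipeline file at startup, so it must be set in the environment that launches Logstash):

      export INDEXDATE=$(date --date="-1 second" '+%Y%m%d')
      echo "$INDEXDATE"    # prints today's date, e.g. 20210115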
  2. Pipeline configuration
    input {
        elasticsearch {
            hosts => ["192.168.30.110:9200"]
            ssl => true
            ca_file => '/home/lukelee/Desktop/certs/ca.crt'
            user => 'elastic'
            password => 'Transfar'
            # ${INDEXDATE} comes from the environment variable set in step 1
            index => "event_${INDEXDATE}"
            # same six-hour window as the query above
            query => '{"query":{"bool":{"must":[{"match":{"parse_filter_name":"UEBA"}}],"filter":{"range":{"receive_time":{"gt":"now-6h/h","lt":"now/h","format":"yyyy-MM-dd HH:mm:ss"}}}}}}'
        }
    }
    filter {
        json {
            source => "original_log"
        }
        # optionally drop the raw log once it has been parsed into fields
        # mutate {
        #     remove_field => [ "original_log" ]
        # }
    }
    output {
        # stdout { codec => rubydebug { metadata => true } }
        kafka {
            client_id => "logstash-ztf-producer"
            codec => json
            compression_type => "gzip"
            bootstrap_servers => "127.0.0.1:9092"
            topic_id => "logstash-t"
        }
    }
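    Before scheduling anything, the pipeline can be validated and run once by hand (the file name fetch_es.conf is illustrative):

      export INDEXDATE=$(date --date="-1 second" '+%Y%m%d')
      bin/logstash -f fetch_es.conf --config.test_and_exit   # syntax check only
      bin/logstash -f fetch_es.conf                          # actual run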
  3. Set up a system cron job that starts Logstash on schedule to collect the log data

    • Crontab-style entry (note that files under /etc/cron.hourly are plain executable scripts run hourly by run-parts, not crontab syntax, so an entry like this belongs in a crontab; a script sketch for cron.hourly follows below):
    SHELL=/bin/bash
    PATH=/sbin:/bin:/usr/sbin:/usr/bin
    MAILTO=522924775@qq.com      ## errors and any job output are mailed to this account
    HOME=/home                   ## working directory for the jobs
    0 */6 * * * nohup logstash -f xxx.conf &

    run-parts --test /etc/cron.hourly lists the scripts that would be run.
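    If /etc/cron.hourly is preferred, the job must instead be a plain executable script; a minimal sketch (all paths illustrative, and the schedule is then fixed at hourly by run-parts):

      #!/bin/bash
      # /etc/cron.hourly/logstash-fetch -- must be executable (chmod +x)
      export INDEXDATE=$(date --date="-1 second" '+%Y%m%d')
      nohup /usr/share/logstash/bin/logstash -f /etc/logstash/fetch_es.conf \
          >> /var/log/logstash-fetch.log 2>&1 &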

    • Add the job directly
      • Edit the crontab: sudo crontab -e
      • Add the entry: 0 */6 * * * supervisorctl start logstash
      • List the jobs: sudo crontab -l
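    One caveat: cron does not read shell profiles, so the INDEXDATE variable from step 1 may be unset when the job fires. It can be set inline in the entry instead; note that % is special in crontab and must be escaped as \% (paths illustrative):

      0 */6 * * * INDEXDATE=$(date --date="-1 second" '+\%Y\%m\%d') /usr/share/logstash/bin/logstash -f /etc/logstash/fetch_es.conf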

Other

Exporting and importing ES data with Logstash

input {
    elasticsearch {
        hosts => "192.168.30.102:9200"
        index => "event_20210115"
        query => '{"query":{"match_all":{}}}'
        # expose the source _index/_id under [@metadata]
        docinfo => true
        user => "elastic"
        password => "Transfar@123"
        ssl => true
        ca_file => 'D:/Doc/Desktop/102/ca.pem'
    }
}

filter {

}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => "192.168.30.172:9200"
        index => "event_20210115"
        user => "elastic"
        password => "Transfar@123"
        ssl => true
        cacert => 'D:/Doc/Desktop/172/ca.pem'
    }
}
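A minimal way to run the copy, assuming the config above is saved as es_copy.conf (the elasticsearch input finishes once its scroll is exhausted, so Logstash exits on its own):

    bin/logstash -f es_copy.conf

Because docinfo => true exposes the source _index and _id under [@metadata], adding document_id => "%{[@metadata][_id]}" to the elasticsearch output would keep document IDs stable across the copy; as written, the target cluster assigns new IDs.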