Use Logstash pipelines for parsing
editUse Logstash pipelines for parsing
editThe examples in this section show how to build Logstash pipeline configurations that replace the ingest pipelines provided with Filebeat modules. The pipelines take the data collected by Filebeat modules, parse it into fields expected by the Filebeat index, and send the fields to Elasticsearch so that you can visualize the data in the pre-built dashboards provided by Filebeat.
This approach is more time consuming than using the existing ingest pipelines to parse the data, but it gives you more control over how the data is processed. By writing your own pipeline configurations, you can do additional processing, such as dropping fields, after the fields are extracted, or you can move your load from Elasticsearch ingest nodes to Logstash nodes.
Before deciding to replaced the ingest pipelines with Logstash configurations, read Use ingest pipelines for parsing.
Here are some examples that show how to implement Logstash configurations to replace ingest pipelines:
Logstash provides an ingest pipeline conversion tool to help you migrate ingest pipeline definitions to Logstash configs. The tool does not currently support all the processors that are available for ingest node, but it’s a good starting point.
Apache 2 Logs
editThe Logstash pipeline configuration in this example shows how to ship and parse
access and error logs collected by the
apache2
Filebeat module.
input { beats { port => 5044 host => "0.0.0.0" } } filter { if [fileset][module] == "apache2" { if [fileset][name] == "access" { grok { match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?", "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \\[%{HTTPDATE:[apache2][access][time]}\\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] } remove_field => "message" } mutate { add_field => { "read_timestamp" => "%{@timestamp}" } } date { match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ] remove_field => "[apache2][access][time]" } useragent { source => "[apache2][access][agent]" target => "[apache2][access][user_agent]" remove_field => "[apache2][access][agent]" } geoip { source => "[apache2][access][remote_ip]" target => "[apache2][access][geoip]" } } else if [fileset][name] == "error" { grok { match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}", "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] } pattern_definitions => { "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}" } remove_field => "message" } mutate { rename => { "[apache2][error][message1]" => "[apache2][error][message]" } } date { match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ] remove_field => "[apache2][error][timestamp]" } } } } output { elasticsearch { hosts => localhost manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } }
MySQL Logs
editThe Logstash pipeline configuration in this example shows how to ship and parse
error and slowlog logs collected by the
mysql
Filebeat module.
input { beats { port => 5044 host => "0.0.0.0" } } filter { if [fileset][module] == "mysql" { if [fileset][name] == "error" { grok { match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}", "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}", "%{GREEDYDATA:[mysql][error][message2]}"] } pattern_definitions => { "LOCALDATETIME" => "[0-9]+ %{TIME}" } remove_field => "message" } mutate { rename => { "[mysql][error][message1]" => "[mysql][error][message]" } } mutate { rename => { "[mysql][error][message2]" => "[mysql][error][message]" } } date { match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ] remove_field => "[mysql][error][time]" } } else if [fileset][name] == "slowlog" { grok { match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @ %{HOSTNAME:[mysql][slowlog][host]} \[(IP:[mysql][slowlog][ip])?\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] } pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" } remove_field => "message" } date { match => [ "[mysql][slowlog][timestamp]", "UNIX" ] } mutate { gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""] } } } } output { elasticsearch { hosts => localhost manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } }
Nginx Logs
editThe Logstash pipeline configuration in this example shows how to ship and parse
access and error logs collected by the
nginx
Filebeat module.
input { beats { port => 5044 host => "0.0.0.0" } } filter { if [fileset][module] == "nginx" { if [fileset][name] == "access" { grok { match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] } remove_field => "message" } mutate { add_field => { "read_timestamp" => "%{@timestamp}" } } date { match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ] remove_field => "[nginx][access][time]" } useragent { source => "[nginx][access][agent]" target => "[nginx][access][user_agent]" remove_field => "[nginx][access][agent]" } geoip { source => "[nginx][access][remote_ip]" target => "[nginx][access][geoip]" } } else if [fileset][name] == "error" { grok { match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] } remove_field => "message" } mutate { rename => { "@timestamp" => "read_timestamp" } } date { match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ] remove_field => "[nginx][error][time]" } } } } output { elasticsearch { hosts => localhost manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } }
System Logs
editThe Logstash pipeline configuration in this example shows how to ship and parse
system logs collected by the
system
Filebeat module.
input { beats { port => 5044 host => "0.0.0.0" } } filter { if [fileset][module] == "system" { if [fileset][name] == "auth" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] } pattern_definitions => { "GREEDYMULTILINE"=> "(.|\n)*" } remove_field => "message" } date { match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } geoip { source => "[system][auth][ssh][ip]" target => "[system][auth][ssh][geoip]" } } else if [fileset][name] == "syslog" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] } pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" } remove_field => "message" } date { match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } } output { elasticsearch { hosts => localhost manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } }