Enriching Data with Lookups
The plugins described in this section are useful for enriching data with additional information, such as GeoIP and user agent details.
- dns filter

  Performs a standard or reverse DNS lookup.

  The following config performs a reverse lookup on the address in the `source_host` field and replaces it with the domain name:

  ```
  filter {
    dns {
      reverse => [ "source_host" ]
      action => "replace"
    }
  }
  ```
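  For a standard (forward) lookup, the plugin's `resolve` option can be pointed at a hostname field instead. A minimal sketch, assuming the hostname to resolve is in the same `source_host` field:

  ```
  filter {
    dns {
      resolve => [ "source_host" ]  # replace the hostname with its resolved IP address
      action => "replace"
    }
  }
  ```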
- elasticsearch filter

  Copies fields from previous log events in Elasticsearch to current events.

  The following config shows a complete example of how this filter might be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch filter to find the matching "start" event based on some operation identifier. Then it copies the `@timestamp` field from the "start" event into a new field on the "end" event. Finally, using a combination of the date filter and the ruby filter, the code in the example calculates the time duration in hours between the two events.

  ```
  if [type] == "end" {
    elasticsearch {
      hosts => ["es-server"]
      query => "type:start AND operation:%{[opid]}"
      fields => { "@timestamp" => "started" }
    }
    date {
      match => ["[started]", "ISO8601"]
      target => "[started]"
    }
    ruby {
      code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
    }
  }
  ```
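  Options such as `index` and `sort` can narrow and order the search when more than one document might match. A minimal sketch, assuming the reference events live in indices matching a hypothetical `logs-*` pattern:

  ```
  elasticsearch {
    hosts => ["es-server"]
    index => "logs-*"            # hypothetical index pattern to search
    query => "type:start AND operation:%{[opid]}"
    sort => "@timestamp:desc"    # use the most recent matching event
    fields => { "@timestamp" => "started" }
  }
  ```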
- geoip filter

  Adds geographical information about the location of IP addresses. For example:

  ```
  filter {
    geoip {
      source => "clientip"
    }
  }
  ```

  After the geoip filter is applied, the event will be enriched with geoip fields. For example:

  ```
  "geoip" => {
    "timezone" => "Europe/Moscow",
    "ip" => "83.149.9.216",
    "latitude" => 55.7522,
    "continent_code" => "EU",
    "city_name" => "Moscow",
    "country_code2" => "RU",
    "country_name" => "Russia",
    "dma_code" => nil,
    "country_code3" => "RU",
    "region_name" => "Moscow",
    "location" => [
      [0] 37.6156,
      [1] 55.7522
    ],
    "postal_code" => "101194",
    "longitude" => 37.6156,
    "region_code" => "MOW"
  }
  ```
- jdbc_static filter

  Enriches events with data pre-loaded from a remote database.

  The following example fetches data from a remote database, caches it in a local database, and uses lookups to enrich events with data cached in the local database.

  ```
  filter {
    jdbc_static {
      loaders => [
        {
          id => "remote-servers"
          query => "select ip, descr from ref.local_ips order by ip"
          local_table => "servers"
        },
        {
          id => "remote-users"
          query => "select firstname, lastname, userid from ref.local_users order by userid"
          local_table => "users"
        }
      ]
      local_db_objects => [
        {
          name => "servers"
          index_columns => ["ip"]
          columns => [
            ["ip", "varchar(15)"],
            ["descr", "varchar(255)"]
          ]
        },
        {
          name => "users"
          index_columns => ["userid"]
          columns => [
            ["firstname", "varchar(255)"],
            ["lastname", "varchar(255)"],
            ["userid", "int"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "local-servers"
          query => "select descr as description from servers WHERE ip = :ip"
          parameters => {ip => "[from_ip]"}
          target => "server"
        },
        {
          id => "local-users"
          query => "select firstname, lastname from users WHERE userid = :id"
          parameters => {id => "[loggedin_userid]"}
          target => "user"
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { server_name => "%{[server][0][description]}" }
      add_field => { user_firstname => "%{[user][0][firstname]}" }
      add_field => { user_lastname => "%{[user][0][lastname]}" }
      remove_field => ["server", "user"]
      jdbc_user => "logstash"
      jdbc_password => "example"
      jdbc_driver_class => "org.postgresql.Driver"
      jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
      jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
    }
  }
  ```

  - `loaders` queries the external database to fetch the dataset that will be cached locally.
  - `local_db_objects` defines the columns, types, and indexes used to build the local database structure. The column names and types should match the external database.
  - `local_lookups` performs lookup queries on the local database to enrich the events.
  - `target` specifies the event field that will store the looked-up data. If the lookup returns multiple columns, the data is stored as a JSON object within the field.
  - `add_field` takes data from the JSON object and stores it in top-level event fields for easier analysis in Kibana.
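  Because the local cache is loaded at startup, stale reference data can be a concern for long-running pipelines. The plugin's `loader_schedule` option reruns the loaders periodically; a minimal sketch, with an illustrative hourly cron expression:

  ```
  filter {
    jdbc_static {
      loader_schedule => "0 * * * *"  # re-run the loaders at the top of every hour
      # ... loaders, local_db_objects, local_lookups, and jdbc_* settings as above
    }
  }
  ```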
- jdbc_streaming filter

  Enriches events with database data.

  The following example executes a SQL query and stores the result set in a field called `country_details`:

  ```
  filter {
    jdbc_streaming {
      jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
      jdbc_user => "me"
      jdbc_password => "secret"
      statement => "select * from WORLD.COUNTRY WHERE Code = :code"
      parameters => { "code" => "country_code"}
      target => "country_details"
    }
  }
  ```
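  The target field holds the result set as an array of row hashes keyed by column name, so individual values can be referenced by row index and column. A minimal sketch, assuming the query returns a `Name` column (the column name is an assumption about the schema):

  ```
  filter {
    mutate {
      # copy the first row's Name column to a top-level field
      add_field => { "country_name" => "%{[country_details][0][Name]}" }
    }
  }
  ```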
- translate filter

  Replaces field contents based on replacement values specified in a hash or file. Currently supports these file types: YAML, JSON, and CSV.

  The following example takes the value of the `response_code` field, translates it to a description based on the values specified in the dictionary, and then removes the `response_code` field from the event:

  ```
  filter {
    translate {
      field => "response_code"
      destination => "http_response"
      dictionary => {
        "200" => "OK"
        "403" => "Forbidden"
        "404" => "Not Found"
        "408" => "Request Timeout"
      }
      remove_field => "response_code"
    }
  }
  ```
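  Larger dictionaries can live in an external file referenced with `dictionary_path` instead of an inline hash. A minimal sketch, assuming a hypothetical YAML file at /etc/logstash/http_responses.yml that maps codes to descriptions:

  ```
  filter {
    translate {
      field => "response_code"
      destination => "http_response"
      # hypothetical file containing entries such as: "200": OK
      dictionary_path => "/etc/logstash/http_responses.yml"
    }
  }
  ```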
- useragent filter

  Parses user agent strings into fields.

  The following example takes the user agent string in the `agent` field, parses it into user agent fields, and adds the user agent fields to a new field called `user_agent`. It also removes the original `agent` field:

  ```
  filter {
    useragent {
      source => "agent"
      target => "user_agent"
      remove_field => "agent"
    }
  }
  ```

  After the filter is applied, the event will be enriched with user agent fields. For example:

  ```
  "user_agent": {
    "os": "Mac OS X 10.12",
    "major": "50",
    "minor": "0",
    "os_minor": "12",
    "os_major": "10",
    "name": "Firefox",
    "os_name": "Mac OS X",
    "device": "Other"
  }
  ```