Enriching Data with Lookups

The filter plugins described in this section enrich events with additional information, such as GeoIP and user agent details.

dns filter

Performs a standard or reverse DNS lookup.

The following config performs a reverse lookup on the address in the source_host field and replaces it with the domain name:

filter {
  dns {
    reverse => [ "source_host" ]
    action => "replace"
  }
}
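
The same filter can also perform a standard (forward) lookup through its resolve option. The following is a minimal sketch, assuming the event carries a hostname in a field called server_name (a hypothetical field name, not part of the example above). The cache values are illustrative, not recommendations:

```
filter {
  dns {
    # Forward lookup: replace the hostname in server_name with its IP address
    resolve => [ "server_name" ]
    action => "replace"
    # Cache successful lookups to reduce load on the DNS server (illustrative values)
    hit_cache_size => 1000
    hit_cache_ttl => 300
  }
}
```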

elasticsearch filter

Copies fields from previous log events in Elasticsearch to current events.

The following config shows a complete example of how this filter might be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch filter to find the matching "start" event based on an operation identifier (the opid field in this example). Then it copies the @timestamp field from the "start" event into a new field, started, on the "end" event. Finally, using a combination of the date filter and the ruby filter, the code in the example calculates the duration in hours between the two events.

      if [type] == "end" {
         elasticsearch {
            hosts => ["es-server"]
            query => "type:start AND operation:%{[opid]}"
            fields => { "@timestamp" => "started" }
         }
         date {
            match => ["[started]", "ISO8601"]
            target => "[started]"
         }
         ruby {
            code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
         }
      }
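
If the query finds no matching "start" event, the started field is not set, and the ruby filter has nothing to subtract. In the example above, the trailing rescue nil absorbs that error. A hedged variation, assuming you would rather skip the calculation entirely in that case, wraps the date and ruby filters in a conditional:

```
if [type] == "end" {
  elasticsearch {
    hosts => ["es-server"]
    query => "type:start AND operation:%{[opid]}"
    fields => { "@timestamp" => "started" }
  }
  # Only compute the duration when the lookup populated the started field
  if [started] {
    date {
      match => ["[started]", "ISO8601"]
      target => "[started]"
    }
    ruby {
      code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil'
    }
  }
}
```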

geoip filter

Adds geographical information about the location of IP addresses. For example:

filter {
  geoip {
    source => "clientip"
  }
}

After the geoip filter is applied, the event will be enriched with geoip fields. For example:

          "geoip" => {
              "timezone" => "Europe/Moscow",
                    "ip" => "83.149.9.216",
              "latitude" => 55.7522,
        "continent_code" => "EU",
             "city_name" => "Moscow",
         "country_code2" => "RU",
          "country_name" => "Russia",
              "dma_code" => nil,
         "country_code3" => "RU",
           "region_name" => "Moscow",
              "location" => [
            [0] 37.6156,
            [1] 55.7522
        ],
           "postal_code" => "101194",
             "longitude" => 37.6156,
           "region_code" => "MOW"
    }
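
When only part of this information is needed, the fields option can limit what is added to the event, and the target option controls where it is stored. The following is a sketch only: it assumes a plugin version that emits the field names shown in the output above (newer versions running in ECS compatibility mode use different names), and the chosen fields are illustrative:

```
filter {
  geoip {
    source => "clientip"
    # Store the lookup result under the geoip field
    target => "geoip"
    # Keep only a subset of the available GeoIP fields
    fields => ["city_name", "country_name", "location"]
  }
}
```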

jdbc_static filter

Enriches events with data pre-loaded from a remote database.

The following example fetches data from a remote database, caches it in a local database, and uses lookups to enrich events with data cached in the local database.

filter {
  jdbc_static {
    loaders => [ 
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ 
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-servers"
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "select firstname, lastname from users WHERE userid = :id"
        parameters => {id => "[loggedin_userid]"}
        target => "user" 
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" }
    add_field => { user_firstname => "%{[user][0][firstname]}" } 
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}

In this example:

The loaders setting queries an external database to fetch the dataset that will be cached locally.

The local_db_objects setting defines the columns, types, and indexes used to build the local database structure. The column names and types should match the external database.

The local_lookups setting performs lookup queries on the local database to enrich the events.

The target setting specifies the event field that will store the looked-up data. If the lookup returns multiple columns, the data is stored as a JSON object within the field.

The add_field setting takes data from the JSON object and stores it in top-level event fields for easier analysis in Kibana.
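
Two further options are worth considering alongside this example: loader_schedule, which reloads the remote data periodically, and default_hash, which supplies a value when a lookup finds no row. The fragment below is a sketch under those assumptions, not a complete config. It omits the loaders, local_db_objects, and connection settings shown above, and the schedule and default value are illustrative:

```
filter {
  jdbc_static {
    # loaders, local_db_objects, and jdbc_* settings as in the example above

    local_lookups => [
      {
        id => "local-servers"
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
        # Stored in the target field when the lookup returns no rows
        default_hash => { description => "unknown" }
      }
    ]
    # Re-run the loaders every 30 minutes (cron-like syntax)
    loader_schedule => "*/30 * * * *"
  }
}
```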

jdbc_streaming filter

Enriches events with database data.

The following example executes a SQL query and stores the result set in a field called country_details:

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = :code"
    parameters => { "code" => "country_code"}
    target => "country_details"
  }
}
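
Because this filter runs a query for each event, its built-in result cache can matter for throughput. The following variation is a minimal sketch of the same lookup with explicit cache settings. The values are illustrative assumptions and should be tuned to how often the underlying table changes:

```
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = :code"
    # :code is bound to the value of the event's country_code field
    parameters => { "code" => "country_code"}
    target => "country_details"
    # Reuse results for repeated parameter values instead of querying again
    use_cache => true
    cache_size => 1000
    # Seconds before a cached result expires (illustrative value)
    cache_expiration => 60.0
  }
}
```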

translate filter

Replaces field contents based on replacement values specified in a hash or file. Currently supports these file types: YAML, JSON, and CSV.

The following example takes the value of the response_code field, translates it to a description based on the values specified in the dictionary, and then removes the response_code field from the event:

filter {
  translate {
    field => "response_code"
    destination => "http_response"
    dictionary => {
      "200" => "OK"
      "403" => "Forbidden"
      "404" => "Not Found"
      "408" => "Request Timeout"
    }
    remove_field => "response_code"
  }
}
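
For larger dictionaries, the replacement values can be kept in a file instead of inline. The following sketch assumes a YAML file at a hypothetical path that holds the same code-to-description pairs, and it uses the same field and destination option names as the example above (newer plugin versions may name these options differently):

```
filter {
  translate {
    field => "response_code"
    destination => "http_response"
    # Hypothetical path to a YAML file with entries such as "200": OK
    dictionary_path => "/path/to/http_codes.yml"
    # Value written to the destination when no dictionary entry matches
    fallback => "Unknown"
    # Check the file for changes every 300 seconds
    refresh_interval => 300
    remove_field => "response_code"
  }
}
```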

useragent filter

Parses user agent strings into fields.

The following example takes the user agent string in the agent field, parses it into user agent fields, and adds the user agent fields to a new field called user_agent. It also removes the original agent field:

filter {
  useragent {
    source => "agent"
    target => "user_agent"
    remove_field => "agent"
  }
}

After the filter is applied, the event will be enriched with user agent fields. For example:

        "user_agent": {
          "os": "Mac OS X 10.12",
          "major": "50",
          "minor": "0",
          "os_minor": "12",
          "os_major": "10",
          "name": "Firefox",
          "os_name": "Mac OS X",
          "device": "Other"
        }