Filter and aggregate logs

edit

[preview] This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.

Filter and aggregate your log data to find specific information, gain insight, and monitor your systems more efficiently. You can filter and aggregate based on structured fields like timestamps, log levels, and IP addresses that you’ve extracted from your log data.

This guide shows you how to:

  • Filter logs: Narrow down your log data by applying specific criteria.
  • Aggregate logs: Analyze and summarize data to find patterns and gain insight.
Before you get started
edit

Required role

The Admin role or higher is required to create ingest pipelines and set the index template. To learn more, refer to Assign user roles and privileges.

The examples on this page use the following ingest pipeline and index template, which you can set in Developer Tools. If you haven’t used ingest pipelines and index templates to parse your log data and extract structured fields yet, start with the Parse and organize logs documentation.

Set the ingest pipeline with the following command:

PUT _ingest/pipeline/logs-example-default
{
  "description": "Extracts the timestamp log level and host ip",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{@timestamp} %{log.level} %{host.ip} %{message}"
      }
    }
  ]
}

Set the index template with the following command:

PUT _index_template/logs-example-default-template
{
  "index_patterns": [ "logs-example-*" ],
  "data_stream": { },
  "priority": 500,
  "template": {
    "settings": {
      "index.default_pipeline":"logs-example-default"
    }
  },
  "composed_of": [
    "logs-mappings",
    "logs-settings",
    "logs@custom",
    "ecs@dynamic_templates"
  ],
  "ignore_missing_component_templates": ["logs@custom"]
}
Filter logs
edit

Filter your data using the fields you’ve extracted so you can focus on log data with specific log levels, timestamp ranges, or host IPs. You can filter your log data in different ways:

Filter logs in Logs Explorer
edit

Logs Explorer is a tool that automatically provides views of your log data based on integrations and data streams. To open Logs Explorer, go to Discover and select the Logs Explorer tab.

From Logs Explorer, you can use the Kibana Query Language (KQL) in the search bar to narrow down the log data that’s displayed. For example, you might want to look into an event that occurred within a specific time range.

Add some logs with varying timestamps and log levels to your data stream:

  1. In your Observability project, go to Developer Tools.
  2. In the Console tab, run the following command:
POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }

For this example, let’s look for logs with a WARN or ERROR log level that occurred on September 14th or 15th. From Logs Explorer:

  1. Add the following KQL query in the search bar to filter for logs with log levels of WARN or ERROR:

    log.level: ("ERROR" or "WARN")
  2. Click the current time range, select Absolute, and set the Start date to Sep 14, 2023 @ 00:00:00.000.

    Set the time range start date

  3. Click the end of the current time range, select Absolute, and set the End date to Sep 15, 2023 @ 23:59:59.999.

    Set the time range end date

Under the Documents tab, you’ll see the filtered log data matching your query.

logs kql filter

For more on using Logs Explorer, refer to the Discover documentation.

Filter logs with Query DSL
edit

Query DSL is a JSON-based language that sends requests and retrieves data from indices and data streams. You can filter your log data using Query DSL from Developer Tools.

For example, you might want to troubleshoot an issue that happened on a specific date or at a specific time. To do this, use a boolean query with a range query to filter for the specific timestamp range and a term query to filter for WARN and ERROR log levels.

First, from Developer Tools, add some logs with varying timestamps and log levels to your data stream with the following command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }

Let’s say you want to look into an event that occurred between September 14th and 15th. The following boolean query filters for logs with timestamps during those days that also have a log level of ERROR or WARN.

POST /logs-example-default/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "2023-09-14T00:00:00",
              "lte": "2023-09-15T23:59:59"
            }
          }
        },
        {
          "terms": {
            "log.level": ["WARN", "ERROR"]
          }
        }
      ]
    }
  }
}

The filtered results should show WARN and ERROR logs that occurred within the timestamp range:

{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": ".ds-logs-example-default-2023.09.25-000001",
        "_id": "JkwPzooBTddK4OtTQToP",
        "_score": 0,
        "_source": {
          "message": "192.168.1.101 Disk usage exceeds 90%.",
          "log": {
            "level": "WARN"
          },
          "@timestamp": "2023-09-15T08:15:20.234Z"
        }
      },
      {
        "_index": ".ds-logs-example-default-2023.09.25-000001",
        "_id": "A5YSzooBMYFrNGNwH75O",
        "_score": 0,
        "_source": {
          "message": "192.168.1.102 Critical system failure detected.",
          "log": {
            "level": "ERROR"
          },
          "@timestamp": "2023-09-14T10:30:45.789Z"
        }
      }
    ]
  }
}
Aggregate logs
edit

Use aggregation to analyze and summarize your log data to find patterns and gain insight. Bucket aggregations organize log data into meaningful groups making it easier to identify patterns, trends, and anomalies within your logs.

For example, you might want to understand error distribution by analyzing the count of logs per log level.

First, from Developer Tools, add some logs with varying log levels to your data stream using the following command:

POST logs-example-default/_bulk
{ "create": {} }
{ "message": "2023-09-15T08:15:20.234Z WARN 192.168.1.101 Disk usage exceeds 90%." }
{ "create": {} }
{ "message": "2023-09-14T10:30:45.789Z ERROR 192.168.1.102 Critical system failure detected." }
{ "create": {} }
{ "message": "2023-09-15T12:45:55.123Z INFO 192.168.1.103 Application successfully started." }
{ "create": {} }
{ "message": "2023-09-14T15:20:10.789Z WARN 192.168.1.104 Network latency exceeding threshold." }
{ "create": {} }
{ "message": "2023-09-10T14:20:45.789Z ERROR 192.168.1.105 Database connection lost." }
{ "create": {} }
{ "message": "2023-09-20T09:40:32.345Z INFO 192.168.1.106 User logout initiated." }
{ "create": {} }
{ "message": "2023-09-21T15:20:55.678Z DEBUG 192.168.1.102 Database connection established." }

Next, run this command to aggregate your log data using the log.level field:

POST logs-example-default/_search?size=0&filter_path=aggregations
{
"size": 0,  
"aggs": {
    "log_level_distribution": {
      "terms": {
        "field": "log.level"
      }
    }
  }
}

Searches with an aggregation return both the query results and the aggregation, so you would see the logs matching the data and the aggregation. Setting size to 0 limits the results to aggregations.

The results should show the number of logs in each log level:

{
  "aggregations": {
    "error_distribution": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "ERROR",
          "doc_count": 2
        },
        {
          "key": "INFO",
          "doc_count": 2
        },
        {
          "key": "WARN",
          "doc_count": 2
        },
        {
          "key": "DEBUG",
          "doc_count": 1
        }
      ]
    }
  }
}

You can also combine aggregations and queries. For example, you might want to limit the scope of the previous aggregation by adding a range query:

GET /logs-example-default/_search
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2023-09-14T00:00:00",
        "lte": "2023-09-15T23:59:59"
      }
    }
  },
  "aggs": {
    "my-agg-name": {
      "terms": {
        "field": "log.level"
      }
    }
  }
}

The results should show an aggregate of logs that occurred within your timestamp range:

{
  ...
  "hits": {
    ...
    "hits": []
  },
  "aggregations": {
    "my-agg-name": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "WARN",
          "doc_count": 2
        },
        {
          "key": "ERROR",
          "doc_count": 1
        },
        {
          "key": "INFO",
          "doc_count": 1
        }
      ]
    }
  }
}

For more on aggregation types and available aggregations, refer to the Aggregations documentation.