Aggregating data for faster performance

When you aggregate data, Elasticsearch automatically distributes the calculations across your cluster. You can then feed this aggregated data into the machine learning features instead of raw results, which reduces the volume of data that must be analyzed.

Requirements

There are a number of requirements for using aggregations in datafeeds.

Aggregations
  • Your aggregation must include a date_histogram aggregation or a top-level composite aggregation, which in turn must contain a max aggregation on the time field. This ensures that the aggregated data is a time series and that the timestamp of each bucket is the time of the last record in the bucket.
  • The time_zone parameter in the date histogram aggregation must be set to UTC, which is the default value.
  • The name of the aggregation and the name of the field that it operates on need to match. For example, if you use a max aggregation on a time field called responsetime, the name of the aggregation must also be responsetime.
  • For composite aggregation support, there must be exactly one date_histogram value source. That value source must not be sorted in descending order. Additional composite aggregation value sources are allowed, such as terms.
  • The size parameter of the non-composite aggregations must match the cardinality of your data. A greater value of the size parameter increases the memory requirement of the aggregation.
  • If you set the summary_count_field_name property to a non-null value, the anomaly detection job expects to receive aggregated input. The property must be set to the name of the field that contains the count of raw data points that have been aggregated. It applies to all detectors in the job.
  • The influencers or the partition fields must be included in the aggregation of your datafeed, otherwise they are not included in the job analysis. For more information on influencers, refer to Influencers.
Intervals
  • The bucket span of your anomaly detection job must be divisible by the value of the calendar_interval or fixed_interval in your aggregation (with no remainder).
  • If you specify a frequency for your datafeed, it must be divisible by the calendar_interval or the fixed_interval.
  • Anomaly detection jobs cannot use date_histogram or composite aggregations with an interval measured in months because the length of the month is not fixed; they can use weeks or smaller units.
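
The interval rules above can be checked before a job is created. The following Python sketch is only an illustration: the helper names to_seconds and interval_problems are hypothetical, and it assumes durations written with the fixed-length units s, m, h, and d.

```python
import re

# Seconds per unit for the fixed-length units this sketch understands.
# Months are deliberately absent because their length is not fixed.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def to_seconds(duration):
    """Convert a duration such as '360s' or '60m' to seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]


def interval_problems(bucket_span, interval, frequency=None):
    """Return a list of violated interval rules (empty if none)."""
    problems = []
    step = to_seconds(interval)
    # The bucket span must be a whole multiple of the aggregation interval.
    if to_seconds(bucket_span) % step != 0:
        problems.append("bucket_span is not a multiple of the interval")
    # The same applies to the datafeed frequency, when one is specified.
    if frequency is not None and to_seconds(frequency) % step != 0:
        problems.append("frequency is not a multiple of the interval")
    return problems


print(interval_problems("60m", "360s"))        # no violations: 3600 / 360 = 10
print(interval_problems("60m", "7m", "10m"))   # both rules are violated
```

A month interval such as 1M is rejected by to_seconds with a ValueError, which mirrors the rule that month-based intervals cannot be used.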

Limitations

  • If your datafeed uses aggregations with nested terms aggregations and model plot is not enabled for the anomaly detection job, neither the Single Metric Viewer nor the Anomaly Explorer can plot and display an anomaly chart. In these cases, an explanatory message is shown instead of the chart.
  • Your datafeed can contain multiple aggregations, but only the ones with names that match values in the job configuration are fed to the job.

Recommendations

  • When your detectors use metric or sum analytical functions, it’s recommended to set the date_histogram or composite aggregation interval to a tenth of the bucket span. This creates finer, more granular time buckets, which are ideal for this type of analysis.
  • When your detectors use count or rare functions, set the interval to the same value as the bucket span.
  • If you have multiple influencers or partition fields or if your field cardinality is more than 1000, use composite aggregations.

    To determine the cardinality of your data, you can run searches such as:

    GET .../_search
    {
      "aggs": {
        "service_cardinality": {
          "cardinality": {
            "field": "service"
          }
        }
      }
    }
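
The recommendations above amount to a small decision rule. The sketch below expresses it in Python. The function names and the function_kind labels are hypothetical, and the caller is assumed to know which group (metric, sum, count, or rare) a detector function belongs to.

```python
def recommended_interval_seconds(function_kind, bucket_span_seconds):
    """Suggest an aggregation interval for a detector function group."""
    if function_kind in ("metric", "sum"):
        # A tenth of the bucket span. Integer division is used, so pick a
        # bucket span that divides evenly by ten.
        return bucket_span_seconds // 10
    if function_kind in ("count", "rare"):
        # Same value as the bucket span.
        return bucket_span_seconds
    raise ValueError(f"unknown function kind: {function_kind!r}")


def prefer_composite(field_cardinality, split_field_count):
    """True when the recommendations point to a composite aggregation."""
    # split_field_count is the number of influencer or partition fields.
    return split_field_count > 1 or field_cardinality > 1000


print(recommended_interval_seconds("metric", 3600))                  # 360
print(prefer_composite(field_cardinality=100, split_field_count=1))  # False
```

With a 60m bucket span and a metric function, the result is 360 seconds, which is the fixed_interval used in the job examples on this page.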

Including aggregations in anomaly detection jobs

When you create or update an anomaly detection job, you can include aggregated fields in the analysis configuration. In the datafeed configuration object, you can define the aggregations.

PUT _ml/anomaly_detectors/kibana-sample-data-flights
{
  "analysis_config": {
    "bucket_span": "60m",
    "detectors": [{
      "function": "mean",
      "field_name": "responsetime",  
      "by_field_name": "airline"  
    }],
    "summary_count_field_name": "doc_count" 
  },
  "data_description": {
    "time_field":"time"  
  },
  "datafeed_config":{
    "indices": ["kibana-sample-data-flights"],
    "aggregations": {
      "buckets": {
        "date_histogram": {
          "field": "time",
          "fixed_interval": "360s",
          "time_zone": "UTC"
        },
        "aggregations": {
          "time": {  
            "max": {"field": "time"}
          },
          "airline": {
            "terms": {
              "field": "airline",
              "size": 100
            },
            "aggregations": {
              "responsetime": {  
                "avg": {
                  "field": "responsetime"
                }
              }
            }
          }
        }
      }
    }
  }
}

The airline, responsetime, and time fields are aggregations. Only the aggregated fields defined in the analysis_config object are analyzed by the anomaly detection job.

The summary_count_field_name property is set to the doc_count field that is an aggregated field and contains the count of the aggregated data points.

The aggregations have names that match the fields that they operate on. The max aggregation is named time and its field also needs to be time.

The terms aggregation is named airline and its field is also named airline.

The avg aggregation is named responsetime and its field is also named responsetime.
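
Because only aggregations whose names match the job configuration are analyzed, it can help to compare the two before you submit a job. The following Python sketch assumes the job is available as a dictionary shaped like the request above. The helper names aggregation_names and missing_from_datafeed are hypothetical and are not part of any Elasticsearch client.

```python
def aggregation_names(aggs):
    """Collect every aggregation and composite source name, at any depth."""
    names = set()
    for name, body in aggs.items():
        names.add(name)
        # Composite value sources are named too (for example a terms source).
        for source in body.get("composite", {}).get("sources", []):
            names.update(source)
        # Recurse into sub-aggregations under either accepted key.
        for key in ("aggregations", "aggs"):
            names |= aggregation_names(body.get(key, {}))
    return names


def missing_from_datafeed(job):
    """Return job fields that no datafeed aggregation is named after."""
    config = job["analysis_config"]
    wanted = {job["data_description"]["time_field"]}
    wanted.update(config.get("influencers", []))
    detector_keys = ("field_name", "by_field_name",
                     "over_field_name", "partition_field_name")
    for detector in config["detectors"]:
        wanted.update(detector[key] for key in detector_keys if key in detector)
    return wanted - aggregation_names(job["datafeed_config"]["aggregations"])


# A trimmed copy of the job shown above.
job = {
    "analysis_config": {
        "detectors": [{"function": "mean", "field_name": "responsetime",
                       "by_field_name": "airline"}],
    },
    "data_description": {"time_field": "time"},
    "datafeed_config": {
        "aggregations": {
            "buckets": {
                "date_histogram": {"field": "time", "fixed_interval": "360s"},
                "aggregations": {
                    "time": {"max": {"field": "time"}},
                    "airline": {
                        "terms": {"field": "airline", "size": 100},
                        "aggregations": {
                            "responsetime": {"avg": {"field": "responsetime"}}
                        },
                    },
                },
            }
        }
    },
}

print(missing_from_datafeed(job))  # set()
```

An empty set means every field the job refers to has an aggregation of the same name. The check does not cover summary_count_field_name, because doc_count is produced by the buckets rather than by a named aggregation.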

Use the following format to define a date_histogram aggregation to bucket by time in your datafeed:

"aggregations": {
  ["bucketing_aggregation": {
    "bucket_agg": {
      ...
    },
    "aggregations": {
      "date_histogram_aggregation": {
        "date_histogram": {
          "field": "time",
        },
        "aggregations": {
          "timestamp": {
            "max": {
              "field": "time"
            }
          },
          [,"<first_term>": {
            "terms":{...
            }
            [,"aggregations" : {
              [<sub_aggregation>]+
            } ]
          }]
        }
      }
    }
  }
}

Composite aggregations

Composite aggregations are optimized for queries that are either match_all or range filters. Use composite aggregations in your datafeeds for these cases. Other types of queries may cause the composite aggregation to be inefficient.

The following is an example of a job with a datafeed that uses a composite aggregation to bucket the metrics based on time and terms:

PUT _ml/anomaly_detectors/kibana-sample-data-flights-composite
{
  "analysis_config": {
    "bucket_span": "60m",
    "detectors": [{
      "function": "mean",
      "field_name": "responsetime",
      "by_field_name": "airline"
    }],
    "summary_count_field_name": "doc_count"
  },
  "data_description": {
    "time_field":"time"
  },
  "datafeed_config":{
    "indices": ["kibana-sample-data-flights"],
    "aggregations": {
      "buckets": {
        "composite": {
          "size": 1000,  
          "sources": [
            {
              "time_bucket": {  
                "date_histogram": {
                  "field": "time",
                  "fixed_interval": "360s",
                  "time_zone": "UTC"
                }
              }
            },
            {
              "airline": {  
                "terms": {
                  "field": "airline"
                }
              }
            }
          ]
        },
        "aggregations": {
          "time": {  
            "max": {
              "field": "time"
            }
          },
          "responsetime": { 
            "avg": {
              "field": "responsetime"
            }
          }
        }
      }
    }
  }
}

The size parameter sets how many composite buckets each search returns. A larger size means a faster datafeed, but more cluster resources are used when searching.

The time_bucket source is the required date_histogram composite aggregation source. Make sure it is named differently than your desired time field.

Instead of a regular terms aggregation, this example adds a composite aggregation terms source named airline. Note that its name is the same as the field.

The time aggregation is the required max aggregation. Its name is the time field in the job analysis config.

The avg aggregation is named responsetime and its field is also named responsetime.

Use the following format to define a composite aggregation in your datafeed:

"aggregations": {
  "composite_agg": {
    "composite": {
      "sources": [
        {
          "date_histogram_agg": {
            "date_histogram": {
              "field": "time",
              ...settings...
            }
          }
        },
        ...other valid sources...
      ],
      ...composite agg settings...
    },
    "aggregations": {
      "timestamp": {
        "max": {
          "field": "time"
        }
      },
      [,"<other_aggregation>": {
        ...
        [,"aggregations" : {
          [<sub_aggregation>]+
        } ]
      }]
    }
  }
}
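
The composite requirements listed earlier (exactly one date_histogram value source, not sorted in descending order, and named differently than the time field) can be checked with a short script. This is a minimal Python sketch under those stated rules. The function name composite_source_problems is hypothetical.

```python
def composite_source_problems(sources, time_field):
    """Return a list of violated composite source rules (empty if none)."""
    problems = []
    # Gather (name, settings) for every date_histogram value source.
    date_sources = [
        (name, spec["date_histogram"])
        for source in sources
        for name, spec in source.items()
        if "date_histogram" in spec
    ]
    if len(date_sources) != 1:
        problems.append(
            f"expected exactly one date_histogram source, found {len(date_sources)}"
        )
    for name, settings in date_sources:
        # Value sources are sorted in ascending order unless stated otherwise.
        if settings.get("order", "asc") == "desc":
            problems.append(f"{name} is sorted in descending order")
        if name == time_field:
            problems.append(f"{name} has the same name as the time field")
    return problems


# The sources from the composite example above.
sources = [
    {"time_bucket": {"date_histogram": {"field": "time",
                                        "fixed_interval": "360s",
                                        "time_zone": "UTC"}}},
    {"airline": {"terms": {"field": "airline"}}},
]

print(composite_source_problems(sources, time_field="time"))  # []
```

Additional value sources, such as the airline terms source, are ignored by the check because they are allowed.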

Nested aggregations

You can also use complex nested aggregations in datafeeds.

The next example uses the derivative pipeline aggregation to find the first order derivative of the counter system.network.out.bytes for each value of the field beat.name.

Note: derivative and other pipeline aggregations might not work within composite aggregations. See composite aggregations and pipeline aggregations.

"aggregations": {
  "beat.name": {
    "terms": {
      "field": "beat.name"
    },
    "aggregations": {
      "buckets": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "5m"
        },
        "aggregations": {
          "@timestamp": {
            "max": {
              "field": "@timestamp"
            }
          },
          "bytes_out_average": {
            "avg": {
              "field": "system.network.out.bytes"
            }
          },
          "bytes_out_derivative": {
            "derivative": {
              "buckets_path": "bytes_out_average"
            }
          }
        }
      }
    }
  }
}
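
To make the role of the derivative aggregation concrete, the following Python sketch computes a first order derivative over a sequence of per-bucket averages. It is an illustration only: the function name first_derivative is hypothetical, the sample values are invented, and the sketch assumes that no bucket is empty.

```python
def first_derivative(bucket_values):
    """Difference between each bucket value and the one before it.

    The first bucket has no predecessor, so its derivative is None.
    """
    if not bucket_values:
        return []
    result = [None]
    for previous, current in zip(bucket_values, bucket_values[1:]):
        result.append(current - previous)
    return result


# Invented averages of system.network.out.bytes for three 5m buckets.
bytes_out_average = [100.0, 150.0, 130.0]
print(first_derivative(bytes_out_average))  # [None, 50.0, -20.0]
```

Because the underlying field is a counter, the change between buckets is usually more informative to analyze than the raw average.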

Single bucket aggregations

You can also use single bucket aggregations in datafeeds. The following example shows two filter aggregations, one per server, each counting the entries of the error field with a value_count aggregation.

{
  "job_id":"servers-unique-errors",
  "indices": ["logs-*"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "fixed_interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {
          "max": {"field": "time"}
        },
        "server1": {
          "filter": {"term": {"source": "server-name-1"}},
          "aggregations": {
            "server1_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        },
        "server2": {
          "filter": {"term": {"source": "server-name-2"}},
          "aggregations": {
            "server2_error_count": {
              "value_count": {
                "field": "error"
              }
            }
          }
        }
      }
    }
  }
}
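
When there are more than a couple of servers, writing one filter aggregation per server by hand becomes repetitive. The Python sketch below generates the sub-aggregations of the date histogram in the example above from a list of server names. The function name per_server_error_counts is hypothetical, and the serverN naming simply follows the example.

```python
import json


def per_server_error_counts(server_names):
    """Build the sub-aggregations of the date histogram shown above."""
    # The max aggregation on the time field is always required.
    aggs = {"time": {"max": {"field": "time"}}}
    for index, server in enumerate(server_names, start=1):
        # One single bucket (filter) aggregation per server, each with a
        # value_count sub-aggregation on the error field.
        aggs[f"server{index}"] = {
            "filter": {"term": {"source": server}},
            "aggregations": {
                f"server{index}_error_count": {
                    "value_count": {"field": "error"}
                }
            },
        }
    return aggs


sub_aggs = per_server_error_counts(["server-name-1", "server-name-2"])
print(json.dumps(sub_aggs, indent=2))
```

The names of the generated value_count aggregations (server1_error_count and so on) are the ones that the detectors of the job would need to refer to.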