Aggregating data for faster performance
By default, datafeeds fetch data from Elasticsearch using search and scroll requests. It can be significantly more efficient, however, to aggregate data in Elasticsearch and to configure your jobs to analyze aggregated data.
One of the benefits of aggregating data this way is that Elasticsearch automatically distributes these calculations across your cluster. You can then feed this aggregated data into the machine learning features instead of raw results, which reduces the volume of data that must be considered while detecting anomalies.
There are some limitations to using aggregations in datafeeds, however.
Your aggregation must include a date_histogram aggregation, which in turn must contain a max aggregation on the time field. This requirement ensures that the aggregated data is a time series and the timestamp of each bucket is the time of the last record in the bucket. If you use a terms aggregation and the cardinality of a term is high, then the aggregation might not be effective and you might want to just use the default search and scroll behavior.
When you create or update a job, you can include the names of aggregations, for example:
PUT _ml/anomaly_detectors/farequote
{
  "analysis_config": {
    "bucket_span": "60m",
    "detectors": [{
      "function": "mean",
      "field_name": "responsetime",
      "by_field_name": "airline"
    }],
    "summary_count_field_name": "doc_count"
  },
  "data_description": {
    "time_field": "time"
  }
}
In this example, the airline, responsetime, and time fields are aggregations.
When the summary_count_field_name property is set to a non-null value, the job expects to receive aggregated input. The property must be set to the name of the field that contains the count of raw data points that have been aggregated. It applies to all detectors in the job.
The aggregations are defined in the datafeed as follows:
PUT _ml/datafeeds/datafeed-farequote
{
  "job_id": "farequote",
  "indices": ["farequote"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {
          "max": { "field": "time" }
        },
        "airline": {
          "terms": {
            "field": "airline",
            "size": 100
          },
          "aggregations": {
            "responsetime": {
              "avg": { "field": "responsetime" }
            }
          }
        }
      }
    }
  }
}
In this example, the aggregations have names that match the fields that they operate on. That is to say, the max aggregation is named time and its field is also time. The same is true for the aggregations with the names airline and responsetime. Since you must create the job before you can create the datafeed, synchronizing your aggregation and field names can simplify these configuration steps.
If you use a max aggregation on a time field, the aggregation name in the datafeed must match the name of the time field, as in the previous example. For all other aggregations, if the aggregation name doesn't match the field name, there are limitations in the drill-down functionality within the machine learning page in Kibana.
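The naming rule above can be checked before a datafeed is created. The following is a minimal sketch in Python, not part of any Elasticsearch client: the helper name mismatched_agg_names is hypothetical, and it assumes the aggregations are available as a plain dictionary shaped like the "aggregations" object in the datafeed body.

```python
def mismatched_agg_names(aggs):
    """Return (aggregation_name, field) pairs whose names differ.

    `aggs` is assumed to be a dict shaped like the "aggregations"
    object of a datafeed. The date_histogram itself is skipped,
    because its name does not need to match its field.
    """
    mismatches = []
    for name, body in aggs.items():
        for agg_type, params in body.items():
            if agg_type in ("aggregations", "aggs"):
                # Recurse into the sub-aggregations of this aggregation.
                mismatches.extend(mismatched_agg_names(params))
            elif agg_type == "date_histogram":
                continue
            elif isinstance(params, dict) and "field" in params:
                if params["field"] != name:
                    mismatches.append((name, params["field"]))
    return mismatches
```

An empty result means every named aggregation matches its field. Any pair that is returned points at an aggregation that might be affected by the drill-down limitation described above.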
Datafeeds support complex nested aggregations. This example uses the derivative pipeline aggregation to find the first-order derivative of the counter system.network.out.bytes for each value of the field beat.name.
"aggregations": {
  "beat.name": {
    "terms": { "field": "beat.name" },
    "aggregations": {
      "buckets": {
        "date_histogram": {
          "field": "@timestamp",
          "interval": "5m"
        },
        "aggregations": {
          "@timestamp": {
            "max": { "field": "@timestamp" }
          },
          "bytes_out_average": {
            "avg": { "field": "system.network.out.bytes" }
          },
          "bytes_out_derivative": {
            "derivative": { "buckets_path": "bytes_out_average" }
          }
        }
      }
    }
  }
}
Datafeeds support not only multi-bucket aggregations but also single-bucket aggregations. The following shows two filter aggregations, each counting the values of the error field.
{
  "job_id": "servers-unique-errors",
  "indices": ["logs-*"],
  "aggregations": {
    "buckets": {
      "date_histogram": {
        "field": "time",
        "interval": "360s",
        "time_zone": "UTC"
      },
      "aggregations": {
        "time": {
          "max": { "field": "time" }
        },
        "server1": {
          "filter": { "term": { "source": "server-name-1" } },
          "aggregations": {
            "server1_error_count": {
              "value_count": { "field": "error" }
            }
          }
        },
        "server2": {
          "filter": { "term": { "source": "server-name-2" } },
          "aggregations": {
            "server2_error_count": {
              "value_count": { "field": "error" }
            }
          }
        }
      }
    }
  }
}
When you define an aggregation in a datafeed, it must have the following form:
"aggregations": {
  ["bucketing_aggregation": {
    "bucket_agg": { ... },
    "aggregations": {]
      "date_histogram_aggregation": {
        "date_histogram": {
          "field": "time"
        },
        "aggregations": {
          "timestamp": {
            "max": { "field": "time" }
          }
          [,"<first_term>": {
            "terms": { ... }
            [,"aggregations": { [<sub_aggregation>]+ } ]
          }]
        }
      }
    }
  }
}
The top-level aggregation must be either a bucket aggregation containing a single sub-aggregation that is a date_histogram, or the required date_histogram itself. There must be exactly one date_histogram aggregation. For more information, see Date Histogram Aggregation.
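The structural rules above can be expressed as a small pre-flight check. This is only a sketch in Python under stated assumptions: the function names are hypothetical, the aggregations are assumed to be a plain dictionary, and it assumes there is exactly one top-level aggregation. It does not reproduce the validation that Elasticsearch itself performs.

```python
def find_date_histogram(aggs):
    """Return (name, body) of the date_histogram, or raise ValueError."""
    if len(aggs) != 1:
        raise ValueError("expected exactly one top-level aggregation")
    name, body = next(iter(aggs.items()))
    if "date_histogram" in body:
        # The top-level aggregation is the date_histogram itself.
        return name, body
    # Otherwise it must be a bucket aggregation whose only
    # sub-aggregation is the date_histogram.
    subs = body.get("aggregations") or body.get("aggs") or {}
    if len(subs) != 1:
        raise ValueError("bucket aggregation must contain a single sub-aggregation")
    sub_name, sub_body = next(iter(subs.items()))
    if "date_histogram" not in sub_body:
        raise ValueError("the single sub-aggregation must be a date_histogram")
    return sub_name, sub_body


def check_datafeed_aggs(aggs, time_field):
    """Check the form described above; return the date_histogram's name."""
    name, body = find_date_histogram(aggs)
    subs = body.get("aggregations") or body.get("aggs") or {}
    # The date_histogram must contain a max aggregation on the time field.
    has_max = any(
        sub.get("max", {}).get("field") == time_field for sub in subs.values()
    )
    if not has_max:
        raise ValueError("date_histogram must contain a max aggregation on the time field")
    return name
```

Running such a check against the dictionary you intend to send can surface a missing max aggregation or a misplaced date_histogram before the request reaches the cluster.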
The time_zone parameter in the date histogram aggregation must be set to UTC, which is the default value.
Each histogram bucket has a key, which is the bucket start time. This key cannot be used for aggregations in datafeeds, however, because they need to know the time of the latest record within a bucket. Otherwise, when you restart a datafeed, it continues from the start time of the histogram bucket and possibly fetches the same data twice. The max aggregation for the time field is therefore necessary to provide the time of the latest record within a bucket.
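The difference between the bucket key and the latest record time can be illustrated with a toy model. The sketch below is plain Python, not Elasticsearch code: the function bucket_summary is hypothetical, timestamps are assumed to be non-negative integers in epoch seconds, and the bucketing is a simplification of what a date histogram does.

```python
def bucket_summary(timestamps, interval):
    """Group epoch-second timestamps into fixed-width buckets.

    Returns {bucket_key: (doc_count, max_timestamp)}, where bucket_key
    is the bucket start time and max_timestamp is the time of the
    latest record that fell into the bucket.
    """
    buckets = {}
    for ts in timestamps:
        key = ts - ts % interval  # start time of the bucket
        count, latest = buckets.get(key, (0, key))
        buckets[key] = (count + 1, max(latest, ts))
    return buckets


summary = bucket_summary([0, 100, 350, 360, 400], interval=360)
# summary == {0: (3, 350), 360: (2, 400)}
```

In this toy data, resuming from the key of the last bucket (360) would read the records at 360 and 400 a second time, whereas the max value (400) marks the latest record that was already processed.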
You can optionally specify a terms aggregation, which creates buckets for different values of a field.
If you use a terms aggregation, by default it returns buckets for the top ten terms. Thus if the cardinality of the term is greater than 10, not all terms are analyzed.
You can change this behavior by setting the size parameter. To determine the cardinality of your data, you can run searches such as:
GET .../_search
{
  "aggs": {
    "service_cardinality": {
      "cardinality": {
        "field": "service"
      }
    }
  }
}
By default, Elasticsearch limits the maximum number of terms returned to 10000. For high cardinality fields, the query might not run. It might return errors related to circuit breaking exceptions that indicate that the data is too large. In such cases, do not use aggregations in your datafeed. For more information, see Terms Aggregation.
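The guidance above amounts to a simple decision once you know the cardinality of the field. The following Python sketch is illustrative only: the helper terms_size_for is hypothetical, and the two constants simply restate the defaults mentioned in the text (ten terms returned by default, 10000 terms at most).

```python
DEFAULT_TERMS_SIZE = 10  # terms returned by default, per the text above
MAX_TERMS = 10000        # default upper limit, per the text above


def terms_size_for(cardinality, max_terms=MAX_TERMS):
    """Suggest a `size` for the terms aggregation.

    Returns None when the cardinality exceeds the limit, which is the
    case where the text recommends not using aggregations in the
    datafeed at all.
    """
    if cardinality > max_terms:
        return None
    # Never go below the default; otherwise cover every term.
    return max(cardinality, DEFAULT_TERMS_SIZE)
```

Because the cardinality aggregation returns an approximate count, you might want to add some headroom to the suggested size rather than use it exactly.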
You can also optionally specify multiple sub-aggregations. The sub-aggregations are aggregated for the buckets that were created by their parent aggregation. For more information, see Aggregations.
If your detectors use metric or sum analytical functions, set the interval of the date histogram aggregation to a tenth of the bucket_span that was defined in the job. This suggestion creates finer, more granular time buckets, which are ideal for this type of analysis. For example, the farequote job shown earlier uses a mean detector with a bucket_span of 60m, and its datafeed uses an interval of 360s. If your detectors use count or rare functions, set interval to the same value as bucket_span. For more information about analytical functions, see Function reference.
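The interval recommendation can be turned into a short calculation. This Python sketch makes several assumptions: the function names are hypothetical, only simple time values with a single unit (s, m, h, or d) are handled, and the function_kind argument is a label you supply ("metric", "sum", "count", or "rare") rather than something read from the job.

```python
import re

# Seconds per unit for simple time values such as "60m" or "360s".
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def to_seconds(value):
    """Convert a simple time value like "60m" to a number of seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", value)
    if match is None:
        raise ValueError(f"unsupported time value: {value!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def histogram_interval(bucket_span, function_kind):
    """Suggest a date histogram interval for a given bucket_span."""
    seconds = to_seconds(bucket_span)
    if function_kind in ("metric", "sum"):
        # Metric and sum functions: a tenth of the bucket span.
        if seconds % 10 != 0:
            raise ValueError("bucket_span does not divide evenly into ten intervals")
        seconds //= 10
    elif function_kind not in ("count", "rare"):
        raise ValueError(f"unknown function kind: {function_kind!r}")
    # Count and rare functions: the same value as the bucket span.
    return f"{seconds}s"


print(histogram_interval("60m", "metric"))  # 360s
print(histogram_interval("15m", "count"))   # 900s
```

The first call reproduces the pairing used in the farequote example, where a 60m bucket span is matched with a 360s interval.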