Enrich your Elasticsearch documents within Elasticsearch
With Elasticsearch®, we know that joins should be done "at index time" instead of query time. This blog post starts a series of three posts as there are many approaches we can take within the Elastic® ecosystem. We are going to cover how to do it in Elasticsearch. The next blog post will cover how to do this with a centralized component — Logstash — and the last post will show how to do this on the edge with Elastic Agent/Beats.
As a trivial example, let's say that we are a eCommerce website, collecting logs in kibana_sample_data_logs
:
{
"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"bytes": 1831,
"clientip": "30.156.16.164",
"extension": "",
"geo": {
"srcdest": "US:IN",
"src": "US",
"dest": "IN",
"coordinates": {
"lat": 55.53741389,
"lon": -132.3975144
}
},
"host": "elastic-elastic-elastic.org",
"index": "kibana_sample_data_logs",
"ip": "30.156.16.163",
"machine": {
"ram": 9663676416,
"os": "win xp"
},
"memory": 73240,
"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
"phpmemory": 73240,
"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
"request": "/wp-login.php",
"response": 404,
"tags": [
"success",
"info"
],
"timestamp": "2023-03-18T12:43:49.756Z",
"url": "https://elastic-elastic-elastic.org/wp-login.php",
"utc_time": "2023-03-18T12:43:49.756Z",
"event": {
"dataset": "sample_web_logs"
}
}
Note that you can easily import this dataset using Kibana® sample datasets by clicking on the "Add data" button within the "Sample web logs" box:
We also have an index vip
which contains information about our customers:
{
"ip" : "30.156.16.164",
"vip": true,
"name": "David P"
}
To import this sample data set, we just ran:
DELETE /vip
PUT /vip
{
"mappings": {
"properties": {
"ip": { "type": "keyword" },
"name": { "type": "text" },
"vip": { "type": "boolean" }
}
}
}
POST /vip/_bulk
{ "index" : { } }
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" }
{ "index" : { } }
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" }
{ "index" : { } }
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" }
{ "index" : { } }
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" }
{ "index" : { } }
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" }
{ "index" : { } }
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" }
{ "index" : { } }
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" }
{ "index" : { } }
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
To do "joins at index time," we need to enrich our data set to get at the end logs looking like:
{
"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
"bytes": 1831,
"clientip": "30.156.16.164",
"extension": "",
"geo": {
"srcdest": "US:IN",
"src": "US",
"dest": "IN",
"coordinates": {
"lat": 55.53741389,
"lon": -132.3975144
}
},
"host": "elastic-elastic-elastic.org",
"index": "kibana_sample_data_logs",
"ip": "30.156.16.163",
"machine": {
"ram": 9663676416,
"os": "win xp"
},
"memory": 73240,
"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
"phpmemory": 73240,
"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
"request": "/wp-login.php",
"response": 404,
"tags": [
"success",
"info"
],
"timestamp": "2023-03-18T12:43:49.756Z",
"url": "https://elastic-elastic-elastic.org/wp-login.php",
"utc_time": "2023-03-18T12:43:49.756Z",
"event": {
"dataset": "sample_web_logs"
},
"vip": true,
"name": "David P"
}
That's something you can do out of the box with the Elasticsearch Enrich Processor within an ingest pipeline. Let's have a look on how to do this.
Enriching Elasticsearch data within Elasticsearch
Ingest pipeline
Let's first use an ingest pipeline.
We can start with an empty one that we will be using to simulate the behavior we want. We don't need the full set of fields of the original data set, so we over simplify it:
POST /_ingest/pipeline/_simulate
{
"docs": [
{
"_source": {
"clientip": "30.156.16.164"
}
}
],
"pipeline": {
"processors": []
}
}
We now need to add an enrich processor to our pipeline. But for this, we need to first create an enrich policy:
PUT /_enrich/policy/vip-policy
{
"match": {
"indices": "vip",
"match_field": "ip",
"enrich_fields": ["name", "vip"]
}
}
Once the enrich policy is created, we can execute it using the execute enrich policy API:
PUT /_enrich/policy/vip-policy/_execute
We can now simulate it:
POST /_ingest/pipeline/_simulate
{
"docs": [
{
"_source": {
"clientip": "30.156.16.164"
}
}
],
"pipeline": {
"processors": [{
"enrich": {
"policy_name": "vip-policy",
"field": "clientip",
"target_field": "enriched"
}
}]
}
}
Which gives:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"enriched": {
"name": "David P",
"vip": true,
"ip": "30.156.16.164"
},
"clientip": "30.156.16.164"
},
"_ingest": {
"timestamp": "2023-04-06T17:14:29.127569953Z"
}
}
}
]
}
We just have to clean a bit the data to have the structure we are expecting:
POST /_ingest/pipeline/_simulate
{
"docs": [
{
"_source": {
"clientip": "30.156.16.164"
}
}
],
"pipeline": {
"processors": [{
"enrich": {
"policy_name": "vip-policy",
"field": "clientip",
"target_field": "enriched"
}
},{
"rename": {
"field": "enriched.name",
"target_field": "name"
}
},{
"rename": {
"field": "enriched.vip",
"target_field": "vip"
}
},{
"remove": {
"field": "enriched"
}
}
]
}
}
Which now gives the expected result:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"name": "David P",
"vip": true,
"clientip": "30.156.16.164"
},
"_ingest": {
"timestamp": "2023-04-06T17:16:08.175186282Z"
}
}
}
]
}
We can now store our final pipeline:
PUT /_ingest/pipeline/vip
{
"processors": [{
"enrich": {
"policy_name": "vip-policy",
"field": "clientip",
"target_field": "enriched"
}
},{
"rename": {
"field": "enriched.name",
"target_field": "name",
"ignore_failure": true
}
},{
"rename": {
"field": "enriched.vip",
"target_field": "vip",
"ignore_failure": true
}
},{
"remove": {
"field": "enriched",
"ignore_failure": true
}
}
]
}
Note that we changed it a bit by adding some ignore_failure
directives as we may not find any related data within the vip
index.
We can create the target index using the same mapping as the source index:
# Get the source mapping
GET /kibana_sample_data_logs/_mapping
# Create the destination index
PUT /kibana_sample_data_logs_new
{
// Paste the source mappings structure
"mappings": {
"properties": {
// And add the properties we are adding
"name": {
"type": "keyword"
},
"vip": {
"type": "boolean"
}
}
}
}
And call the reindex API:
POST _reindex
{
"source": {
"index": "kibana_sample_data_logs"
},
"dest": {
"index": "kibana_sample_data_logs_new",
"pipeline": "vip"
}
}
Let's check that the job has been done:
GET /kibana_sample_data_logs_new/_search?filter_path=aggregations.by_name.buckets
{
"size": 0,
"aggs": {
"by_name": {
"terms": {
"field": "name"
}
}
}
}
This gives:
{
"aggregations": {
"by_name": {
"buckets": [
{
"key": "David P",
"doc_count": 100
},
{
"key": "Philipp K",
"doc_count": 29
},
{
"key": "Adrienne V",
"doc_count": 26
},
{
"key": "Carly R",
"doc_count": 26
},
{
"key": "Iulia F",
"doc_count": 25
},
{
"key": "Naoise R",
"doc_count": 25
},
{
"key": "Jelena Z",
"doc_count": 24
},
{
"key": "Matt R",
"doc_count": 24
}
]
}
}
}
Runtime field enrich
Another way to enrich your data would be to do this at search time instead of index time. That goes against the first sentence of this post, but sometimes, you need to do some tradeoffs. Here, we want to trade the flexibility with the search speed.
The runtime field feature allows enriching the search response object but can not be used to query or aggregate data. A simple example of this feature:
GET kibana_sample_data_logs/_search?filter_path=hits.hits.fields
{
"size": 1,
"query": {
"match": {
"clientip": "30.156.16.164"
}
},
"runtime_mappings": {
"enriched": {
"type": "lookup",
"target_index": "vip",
"input_field": "clientip",
"target_field": "ip",
"fetch_fields": ["name", "vip"]
}
},
"fields": [
"clientip",
"enriched"
],
"_source": false
}
This now gives:
{
"hits": {
"hits": [
{
"fields": {
"enriched": [
{
"name": [
"David P"
],
"vip": [
true
]
}
],
"clientip": [
"30.156.16.164"
]
}
}
]
}
}
Note that this could be also be added as part of the mapping with:
PUT kibana_sample_data_logs/_mappings
{
"runtime": {
"enriched": {
"type": "lookup",
"target_index": "vip",
"input_field": "clientip",
"target_field": "ip",
"fetch_fields": ["name", "vip"]
}
}
}
GET kibana_sample_data_logs/_search
{
"size": 1,
"query": {
"match": {
"clientip": "30.156.16.164"
}
},
"fields": [
"clientip",
"enriched"
]
}
But if you want to be able to search or aggregate on those fields, you need to actually emit some content at search time.
Note that we can not do lookups in another index using this method. So because, and only because, the length of the list is tiny, we can use a script to do the "enrichment" on the fly:
PUT kibana_sample_data_logs/_mappings
{
"runtime": {
"name": {
"type": "keyword",
"script": {
"source":
"""
def name=params.name;
for (int i=0; i< params.lookup.length; i++) {
if (params.lookup[i].ip == doc['clientip'].value) {
emit(params.lookup[i].name);
break;
}
}
""",
"lang": "painless",
"params": {
"name": "David P",
"lookup": [
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" },
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
]
}
}
},
"vip": {
"type": "boolean",
"script": {
"source":
"""
def name=params.name;
for (int i=0; i< params.lookup.length; i++) {
if (params.lookup[i].ip == doc['clientip'].value) {
emit(params.lookup[i].vip);
break;
}
}
""",
"lang": "painless",
"params": {
"name": "David P",
"lookup": [
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" },
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
]
}
}
}
}
}
Once again, we can aggregate on those runtime fields:
GET /kibana_sample_data_logs/_search?filter_path=aggregations.by_name.buckets
{
"size": 0,
"aggs": {
"by_name": {
"terms": {
"field": "name"
}
}
}
}
Which gives the same result as we saw previously but of course a bit slower:
{
"aggregations": {
"by_name": {
"buckets": [
{
"key": "David P",
"doc_count": 100
},
{
"key": "Philipp K",
"doc_count": 29
},
{
"key": "Adrienne V",
"doc_count": 26
},
{
"key": "Carly R",
"doc_count": 26
},
{
"key": "Iulia F",
"doc_count": 25
},
{
"key": "Naoise R",
"doc_count": 25
},
{
"key": "Jelena Z",
"doc_count": 24
},
{
"key": "Matt R",
"doc_count": 24
}
]
}
}
}
Again, this method will not work for big indices so reindexing your data as we saw in the first part, will be the preferred way to go.
The release and timing of any features or functionality described in this post remain at Elastic's sole discretion. Any features or functionality not currently available may not be delivered on time or at all.