Tutorial: Transform data with custom ingest pipelines
This tutorial explains how to add a custom ingest pipeline to an Elastic Integration. Custom pipelines can be used for custom data processing, such as adding fields, obfuscating sensitive information, and more.
Scenario: You have Elastic Agents collecting system metrics with the System integration.
Goal: Add a custom ingest pipeline that adds a new field to each Elasticsearch document before it is indexed.
Step 1: Create a custom ingest pipeline
Create a custom ingest pipeline that will be called by the default integration pipeline. In this tutorial, we’ll create a pipeline that adds a new field to our documents.
- In Kibana, navigate to Stack Management → Ingest Pipelines → Create pipeline → New pipeline.
- Name your pipeline. We’ll call this one, add_field.
- Select Add a processor. Fill out the following information:
  - Processor: "Set"
  - Field: test
  - Value: true
  The Set processor sets a document field and associates it with the specified value.
- Click Add.
- Click Create pipeline.
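If you prefer working in Kibana Dev tools, a roughly equivalent pipeline can be created with the ingest pipeline API. This is a sketch of the same add_field pipeline; it uses the boolean value true, which matches the Boolean mapping added later in this tutorial (the Kibana UI may store the value as a string):

PUT _ingest/pipeline/add_field
{
  "description": "Adds a test field to each document",
  "processors": [
    {
      "set": {
        "field": "test",
        "value": true
      }
    }
  ]
}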
Step 2: Apply your ingest pipeline
Add a custom pipeline to an integration by calling it from the default ingest pipeline. The custom pipeline will run after the default pipeline but before the final pipeline.
Edit integration
Add a custom pipeline to an integration from the Edit integration workflow. The integration must already be configured and installed before a custom pipeline can be added. To enter this workflow, do the following:
- Navigate to Fleet
- Select the relevant Elastic Agent policy
- Search for the integration you want to edit
- Select Actions → Edit integration
Select a data stream
Most integrations write to multiple data streams. You’ll need to add the custom pipeline to each data stream individually.
- Find the first data stream you wish to edit and select Change defaults. For this tutorial, find the data stream configuration titled, Collect metrics from System instances.
- Scroll to System CPU metrics and under Advanced options select Add custom pipeline.
  This will take you to the Create pipeline workflow in Stack Management.
Add the pipeline
Add the pipeline you created in step one.
- Select Add a processor. Fill out the following information:
  - Processor: "Pipeline"
  - Pipeline name: "add_field"
- Click Add.
- Click Create pipeline to return to the Edit integration page.
Roll over the data stream (optional)
For pipeline changes to take effect immediately, you must roll over the data stream. If you do not, the changes will not take effect until the next scheduled roll over. Select Apply now and rollover.
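If you prefer to use the API, you can also trigger the roll over manually from Kibana Dev tools. This sketch assumes the data stream edited in this tutorial and the default namespace:

POST metrics-system.cpu-default/_rollover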
After the data stream rolls over, note the name of the custom ingest pipeline.
In this tutorial, it’s metrics-system.cpu@custom. The name follows the pattern <type>-<dataset>@custom:
- type: metrics
- dataset: system.cpu
- Custom ingest pipeline designation: @custom
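To confirm what was created, you can retrieve the custom pipeline from Kibana Dev tools. Assuming the data stream edited in this tutorial, the request looks like this, and the response should show the pipeline processor that calls add_field:

GET _ingest/pipeline/metrics-system.cpu@custom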
Repeat
Add the custom ingest pipeline to any other data streams you wish to update.
Step 3: Test the ingest pipeline (optional)
Allow time for new data to be ingested before testing your pipeline. In a new window, open Kibana and navigate to Kibana Dev tools.
Use an exists query to ensure that the new field, "test", is being applied to documents.
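A request along the following lines works; the data stream name assumes the default namespace used throughout this tutorial:

GET metrics-system.cpu-default/_search?pretty
{
  "query": {
    "exists": {
      "field": "test"
    }
  }
}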
The query searches the data stream edited in this tutorial and checks for test, the name of the field set in step one.
If your custom pipeline is working correctly, this query will return at least one document.
Step 4: Add custom mappings
Now that a new field is being set in your Elasticsearch documents, you’ll want to assign a new mapping for that field. Use the @custom component template to apply custom mappings to an integration data stream.
In the Edit integration workflow, do the following:
- Under Advanced options select the pencil icon to edit the @custom component template.
- Define the new field for your indexed documents. Select Add field and add the following information:
  - Field name: test
  - Field type: Boolean
- Click Add field.
- Click Review to fast-forward to the review step and click Save component template to return to the Edit integration workflow.
- For changes to take effect immediately, select Apply now and rollover.
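To verify the saved mapping outside the UI, you can retrieve the component template from Kibana Dev tools. The template name below assumes the metrics-system.cpu data stream edited in this tutorial:

GET _component_template/metrics-system.cpu@custom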
Step 5: Test the custom mappings (optional)
Allow time for new data to be ingested before testing your mappings. In a new window, open Kibana and navigate to Kibana Dev tools.
Use the Get field mapping API to ensure that the custom mapping has been applied.
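For example, the following request (assuming the default namespace) returns the mapping for the test field:

GET metrics-system.cpu-default/_mapping/field/test?pretty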
The request targets the data stream edited in this tutorial. The result should include type: "boolean" for the specified field.
".ds-metrics-system.cpu-default-2022.08.10-000002": { "mappings": { "test": { "full_name": "test", "mapping": { "test": { "type": "boolean" } } } } }
Step 6: Add an ingest pipeline for a data type
The previous steps demonstrated how to create a custom ingest pipeline that adds a new field to each Elasticsearch document generated for the System integration CPU metrics (system.cpu) dataset.
You can create an ingest pipeline to process data at various levels of customization. An ingest pipeline processor can be applied:
- Globally to all events
- To all events of a certain type (for example, logs or metrics)
- To all events of a certain type in an integration
- To all events in a specific dataset
Let’s create a new custom ingest pipeline logs@custom that processes all log events.
- Open Kibana and navigate to Kibana Dev tools.
- Run a pipeline API request to add a new field my-logs-field:
  PUT _ingest/pipeline/logs@custom
  {
    "processors": [
      {
        "set": {
          "description": "Custom field for all log events",
          "field": "my-logs-field",
          "value": "true"
        }
      }
    ]
  }
- Allow some time for new data to be ingested, and then use a new exists query to confirm that the new field "my-logs-field" is being applied to log event documents.
  For this example, we’ll check the System integration system.syslog dataset:
  GET /logs-system.syslog-default/_search?pretty
  {
    "query": {
      "exists": {
        "field": "my-logs-field"
      }
    }
  }
With the new pipeline applied, this query should return at least one document.
You can modify your pipeline API request as needed to apply custom processing at various levels. Refer to Ingest pipelines to learn more.