- Elasticsearch Guide: other versions:
- What’s new in 8.17
- Elasticsearch basics
- Quick starts
- Set up Elasticsearch
- Run Elasticsearch locally
- Installing Elasticsearch
- Configuring Elasticsearch
- Important Elasticsearch configuration
- Secure settings
- Auditing settings
- Circuit breaker settings
- Cluster-level shard allocation and routing settings
- Miscellaneous cluster settings
- Cross-cluster replication settings
- Discovery and cluster formation settings
- Data stream lifecycle settings
- Field data cache settings
- Local gateway settings
- Health Diagnostic settings
- Index lifecycle management settings
- Index management settings
- Index recovery settings
- Indexing buffer settings
- Inference settings
- License settings
- Machine learning settings
- Monitoring settings
- Node settings
- Networking
- Node query cache settings
- Path settings
- Search settings
- Security settings
- Shard request cache settings
- Snapshot and restore settings
- Transforms settings
- Thread pools
- Watcher settings
- Set JVM options
- Important system configuration
- Bootstrap Checks
- Heap size check
- File descriptor check
- Memory lock check
- Maximum number of threads check
- Max file size check
- Maximum size virtual memory check
- Maximum map count check
- Client JVM check
- Use serial collector check
- System call filter check
- OnError and OnOutOfMemoryError checks
- Early-access check
- All permission check
- Discovery configuration check
- Bootstrap Checks for X-Pack
- Starting Elasticsearch
- Stopping Elasticsearch
- Discovery and cluster formation
- Add and remove nodes in your cluster
- Full-cluster restart and rolling restart
- Remote clusters
- Plugins
- Upgrade Elasticsearch
- Index modules
- Mapping
- Dynamic mapping
- Explicit mapping
- Runtime fields
- Field data types
- Aggregate metric
- Alias
- Arrays
- Binary
- Boolean
- Completion
- Date
- Date nanoseconds
- Dense vector
- Flattened
- Geopoint
- Geoshape
- Histogram
- IP
- Join
- Keyword
- Nested
- Numeric
- Object
- Pass-through object
- Percolator
- Point
- Range
- Rank feature
- Rank features
- Search-as-you-type
- Semantic text
- Shape
- Sparse vector
- Text
- Token count
- Unsigned long
- Version
- Metadata fields
- Mapping parameters
analyzer
coerce
copy_to
doc_values
dynamic
eager_global_ordinals
enabled
format
ignore_above
index.mapping.ignore_above
ignore_malformed
index
index_options
index_phrases
index_prefixes
meta
fields
normalizer
norms
null_value
position_increment_gap
properties
search_analyzer
similarity
store
subobjects
term_vector
- Mapping limit settings
- Removal of mapping types
- Text analysis
- Overview
- Concepts
- Configure text analysis
- Built-in analyzer reference
- Tokenizer reference
- Token filter reference
- Apostrophe
- ASCII folding
- CJK bigram
- CJK width
- Classic
- Common grams
- Conditional
- Decimal digit
- Delimited payload
- Dictionary decompounder
- Edge n-gram
- Elision
- Fingerprint
- Flatten graph
- Hunspell
- Hyphenation decompounder
- Keep types
- Keep words
- Keyword marker
- Keyword repeat
- KStem
- Length
- Limit token count
- Lowercase
- MinHash
- Multiplexer
- N-gram
- Normalization
- Pattern capture
- Pattern replace
- Phonetic
- Porter stem
- Predicate script
- Remove duplicates
- Reverse
- Shingle
- Snowball
- Stemmer
- Stemmer override
- Stop
- Synonym
- Synonym graph
- Trim
- Truncate
- Unique
- Uppercase
- Word delimiter
- Word delimiter graph
- Character filters reference
- Normalizers
- Index templates
- Data streams
- Ingest pipelines
- Example: Parse logs
- Enrich your data
- Processor reference
- Append
- Attachment
- Bytes
- Circle
- Community ID
- Convert
- CSV
- Date
- Date index name
- Dissect
- Dot expander
- Drop
- Enrich
- Fail
- Fingerprint
- Foreach
- Geo-grid
- GeoIP
- Grok
- Gsub
- HTML strip
- Inference
- IP Location
- Join
- JSON
- KV
- Lowercase
- Network direction
- Pipeline
- Redact
- Registered domain
- Remove
- Rename
- Reroute
- Script
- Set
- Set security user
- Sort
- Split
- Terminate
- Trim
- Uppercase
- URL decode
- URI parts
- User agent
- Ingest pipelines in Search
- Aliases
- Search your data
- Re-ranking
- Query DSL
- Aggregations
- Bucket aggregations
- Adjacency matrix
- Auto-interval date histogram
- Categorize text
- Children
- Composite
- Date histogram
- Date range
- Diversified sampler
- Filter
- Filters
- Frequent item sets
- Geo-distance
- Geohash grid
- Geohex grid
- Geotile grid
- Global
- Histogram
- IP prefix
- IP range
- Missing
- Multi Terms
- Nested
- Parent
- Random sampler
- Range
- Rare terms
- Reverse nested
- Sampler
- Significant terms
- Significant text
- Terms
- Time series
- Variable width histogram
- Subtleties of bucketing range fields
- Metrics aggregations
- Pipeline aggregations
- Average bucket
- Bucket script
- Bucket count K-S test
- Bucket correlation
- Bucket selector
- Bucket sort
- Change point
- Cumulative cardinality
- Cumulative sum
- Derivative
- Extended stats bucket
- Inference bucket
- Max bucket
- Min bucket
- Moving function
- Moving percentiles
- Normalize
- Percentiles bucket
- Serial differencing
- Stats bucket
- Sum bucket
- Bucket aggregations
- Geospatial analysis
- Connectors
- EQL
- ES|QL
- SQL
- Overview
- Getting Started with SQL
- Conventions and Terminology
- Security
- SQL REST API
- SQL Translate API
- SQL CLI
- SQL JDBC
- SQL ODBC
- SQL Client Applications
- SQL Language
- Functions and Operators
- Comparison Operators
- Logical Operators
- Math Operators
- Cast Operators
- LIKE and RLIKE Operators
- Aggregate Functions
- Grouping Functions
- Date/Time and Interval Functions and Operators
- Full-Text Search Functions
- Mathematical Functions
- String Functions
- Type Conversion Functions
- Geo Functions
- Conditional Functions And Expressions
- System Functions
- Reserved keywords
- SQL Limitations
- Scripting
- Data management
- ILM: Manage the index lifecycle
- Tutorial: Customize built-in policies
- Tutorial: Automate rollover
- Index management in Kibana
- Overview
- Concepts
- Index lifecycle actions
- Configure a lifecycle policy
- Migrate index allocation filters to node roles
- Troubleshooting index lifecycle management errors
- Start and stop index lifecycle management
- Manage existing indices
- Skip rollover
- Restore a managed data stream or index
- Data tiers
- Autoscaling
- Monitor a cluster
- Roll up or transform your data
- Set up a cluster for high availability
- Snapshot and restore
- Secure the Elastic Stack
- Elasticsearch security principles
- Start the Elastic Stack with security enabled automatically
- Manually configure security
- Updating node security certificates
- User authentication
- Built-in users
- Service accounts
- Internal users
- Token-based authentication services
- User profiles
- Realms
- Realm chains
- Security domains
- Active Directory user authentication
- File-based user authentication
- LDAP user authentication
- Native user authentication
- OpenID Connect authentication
- PKI user authentication
- SAML authentication
- Kerberos authentication
- JWT authentication
- Integrating with other authentication systems
- Enabling anonymous access
- Looking up users without authentication
- Controlling the user cache
- Configuring SAML single-sign-on on the Elastic Stack
- Configuring single sign-on to the Elastic Stack using OpenID Connect
- User authorization
- Built-in roles
- Defining roles
- Role restriction
- Security privileges
- Document level security
- Field level security
- Granting privileges for data streams and aliases
- Mapping users and groups to roles
- Setting up field and document level security
- Submitting requests on behalf of other users
- Configuring authorization delegation
- Customizing roles and authorization
- Enable audit logging
- Restricting connections with IP filtering
- Securing clients and integrations
- Operator privileges
- Troubleshooting
- Some settings are not returned via the nodes settings API
- Authorization exceptions
- Users command fails due to extra arguments
- Users are frequently locked out of Active Directory
- Certificate verification fails for curl on Mac
- SSLHandshakeException causes connections to fail
- Common SSL/TLS exceptions
- Common Kerberos exceptions
- Common SAML issues
- Internal Server Error in Kibana
- Setup-passwords command fails due to connection failure
- Failures due to relocation of the configuration files
- Limitations
- Watcher
- Cross-cluster replication
- Data store architecture
- REST APIs
- API conventions
- Common options
- REST API compatibility
- Autoscaling APIs
- Behavioral Analytics APIs
- Compact and aligned text (CAT) APIs
- cat aliases
- cat allocation
- cat anomaly detectors
- cat component templates
- cat count
- cat data frame analytics
- cat datafeeds
- cat fielddata
- cat health
- cat indices
- cat master
- cat nodeattrs
- cat nodes
- cat pending tasks
- cat plugins
- cat recovery
- cat repositories
- cat segments
- cat shards
- cat snapshots
- cat task management
- cat templates
- cat thread pool
- cat trained model
- cat transforms
- Cluster APIs
- Cluster allocation explain
- Cluster get settings
- Cluster health
- Health
- Cluster reroute
- Cluster state
- Cluster stats
- Cluster update settings
- Nodes feature usage
- Nodes hot threads
- Nodes info
- Prevalidate node removal
- Nodes reload secure settings
- Nodes stats
- Cluster Info
- Pending cluster tasks
- Remote cluster info
- Task management
- Voting configuration exclusions
- Create or update desired nodes
- Get desired nodes
- Delete desired nodes
- Get desired balance
- Reset desired balance
- Cross-cluster replication APIs
- Connector APIs
- Create connector
- Delete connector
- Get connector
- List connectors
- Update connector API key id
- Update connector configuration
- Update connector index name
- Update connector features
- Update connector filtering
- Update connector name and description
- Update connector pipeline
- Update connector scheduling
- Update connector service type
- Create connector sync job
- Cancel connector sync job
- Delete connector sync job
- Get connector sync job
- List connector sync jobs
- Check in a connector
- Update connector error
- Update connector last sync stats
- Update connector status
- Check in connector sync job
- Claim connector sync job
- Set connector sync job error
- Set connector sync job stats
- Data stream APIs
- Document APIs
- Enrich APIs
- EQL APIs
- ES|QL APIs
- Features APIs
- Fleet APIs
- Graph explore API
- Index APIs
- Alias exists
- Aliases
- Analyze
- Analyze index disk usage
- Clear cache
- Clone index
- Close index
- Create index
- Create or update alias
- Create or update component template
- Create or update index template
- Create or update index template (legacy)
- Delete component template
- Delete dangling index
- Delete alias
- Delete index
- Delete index template
- Delete index template (legacy)
- Exists
- Field usage stats
- Flush
- Force merge
- Get alias
- Get component template
- Get field mapping
- Get index
- Get index settings
- Get index template
- Get index template (legacy)
- Get mapping
- Import dangling index
- Index recovery
- Index segments
- Index shard stores
- Index stats
- Index template exists (legacy)
- List dangling indices
- Open index
- Refresh
- Resolve index
- Resolve cluster
- Rollover
- Shrink index
- Simulate index
- Simulate template
- Split index
- Unfreeze index
- Update index settings
- Update mapping
- Index lifecycle management APIs
- Create or update lifecycle policy
- Get policy
- Delete policy
- Move to step
- Remove policy
- Retry policy
- Get index lifecycle management status
- Explain lifecycle
- Start index lifecycle management
- Stop index lifecycle management
- Migrate indices, ILM policies, and legacy, composable and component templates to data tiers routing
- Inference APIs
- Delete inference API
- Get inference API
- Perform inference API
- Create inference API
- Stream inference API
- Update inference API
- AlibabaCloud AI Search inference service
- Amazon Bedrock inference service
- Anthropic inference service
- Azure AI studio inference service
- Azure OpenAI inference service
- Cohere inference service
- Elasticsearch inference service
- ELSER inference service
- Google AI Studio inference service
- Google Vertex AI inference service
- HuggingFace inference service
- Mistral inference service
- OpenAI inference service
- Watsonx inference service
- Info API
- Ingest APIs
- Licensing APIs
- Logstash APIs
- Machine learning APIs
- Machine learning anomaly detection APIs
- Add events to calendar
- Add jobs to calendar
- Close jobs
- Create jobs
- Create calendars
- Create datafeeds
- Create filters
- Delete calendars
- Delete datafeeds
- Delete events from calendar
- Delete filters
- Delete forecasts
- Delete jobs
- Delete jobs from calendar
- Delete model snapshots
- Delete expired data
- Estimate model memory
- Flush jobs
- Forecast jobs
- Get buckets
- Get calendars
- Get categories
- Get datafeeds
- Get datafeed statistics
- Get influencers
- Get jobs
- Get job statistics
- Get model snapshots
- Get model snapshot upgrade statistics
- Get overall buckets
- Get scheduled events
- Get filters
- Get records
- Open jobs
- Post data to jobs
- Preview datafeeds
- Reset jobs
- Revert model snapshots
- Start datafeeds
- Stop datafeeds
- Update datafeeds
- Update filters
- Update jobs
- Update model snapshots
- Upgrade model snapshots
- Machine learning data frame analytics APIs
- Create data frame analytics jobs
- Delete data frame analytics jobs
- Evaluate data frame analytics
- Explain data frame analytics
- Get data frame analytics jobs
- Get data frame analytics jobs stats
- Preview data frame analytics
- Start data frame analytics jobs
- Stop data frame analytics jobs
- Update data frame analytics jobs
- Machine learning trained model APIs
- Clear trained model deployment cache
- Create or update trained model aliases
- Create part of a trained model
- Create trained models
- Create trained model vocabulary
- Delete trained model aliases
- Delete trained models
- Get trained models
- Get trained models stats
- Infer trained model
- Start trained model deployment
- Stop trained model deployment
- Update trained model deployment
- Migration APIs
- Node lifecycle APIs
- Query rules APIs
- Reload search analyzers API
- Repositories metering APIs
- Rollup APIs
- Root API
- Script APIs
- Search APIs
- Search Application APIs
- Searchable snapshots APIs
- Security APIs
- Authenticate
- Change passwords
- Clear cache
- Clear roles cache
- Clear privileges cache
- Clear API key cache
- Clear service account token caches
- Create API keys
- Create or update application privileges
- Create or update role mappings
- Create or update roles
- Bulk create or update roles API
- Bulk delete roles API
- Create or update users
- Create service account tokens
- Delegate PKI authentication
- Delete application privileges
- Delete role mappings
- Delete roles
- Delete service account token
- Delete users
- Disable users
- Enable users
- Enroll Kibana
- Enroll node
- Get API key information
- Get application privileges
- Get builtin privileges
- Get role mappings
- Get roles
- Query Role
- Get service accounts
- Get service account credentials
- Get Security settings
- Get token
- Get user privileges
- Get users
- Grant API keys
- Has privileges
- Invalidate API key
- Invalidate token
- OpenID Connect prepare authentication
- OpenID Connect authenticate
- OpenID Connect logout
- Query API key information
- Query User
- Update API key
- Update Security settings
- Bulk update API keys
- SAML prepare authentication
- SAML authenticate
- SAML logout
- SAML invalidate
- SAML complete logout
- SAML service provider metadata
- SSL certificate
- Activate user profile
- Disable user profile
- Enable user profile
- Get user profiles
- Suggest user profile
- Update user profile data
- Has privileges user profile
- Create Cross-Cluster API key
- Update Cross-Cluster API key
- Snapshot and restore APIs
- Snapshot lifecycle management APIs
- SQL APIs
- Synonyms APIs
- Text structure APIs
- Transform APIs
- Usage API
- Watcher APIs
- Definitions
- Command line tools
- elasticsearch-certgen
- elasticsearch-certutil
- elasticsearch-create-enrollment-token
- elasticsearch-croneval
- elasticsearch-keystore
- elasticsearch-node
- elasticsearch-reconfigure-node
- elasticsearch-reset-password
- elasticsearch-saml-metadata
- elasticsearch-service-tokens
- elasticsearch-setup-passwords
- elasticsearch-shard
- elasticsearch-syskeygen
- elasticsearch-users
- Optimizations
- Troubleshooting
- Fix common cluster issues
- Diagnose unassigned shards
- Add a missing tier to the system
- Allow Elasticsearch to allocate the data in the system
- Allow Elasticsearch to allocate the index
- Indices mix index allocation filters with data tiers node roles to move through data tiers
- Not enough nodes to allocate all shard replicas
- Total number of shards for an index on a single node exceeded
- Total number of shards per node has been reached
- Troubleshooting corruption
- Fix data nodes out of disk
- Fix master nodes out of disk
- Fix other role nodes out of disk
- Start index lifecycle management
- Start Snapshot Lifecycle Management
- Restore from snapshot
- Troubleshooting broken repositories
- Addressing repeated snapshot policy failures
- Troubleshooting an unstable cluster
- Troubleshooting discovery
- Troubleshooting monitoring
- Troubleshooting transforms
- Troubleshooting Watcher
- Troubleshooting searches
- Troubleshooting shards capacity health issues
- Troubleshooting an unbalanced cluster
- Capture diagnostics
- Migration guide
- Release notes
- Elasticsearch version 8.17.1
- Elasticsearch version 8.17.0
- Elasticsearch version 8.16.2
- Elasticsearch version 8.16.1
- Elasticsearch version 8.16.0
- Elasticsearch version 8.15.5
- Elasticsearch version 8.15.4
- Elasticsearch version 8.15.3
- Elasticsearch version 8.15.2
- Elasticsearch version 8.15.1
- Elasticsearch version 8.15.0
- Elasticsearch version 8.14.3
- Elasticsearch version 8.14.2
- Elasticsearch version 8.14.1
- Elasticsearch version 8.14.0
- Elasticsearch version 8.13.4
- Elasticsearch version 8.13.3
- Elasticsearch version 8.13.2
- Elasticsearch version 8.13.1
- Elasticsearch version 8.13.0
- Elasticsearch version 8.12.2
- Elasticsearch version 8.12.1
- Elasticsearch version 8.12.0
- Elasticsearch version 8.11.4
- Elasticsearch version 8.11.3
- Elasticsearch version 8.11.2
- Elasticsearch version 8.11.1
- Elasticsearch version 8.11.0
- Elasticsearch version 8.10.4
- Elasticsearch version 8.10.3
- Elasticsearch version 8.10.2
- Elasticsearch version 8.10.1
- Elasticsearch version 8.10.0
- Elasticsearch version 8.9.2
- Elasticsearch version 8.9.1
- Elasticsearch version 8.9.0
- Elasticsearch version 8.8.2
- Elasticsearch version 8.8.1
- Elasticsearch version 8.8.0
- Elasticsearch version 8.7.1
- Elasticsearch version 8.7.0
- Elasticsearch version 8.6.2
- Elasticsearch version 8.6.1
- Elasticsearch version 8.6.0
- Elasticsearch version 8.5.3
- Elasticsearch version 8.5.2
- Elasticsearch version 8.5.1
- Elasticsearch version 8.5.0
- Elasticsearch version 8.4.3
- Elasticsearch version 8.4.2
- Elasticsearch version 8.4.1
- Elasticsearch version 8.4.0
- Elasticsearch version 8.3.3
- Elasticsearch version 8.3.2
- Elasticsearch version 8.3.1
- Elasticsearch version 8.3.0
- Elasticsearch version 8.2.3
- Elasticsearch version 8.2.2
- Elasticsearch version 8.2.1
- Elasticsearch version 8.2.0
- Elasticsearch version 8.1.3
- Elasticsearch version 8.1.2
- Elasticsearch version 8.1.1
- Elasticsearch version 8.1.0
- Elasticsearch version 8.0.1
- Elasticsearch version 8.0.0
- Elasticsearch version 8.0.0-rc2
- Elasticsearch version 8.0.0-rc1
- Elasticsearch version 8.0.0-beta1
- Elasticsearch version 8.0.0-alpha2
- Elasticsearch version 8.0.0-alpha1
- Dependencies and versions
Ingest pipelines
editIngest pipelines
editIngest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data.
A pipeline consists of a series of configurable tasks called processors. Each processor runs sequentially, making specific changes to incoming documents. After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.
You can create and manage ingest pipelines using Kibana’s Ingest Pipelines feature or the ingest APIs. Elasticsearch stores pipelines in the cluster state.
Prerequisites
edit-
Nodes with the
ingest
node role handle pipeline processing. To use ingest pipelines, your cluster must have at least one node with theingest
role. For heavy ingest loads, we recommend creating dedicated ingest nodes. -
If the Elasticsearch security features are enabled, you must have the
manage_pipeline
cluster privilege to manage ingest pipelines. To use Kibana’s Ingest Pipelines feature, you also need thecluster:monitor/nodes/info
cluster privileges. -
Pipelines including the
enrich
processor require additional setup. See Enrich your data.
Create and manage pipelines
editIn Kibana, open the main menu and click Stack Management > Ingest Pipelines. From the list view, you can:
- View a list of your pipelines and drill down into details
- Edit or clone existing pipelines
- Delete pipelines
To create a pipeline, click Create pipeline > New pipeline. For an example tutorial, see Example: Parse logs.
The New pipeline from CSV option lets you use a CSV to create an ingest pipeline that maps custom data to the Elastic Common Schema (ECS). Mapping your custom data to ECS makes the data easier to search and lets you reuse visualizations from other datasets. To get started, check Map custom data to ECS.
You can also use the ingest APIs to create and manage pipelines.
The following create pipeline API request creates
a pipeline containing two set
processors followed by a
lowercase
processor. The processors run sequentially
in the order specified.
resp = client.ingest.put_pipeline( id="my-pipeline", description="My optional pipeline description", processors=[ { "set": { "description": "My optional processor description", "field": "my-long-field", "value": 10 } }, { "set": { "description": "Set 'my-boolean-field' to true", "field": "my-boolean-field", "value": True } }, { "lowercase": { "field": "my-keyword-field" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { description: 'My optional pipeline description', processors: [ { set: { description: 'My optional processor description', field: 'my-long-field', value: 10 } }, { set: { description: "Set 'my-boolean-field' to true", field: 'my-boolean-field', value: true } }, { lowercase: { field: 'my-keyword-field' } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", description: "My optional pipeline description", processors: [ { set: { description: "My optional processor description", field: "my-long-field", value: 10, }, }, { set: { description: "Set 'my-boolean-field' to true", field: "my-boolean-field", value: true, }, }, { lowercase: { field: "my-keyword-field", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "description": "My optional pipeline description", "processors": [ { "set": { "description": "My optional processor description", "field": "my-long-field", "value": 10 } }, { "set": { "description": "Set 'my-boolean-field' to true", "field": "my-boolean-field", "value": true } }, { "lowercase": { "field": "my-keyword-field" } } ] }
Manage pipeline versions
editWhen you create or update a pipeline, you can specify an optional version
integer. You can use this version number with the
if_version
parameter to conditionally
update the pipeline. When the if_version
parameter is specified, a successful
update increments the pipeline’s version.
PUT _ingest/pipeline/my-pipeline-id { "version": 1, "processors": [ ... ] }
To unset the version
number using the API, replace or update the pipeline
without specifying the version
parameter.
Test a pipeline
editBefore using a pipeline in production, we recommend you test it using sample documents. When creating or editing a pipeline in Kibana, click Add documents. In the Documents tab, provide sample documents and click Run the pipeline.
You can also test pipelines using the simulate pipeline
API. You can specify a configured pipeline in the request path. For example,
the following request tests my-pipeline
.
resp = client.ingest.simulate( id="my-pipeline", docs=[ { "_source": { "my-keyword-field": "FOO" } }, { "_source": { "my-keyword-field": "BAR" } } ], ) print(resp)
response = client.ingest.simulate( id: 'my-pipeline', body: { docs: [ { _source: { "my-keyword-field": 'FOO' } }, { _source: { "my-keyword-field": 'BAR' } } ] } ) puts response
const response = await client.ingest.simulate({ id: "my-pipeline", docs: [ { _source: { "my-keyword-field": "FOO", }, }, { _source: { "my-keyword-field": "BAR", }, }, ], }); console.log(response);
POST _ingest/pipeline/my-pipeline/_simulate { "docs": [ { "_source": { "my-keyword-field": "FOO" } }, { "_source": { "my-keyword-field": "BAR" } } ] }
Alternatively, you can specify a pipeline and its processors in the request body.
resp = client.ingest.simulate( pipeline={ "processors": [ { "lowercase": { "field": "my-keyword-field" } } ] }, docs=[ { "_source": { "my-keyword-field": "FOO" } }, { "_source": { "my-keyword-field": "BAR" } } ], ) print(resp)
response = client.ingest.simulate( body: { pipeline: { processors: [ { lowercase: { field: 'my-keyword-field' } } ] }, docs: [ { _source: { "my-keyword-field": 'FOO' } }, { _source: { "my-keyword-field": 'BAR' } } ] } ) puts response
const response = await client.ingest.simulate({ pipeline: { processors: [ { lowercase: { field: "my-keyword-field", }, }, ], }, docs: [ { _source: { "my-keyword-field": "FOO", }, }, { _source: { "my-keyword-field": "BAR", }, }, ], }); console.log(response);
POST _ingest/pipeline/_simulate { "pipeline": { "processors": [ { "lowercase": { "field": "my-keyword-field" } } ] }, "docs": [ { "_source": { "my-keyword-field": "FOO" } }, { "_source": { "my-keyword-field": "BAR" } } ] }
The API returns transformed documents:
{ "docs": [ { "doc": { "_index": "_index", "_id": "_id", "_version": "-3", "_source": { "my-keyword-field": "foo" }, "_ingest": { "timestamp": "2099-03-07T11:04:03.000Z" } } }, { "doc": { "_index": "_index", "_id": "_id", "_version": "-3", "_source": { "my-keyword-field": "bar" }, "_ingest": { "timestamp": "2099-03-07T11:04:04.000Z" } } } ] }
Add a pipeline to an indexing request
editUse the pipeline
query parameter to apply a pipeline to documents in
individual or bulk indexing requests.
resp = client.index( index="my-data-stream", pipeline="my-pipeline", document={ "@timestamp": "2099-03-07T11:04:05.000Z", "my-keyword-field": "foo" }, ) print(resp) resp1 = client.bulk( index="my-data-stream", pipeline="my-pipeline", operations=[ { "create": {} }, { "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }, { "create": {} }, { "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" } ], ) print(resp1)
response = client.index( index: 'my-data-stream', pipeline: 'my-pipeline', body: { "@timestamp": '2099-03-07T11:04:05.000Z', "my-keyword-field": 'foo' } ) puts response response = client.bulk( index: 'my-data-stream', pipeline: 'my-pipeline', body: [ { create: {} }, { "@timestamp": '2099-03-07T11:04:06.000Z', "my-keyword-field": 'foo' }, { create: {} }, { "@timestamp": '2099-03-07T11:04:07.000Z', "my-keyword-field": 'bar' } ] ) puts response
const response = await client.index({ index: "my-data-stream", pipeline: "my-pipeline", document: { "@timestamp": "2099-03-07T11:04:05.000Z", "my-keyword-field": "foo", }, }); console.log(response); const response1 = await client.bulk({ index: "my-data-stream", pipeline: "my-pipeline", operations: [ { create: {}, }, { "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo", }, { create: {}, }, { "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar", }, ], }); console.log(response1);
POST my-data-stream/_doc?pipeline=my-pipeline { "@timestamp": "2099-03-07T11:04:05.000Z", "my-keyword-field": "foo" } PUT my-data-stream/_bulk?pipeline=my-pipeline { "create":{ } } { "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" } { "create":{ } } { "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
You can also use the pipeline
parameter with the update
by query or reindex APIs.
resp = client.update_by_query( index="my-data-stream", pipeline="my-pipeline", ) print(resp) resp1 = client.reindex( source={ "index": "my-data-stream" }, dest={ "index": "my-new-data-stream", "op_type": "create", "pipeline": "my-pipeline" }, ) print(resp1)
response = client.update_by_query( index: 'my-data-stream', pipeline: 'my-pipeline' ) puts response response = client.reindex( body: { source: { index: 'my-data-stream' }, dest: { index: 'my-new-data-stream', op_type: 'create', pipeline: 'my-pipeline' } } ) puts response
const response = await client.updateByQuery({ index: "my-data-stream", pipeline: "my-pipeline", }); console.log(response); const response1 = await client.reindex({ source: { index: "my-data-stream", }, dest: { index: "my-new-data-stream", op_type: "create", pipeline: "my-pipeline", }, }); console.log(response1);
POST my-data-stream/_update_by_query?pipeline=my-pipeline POST _reindex { "source": { "index": "my-data-stream" }, "dest": { "index": "my-new-data-stream", "op_type": "create", "pipeline": "my-pipeline" } }
Set a default pipeline
editUse the index.default_pipeline
index setting to set
a default pipeline. Elasticsearch applies this pipeline to indexing requests if no
pipeline
parameter is specified.
Set a final pipeline
editUse the index.final_pipeline
index setting to set a
final pipeline. Elasticsearch applies this pipeline after the request or default
pipeline, even if neither is specified.
Pipelines for Beats
editTo add an ingest pipeline to an Elastic Beat, specify the pipeline
parameter under output.elasticsearch
in <BEAT_NAME>.yml
. For example,
for Filebeat, you’d specify pipeline
in filebeat.yml
.
output.elasticsearch: hosts: ["localhost:9200"] pipeline: my-pipeline
Pipelines for Fleet and Elastic Agent
editElastic Agent integrations ship with default ingest pipelines that preprocess and enrich data before indexing. Fleet applies these pipelines using index templates that include pipeline index settings. Elasticsearch matches these templates to your Fleet data streams based on the stream’s naming scheme.
Each default integration pipeline calls a nonexistent, unversioned *@custom
ingest pipeline.
If unaltered, this pipeline call has no effect on your data. However, you can modify this call to
create custom pipelines for integrations that persist across upgrades.
Refer to Tutorial: Transform data with custom ingest pipelines to learn more.
Fleet doesn’t provide a default ingest pipeline for the Custom logs integration, but you can specify a pipeline for this integration using an index template or a custom configuration.
-
Create and test your ingest pipeline. Name your pipeline
logs-<dataset-name>-default
. This makes tracking the pipeline for your integration easier.For example, the following request creates a pipeline for the
my-app
dataset. The pipeline’s name islogs-my_app-default
.PUT _ingest/pipeline/logs-my_app-default { "description": "Pipeline for `my_app` dataset", "processors": [ ... ] }
-
Create an index template that includes your pipeline in the
index.default_pipeline
orindex.final_pipeline
index setting. Ensure the template is data stream enabled. The template’s index pattern should matchlogs-<dataset-name>-*
.You can create this template using Kibana’s Index Management feature or the create index template API.
For example, the following request creates a template matching
logs-my_app-*
. The template uses a component template that contains theindex.default_pipeline
index setting.resp = client.cluster.put_component_template( name="logs-my_app-settings", template={ "settings": { "index.default_pipeline": "logs-my_app-default", "index.lifecycle.name": "logs" } }, ) print(resp) resp1 = client.indices.put_index_template( name="logs-my_app-template", index_patterns=[ "logs-my_app-*" ], data_stream={}, priority=500, composed_of=[ "logs-my_app-settings", "logs-my_app-mappings" ], ) print(resp1)
const response = await client.cluster.putComponentTemplate({ name: "logs-my_app-settings", template: { settings: { "index.default_pipeline": "logs-my_app-default", "index.lifecycle.name": "logs", }, }, }); console.log(response); const response1 = await client.indices.putIndexTemplate({ name: "logs-my_app-template", index_patterns: ["logs-my_app-*"], data_stream: {}, priority: 500, composed_of: ["logs-my_app-settings", "logs-my_app-mappings"], }); console.log(response1);
# Creates a component template for index settings PUT _component_template/logs-my_app-settings { "template": { "settings": { "index.default_pipeline": "logs-my_app-default", "index.lifecycle.name": "logs" } } } # Creates an index template matching `logs-my_app-*` PUT _index_template/logs-my_app-template { "index_patterns": ["logs-my_app-*"], "data_stream": { }, "priority": 500, "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"] }
- When adding or editing your Custom logs integration in Fleet, click Configure integration > Custom log file > Advanced options.
-
In Dataset name, specify your dataset’s name. Fleet will add new data for the integration to the resulting
logs-<dataset-name>-default
data stream.For example, if your dataset’s name was
my_app
, Fleet adds new data to thelogs-my_app-default
data stream. -
Use the rollover API to roll over your data stream. This ensures Elasticsearch applies the index template and its pipeline settings to any new data for the integration.
resp = client.indices.rollover( alias="logs-my_app-default", ) print(resp)
response = client.indices.rollover( alias: 'logs-my_app-default' ) puts response
const response = await client.indices.rollover({ alias: "logs-my_app-default", }); console.log(response);
POST logs-my_app-default/_rollover/
Option 2: Custom configuration
-
Create and test your ingest pipeline. Name your pipeline
logs-<dataset-name>-default
. This makes tracking the pipeline for your integration easier.For example, the following request creates a pipeline for the
my-app
dataset. The pipeline’s name islogs-my_app-default
.PUT _ingest/pipeline/logs-my_app-default { "description": "Pipeline for `my_app` dataset", "processors": [ ... ] }
- When adding or editing your Custom logs integration in Fleet, click Configure integration > Custom log file > Advanced options.
-
In Dataset name, specify your dataset’s name. Fleet will add new data for the integration to the resulting
logs-<dataset-name>-default
data stream.For example, if your dataset’s name was
my_app
, Fleet adds new data to thelogs-my_app-default
data stream. -
In Custom Configurations, specify your pipeline in the
pipeline
policy setting.
Elastic Agent standalone
If you run Elastic Agent standalone, you can apply pipelines using an
index template that includes the
index.default_pipeline
or
index.final_pipeline
index setting. Alternatively,
you can specify the pipeline
policy setting in your elastic-agent.yml
configuration. See Install standalone Elastic Agents.
Pipelines for search indices
editWhen you create Elasticsearch indices for search use cases, for example, using the web crawler or connectors, these indices are automatically set up with specific ingest pipelines. These processors help optimize your content for search. See Ingest pipelines in Search for more information.
Access source fields in a processor
editProcessors have read and write access to an incoming document’s source fields.
To access a field key in a processor, use its field name. The following set
processor accesses my-long-field
.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "field": "my-long-field", "value": 10 } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { field: 'my-long-field', value: 10 } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { field: "my-long-field", value: 10, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "field": "my-long-field", "value": 10 } } ] }
You can also prepend the _source
prefix.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "field": "_source.my-long-field", "value": 10 } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { field: '_source.my-long-field', value: 10 } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { field: "_source.my-long-field", value: 10, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "field": "_source.my-long-field", "value": 10 } } ] }
Use dot notation to access object fields.
If your document contains flattened objects, use the
dot_expander
processor to expand them first. Other
ingest processors cannot access flattened objects.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "dot_expander": { "description": "Expand 'my-object-field.my-property'", "field": "my-object-field.my-property" } }, { "set": { "description": "Set 'my-object-field.my-property' to 10", "field": "my-object-field.my-property", "value": 10 } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { dot_expander: { description: "Expand 'my-object-field.my-property'", field: 'my-object-field.my-property' } }, { set: { description: "Set 'my-object-field.my-property' to 10", field: 'my-object-field.my-property', value: 10 } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { dot_expander: { description: "Expand 'my-object-field.my-property'", field: "my-object-field.my-property", }, }, { set: { description: "Set 'my-object-field.my-property' to 10", field: "my-object-field.my-property", value: 10, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "dot_expander": { "description": "Expand 'my-object-field.my-property'", "field": "my-object-field.my-property" } }, { "set": { "description": "Set 'my-object-field.my-property' to 10", "field": "my-object-field.my-property", "value": 10 } } ] }
Several processor parameters support Mustache
template snippets. To access field values in a template snippet, enclose the
field name in triple curly brackets:{{{field-name}}}
. You can use template
snippets to dynamically set field names.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "description": "Set dynamic '<service>' field to 'code' value", "field": "{{{service}}}", "value": "{{{code}}}" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { description: "Set dynamic '<service>' field to 'code' value", field: '{{{service}}}', value: '{{{code}}}' } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { description: "Set dynamic '<service>' field to 'code' value", field: "{{{service}}}", value: "{{{code}}}", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "description": "Set dynamic '<service>' field to 'code' value", "field": "{{{service}}}", "value": "{{{code}}}" } } ] }
Access metadata fields in a processor
editProcessors can access the following metadata fields by name:
-
_index
-
_id
-
_routing
-
_dynamic_templates
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "description": "Set '_routing' to 'geoip.country_iso_code' value", "field": "_routing", "value": "{{{geoip.country_iso_code}}}" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { description: "Set '_routing' to 'geoip.country_iso_code' value", field: '_routing', value: '{{{geoip.country_iso_code}}}' } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { description: "Set '_routing' to 'geoip.country_iso_code' value", field: "_routing", value: "{{{geoip.country_iso_code}}}", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "description": "Set '_routing' to 'geoip.country_iso_code' value", "field": "_routing", "value": "{{{geoip.country_iso_code}}}" } } ] }
Use a Mustache template snippet to access metadata field values. For example,
{{{_routing}}}
retrieves a document’s routing value.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "description": "Use geo_point dynamic template for address field", "field": "_dynamic_templates", "value": { "address": "geo_point" } } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { description: 'Use geo_point dynamic template for address field', field: '_dynamic_templates', value: { address: 'geo_point' } } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { description: "Use geo_point dynamic template for address field", field: "_dynamic_templates", value: { address: "geo_point", }, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "description": "Use geo_point dynamic template for address field", "field": "_dynamic_templates", "value": { "address": "geo_point" } } } ] }
The set processor above tells ES to use the dynamic template named geo_point
for the field address
if this field is not defined in the mapping of the index
yet. This processor overrides the dynamic template for the field address
if
already defined in the bulk request, but has no effect on other dynamic
templates defined in the bulk request.
If you automatically generate
document IDs, you cannot use {{{_id}}}
in a processor. Elasticsearch assigns
auto-generated _id
values after ingest.
Access ingest metadata in a processor
editIngest processors can add and access ingest metadata using the _ingest
key.
Unlike source and metadata fields, Elasticsearch does not index ingest metadata fields by
default. Elasticsearch also allows source fields that start with an _ingest
key. If
your data includes such source fields, use _source._ingest
to access them.
Pipelines only create the _ingest.timestamp
ingest metadata field by default.
This field contains a timestamp of when Elasticsearch received the document’s indexing
request. To index _ingest.timestamp
or other ingest metadata fields, use the
set
processor.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "description": "Index the ingest timestamp as 'event.ingested'", "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { description: "Index the ingest timestamp as 'event.ingested'", field: 'event.ingested', value: '{{{_ingest.timestamp}}}' } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { description: "Index the ingest timestamp as 'event.ingested'", field: "event.ingested", value: "{{{_ingest.timestamp}}}", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "description": "Index the ingest timestamp as 'event.ingested'", "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } } ] }
Handling pipeline failures
editA pipeline’s processors run sequentially. By default, pipeline processing stops when one of these processors fails or encounters an error.
To ignore a processor failure and run the pipeline’s remaining processors, set
ignore_failure
to true
.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "ignore_failure": True } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: 'provider', target_field: 'cloud.provider', ignore_failure: true } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: "provider", target_field: "cloud.provider", ignore_failure: true, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "ignore_failure": true } } ] }
Use the on_failure
parameter to specify a list of processors to run
immediately after a processor failure. If on_failure
is specified, Elasticsearch
afterward runs the pipeline’s remaining processors, even if the on_failure
configuration is empty.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "on_failure": [ { "set": { "description": "Set 'error.message'", "field": "error.message", "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", "override": False } } ] } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: 'provider', target_field: 'cloud.provider', on_failure: [ { set: { description: "Set 'error.message'", field: 'error.message', value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", override: false } } ] } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: "provider", target_field: "cloud.provider", on_failure: [ { set: { description: "Set 'error.message'", field: "error.message", value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", override: false, }, }, ], }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "on_failure": [ { "set": { "description": "Set 'error.message'", "field": "error.message", "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", "override": false } } ] } } ] }
Nest a list of on_failure
processors for nested error handling.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "on_failure": [ { "set": { "description": "Set 'error.message'", "field": "error.message", "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", "override": False, "on_failure": [ { "set": { "description": "Set 'error.message.multi'", "field": "error.message.multi", "value": "Document encountered multiple ingest errors", "override": True } } ] } } ] } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: 'provider', target_field: 'cloud.provider', on_failure: [ { set: { description: "Set 'error.message'", field: 'error.message', value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", override: false, on_failure: [ { set: { description: "Set 'error.message.multi'", field: 'error.message.multi', value: 'Document encountered multiple ingest errors', override: true } } ] } } ] } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { rename: { description: "Rename 'provider' to 'cloud.provider'", field: "provider", target_field: "cloud.provider", on_failure: [ { set: { description: "Set 'error.message'", field: "error.message", value: "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", override: false, on_failure: [ { set: { description: "Set 'error.message.multi'", field: "error.message.multi", value: "Document encountered multiple ingest errors", override: true, }, }, ], }, }, ], }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "rename": { "description": "Rename 'provider' to 'cloud.provider'", "field": "provider", "target_field": "cloud.provider", "on_failure": [ { "set": { "description": "Set 'error.message'", "field": "error.message", "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'", "override": false, "on_failure": [ { "set": { "description": "Set 'error.message.multi'", "field": "error.message.multi", "value": "Document encountered multiple ingest errors", "override": true } } ] } } ] } } ] }
You can also specify on_failure
for a pipeline. If a processor without an
on_failure
value fails, Elasticsearch uses this pipeline-level parameter as a fallback.
Elasticsearch will not attempt to run the pipeline’s remaining processors.
PUT _ingest/pipeline/my-pipeline { "processors": [ ... ], "on_failure": [ { "set": { "description": "Index document to 'failed-<index>'", "field": "_index", "value": "failed-{{{ _index }}}" } } ] }
Additional information about the pipeline failure may be available in the
document metadata fields on_failure_message
, on_failure_processor_type
,
on_failure_processor_tag
, and on_failure_pipeline
. These fields are
accessible only from within an on_failure
block.
The following example uses the metadata fields to include information about pipeline failures in documents.
PUT _ingest/pipeline/my-pipeline { "processors": [ ... ], "on_failure": [ { "set": { "description": "Record error information", "field": "error_information", "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}" } } ] }
Conditionally run a processor
editEach processor supports an optional if
condition, written as a
Painless script. If provided, the processor only
runs when the if
condition is true
.
if
condition scripts run in Painless’s
ingest processor context. In
if
conditions, ctx
values are read-only.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "drop": { "description": "Drop documents with 'network.name' of 'Guest'", "if": "ctx?.network?.name == 'Guest'" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { drop: { description: "Drop documents with 'network.name' of 'Guest'", if: "ctx?.network?.name == 'Guest'" } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { drop: { description: "Drop documents with 'network.name' of 'Guest'", if: "ctx?.network?.name == 'Guest'", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "drop": { "description": "Drop documents with 'network.name' of 'Guest'", "if": "ctx?.network?.name == 'Guest'" } } ] }
If the script.painless.regex.enabled
cluster
setting is enabled, you can use regular expressions in your if
condition
scripts. For supported syntax, see Painless
regular expressions.
If possible, avoid using regular expressions. Expensive regular expressions can slow indexing speeds.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "set": { "description": "If 'url.scheme' is 'http', set 'url.insecure' to true", "if": "ctx.url?.scheme =~ /^http[^s]/", "field": "url.insecure", "value": True } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'my-pipeline', body: { processors: [ { set: { description: "If 'url.scheme' is 'http', set 'url.insecure' to true", if: 'ctx.url?.scheme =~ /^http[^s]/', field: 'url.insecure', value: true } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { set: { description: "If 'url.scheme' is 'http', set 'url.insecure' to true", if: "ctx.url?.scheme =~ /^http[^s]/", field: "url.insecure", value: true, }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "set": { "description": "If 'url.scheme' is 'http', set 'url.insecure' to true", "if": "ctx.url?.scheme =~ /^http[^s]/", "field": "url.insecure", "value": true } } ] }
You must specify if
conditions as valid JSON on a single line. However, you
can use the Kibana
console's triple quote syntax to write and debug larger scripts.
If possible, avoid using complex or expensive if
condition scripts.
Expensive condition scripts can slow indexing speeds.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "drop": { "description": "Drop documents that don't contain 'prod' tag", "if": "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n " } } ], ) print(resp)
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { drop: { description: "Drop documents that don't contain 'prod' tag", if: "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n ", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "drop": { "description": "Drop documents that don't contain 'prod' tag", "if": """ Collection tags = ctx.tags; if(tags != null){ for (String tag : tags) { if (tag.toLowerCase().contains('prod')) { return false; } } } return true; """ } } ] }
You can also specify a stored script as the
if
condition.
resp = client.put_script( id="my-prod-tag-script", script={ "lang": "painless", "source": "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n " }, ) print(resp) resp1 = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "drop": { "description": "Drop documents that don't contain 'prod' tag", "if": { "id": "my-prod-tag-script" } } } ], ) print(resp1)
const response = await client.putScript({ id: "my-prod-tag-script", script: { lang: "painless", source: "\n Collection tags = ctx.tags;\n if(tags != null){\n for (String tag : tags) {\n if (tag.toLowerCase().contains('prod')) {\n return false;\n }\n }\n }\n return true;\n ", }, }); console.log(response); const response1 = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { drop: { description: "Drop documents that don't contain 'prod' tag", if: { id: "my-prod-tag-script", }, }, }, ], }); console.log(response1);
PUT _scripts/my-prod-tag-script { "script": { "lang": "painless", "source": """ Collection tags = ctx.tags; if(tags != null){ for (String tag : tags) { if (tag.toLowerCase().contains('prod')) { return false; } } } return true; """ } } PUT _ingest/pipeline/my-pipeline { "processors": [ { "drop": { "description": "Drop documents that don't contain 'prod' tag", "if": { "id": "my-prod-tag-script" } } } ] }
Incoming documents often contain object fields. If a processor script attempts
to access a field whose parent object does not exist, Elasticsearch returns a
NullPointerException
. To avoid these exceptions, use
null safe
operators, such as ?.
, and write your scripts to be null safe.
For example, ctx.network?.name.equalsIgnoreCase('Guest')
is not null safe.
ctx.network?.name
can return null. Rewrite the script as
'Guest'.equalsIgnoreCase(ctx.network?.name)
, which is null safe because
Guest
is always non-null.
If you can’t rewrite a script to be null safe, include an explicit null check.
resp = client.ingest.put_pipeline( id="my-pipeline", processors=[ { "drop": { "description": "Drop documents that contain 'network.name' of 'Guest'", "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')" } } ], ) print(resp)
const response = await client.ingest.putPipeline({ id: "my-pipeline", processors: [ { drop: { description: "Drop documents that contain 'network.name' of 'Guest'", if: "ctx.network?.name != null && ctx.network.name.contains('Guest')", }, }, ], }); console.log(response);
PUT _ingest/pipeline/my-pipeline { "processors": [ { "drop": { "description": "Drop documents that contain 'network.name' of 'Guest'", "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')" } } ] }
Conditionally apply pipelines
editCombine an if
condition with the pipeline
processor
to apply other pipelines to documents based on your criteria. You can use this
pipeline as the default pipeline in an
index template used to configure multiple data streams or
indices.
resp = client.ingest.put_pipeline( id="one-pipeline-to-rule-them-all", processors=[ { "pipeline": { "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'", "if": "ctx.service?.name == 'apache_httpd'", "name": "httpd_pipeline" } }, { "pipeline": { "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'", "if": "ctx.service?.name == 'syslog'", "name": "syslog_pipeline" } }, { "fail": { "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message", "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'", "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`" } } ], ) print(resp)
response = client.ingest.put_pipeline( id: 'one-pipeline-to-rule-them-all', body: { processors: [ { pipeline: { description: "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'", if: "ctx.service?.name == 'apache_httpd'", name: 'httpd_pipeline' } }, { pipeline: { description: "If 'service.name' is 'syslog', use 'syslog_pipeline'", if: "ctx.service?.name == 'syslog'", name: 'syslog_pipeline' } }, { fail: { description: "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message", if: "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'", message: 'This pipeline requires service.name to be either `syslog` or `apache_httpd`' } } ] } ) puts response
const response = await client.ingest.putPipeline({ id: "one-pipeline-to-rule-them-all", processors: [ { pipeline: { description: "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'", if: "ctx.service?.name == 'apache_httpd'", name: "httpd_pipeline", }, }, { pipeline: { description: "If 'service.name' is 'syslog', use 'syslog_pipeline'", if: "ctx.service?.name == 'syslog'", name: "syslog_pipeline", }, }, { fail: { description: "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message", if: "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'", message: "This pipeline requires service.name to be either `syslog` or `apache_httpd`", }, }, ], }); console.log(response);
PUT _ingest/pipeline/one-pipeline-to-rule-them-all { "processors": [ { "pipeline": { "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'", "if": "ctx.service?.name == 'apache_httpd'", "name": "httpd_pipeline" } }, { "pipeline": { "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'", "if": "ctx.service?.name == 'syslog'", "name": "syslog_pipeline" } }, { "fail": { "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message", "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'", "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`" } } ] }
Get pipeline usage statistics
editUse the node stats API to get global and per-pipeline ingest statistics. Use these stats to determine which pipelines run most frequently or spend the most time processing.
resp = client.nodes.stats( metric="ingest", filter_path="nodes.*.ingest", ) print(resp)
response = client.nodes.stats( metric: 'ingest', filter_path: 'nodes.*.ingest' ) puts response
const response = await client.nodes.stats({ metric: "ingest", filter_path: "nodes.*.ingest", }); console.log(response);
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
On this page
- Prerequisites
- Create and manage pipelines
- Manage pipeline versions
- Test a pipeline
- Add a pipeline to an indexing request
- Set a default pipeline
- Set a final pipeline
- Pipelines for Beats
- Pipelines for Fleet and Elastic Agent
- Pipelines for search indices
- Access source fields in a processor
- Access metadata fields in a processor
- Access ingest metadata in a processor
- Handling pipeline failures
- Conditionally run a processor
- Conditionally apply pipelines
- Get pipeline usage statistics