Pipeline processor

edit

Executes another pipeline.

Table 34. Pipeline Options

Name Required Default Description

name

yes

-

The name of the pipeline to execute. Supports template snippets.

ignore_missing_pipeline

no

false

Whether to ignore missing pipelines instead of failing.

description

no

-

Description of the processor. Useful for describing the purpose of the processor or its configuration.

if

no

-

Conditionally execute the processor. See Conditionally run a processor.

ignore_failure

no

false

Ignore failures for the processor. See Handling pipeline failures.

on_failure

no

-

Handle failures for the processor. See Handling pipeline failures.

tag

no

-

Identifier for the processor. Useful for debugging and metrics.

{
  "pipeline": {
    "name": "inner-pipeline"
  }
}

The name of the current pipeline can be accessed from the _ingest.pipeline ingest metadata key.

The following example uses this processor to nest one pipeline inside another.

Define an inner pipeline:

resp = client.ingest.put_pipeline(
    id="pipelineA",
    description="inner pipeline",
    processors=[
        {
            "set": {
                "field": "inner_pipeline_set",
                "value": "inner"
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'pipelineA',
  body: {
    description: 'inner pipeline',
    processors: [
      {
        set: {
          field: 'inner_pipeline_set',
          value: 'inner'
        }
      }
    ]
  }
)
puts response
const response = await client.ingest.putPipeline({
  id: "pipelineA",
  description: "inner pipeline",
  processors: [
    {
      set: {
        field: "inner_pipeline_set",
        value: "inner",
      },
    },
  ],
});
console.log(response);
PUT _ingest/pipeline/pipelineA
{
  "description" : "inner pipeline",
  "processors" : [
    {
      "set" : {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}

Define another pipeline that uses the previously defined inner pipeline:

resp = client.ingest.put_pipeline(
    id="pipelineB",
    description="outer pipeline",
    processors=[
        {
            "pipeline": {
                "name": "pipelineA"
            }
        },
        {
            "set": {
                "field": "outer_pipeline_set",
                "value": "outer"
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'pipelineB',
  body: {
    description: 'outer pipeline',
    processors: [
      {
        pipeline: {
          name: 'pipelineA'
        }
      },
      {
        set: {
          field: 'outer_pipeline_set',
          value: 'outer'
        }
      }
    ]
  }
)
puts response
const response = await client.ingest.putPipeline({
  id: "pipelineB",
  description: "outer pipeline",
  processors: [
    {
      pipeline: {
        name: "pipelineA",
      },
    },
    {
      set: {
        field: "outer_pipeline_set",
        value: "outer",
      },
    },
  ],
});
console.log(response);
PUT _ingest/pipeline/pipelineB
{
  "description" : "outer pipeline",
  "processors" : [
    {
      "pipeline" : {
        "name": "pipelineA"
      }
    },
    {
      "set" : {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}

Indexing a document with the outer pipeline applied now also runs the inner pipeline, because the outer pipeline invokes it:

resp = client.index(
    index="my-index-000001",
    id="1",
    pipeline="pipelineB",
    document={
        "field": "value"
    },
)
print(resp)
response = client.index(
  index: 'my-index-000001',
  id: 1,
  pipeline: 'pipelineB',
  body: {
    field: 'value'
  }
)
puts response
const response = await client.index({
  index: "my-index-000001",
  id: 1,
  pipeline: "pipelineB",
  document: {
    field: "value",
  },
});
console.log(response);
PUT /my-index-000001/_doc/1?pipeline=pipelineB
{
  "field": "value"
}

Response from the index request:

{
  "_index": "my-index-000001",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 66,
  "_primary_term": 1
}

Indexed document:

{
  "field": "value",
  "inner_pipeline_set": "inner",
  "outer_pipeline_set": "outer"
}