New

The executive guide to generative AI

Read more

Handling Failures in Pipelines

edit

Handling Failures in Pipelines

edit

In its simplest use case, a pipeline defines a list of processors that are executed sequentially, and processing halts at the first exception. This behavior may not be desirable when failures are expected. For example, you may have logs that don’t match the specified grok expression. Instead of halting execution, you may want to index such documents into a separate index.

To enable this behavior, you can use the on_failure parameter. The on_failure parameter defines a list of processors to be executed immediately following the failed processor. You can specify this parameter at the pipeline level, as well as at the processor level. If a processor specifies an on_failure configuration, whether it is empty or not, any exceptions that are thrown by the processor are caught, and the pipeline continues executing the remaining processors. Because you can define further processors within the scope of an on_failure statement, you can nest failure handling.

The following example defines a pipeline that renames the foo field in the processed document to bar. If the document does not contain the foo field, the processor attaches an error message to the document for later analysis within Elasticsearch.

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error.message",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

The following example defines an on_failure block on a whole pipeline to change the index to which failed documents get sent.

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

Alternatively instead of defining behaviour in case of processor failure, it is also possible to ignore a failure and continue with the next processor by specifying the ignore_failure setting.

In case in the example below the field foo doesn’t exist the failure will be caught and the pipeline continues to execute, which in this case means that the pipeline does nothing.

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}

The ignore_failure can be set on any processor and defaults to false.

Accessing Error Metadata From Processors Handling Exceptions

edit

You may want to retrieve the actual error message that was thrown by a failed processor. To do so you can access metadata fields called on_failure_message, on_failure_processor_type, on_failure_processor_tag and on_failure_pipeline (in case an error occurred inside a pipeline processor). These fields are only accessible from within the context of an on_failure block.

Here is an updated version of the example that you saw earlier. But instead of setting the error message manually, the example leverages the on_failure_message metadata field to provide the error message.

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "to" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error.message",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
Was this helpful?
Feedback