Watcher transform context

edit

Use a Painless script as a watch transform to transform a payload into a new payload for further use in the watch. Transform scripts return an Object value of the new payload.

The following variables are available in all watcher contexts.

Variables

params (Map, read-only)
User-defined parameters passed in as part of the query.
ctx['watch_id'] (String, read-only)
The id of the watch.
ctx['id'] (String, read-only)
The server generated unique identifier for the run watch.
ctx['metadata'] (Map, read-only)
Metadata can be added to the top level of the watch definition. This is user defined and is typically used to consolidate duplicate values in a watch.
ctx['execution_time'] (ZonedDateTime, read-only)
The time the watch began execution.
ctx['trigger']['scheduled_time'] (ZonedDateTime, read-only)
The scheduled trigger time for the watch. This is the time the watch should be executed.
ctx['trigger']['triggered_time'] (ZonedDateTime, read-only)
The actual trigger time for the watch. This is the time the watch was triggered for execution.
ctx['payload'] (Map, read-only)
The accessible watch data based upon the watch input.

Return

Object
The new payload.

API

The standard Painless API is available.

Example

To run the examples, first follow the steps in context examples.

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : { "term": { "sold": "true"} },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()  
            .filter(t -> {                                                    
                return t.money.value > 50000
            })
            .map(t -> {                                                       
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),                                  
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()         
            .filter(t -> {
                return t.money.value < 15000
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the payload was transformed to {{ctx.payload}}"
        }
      }
    }
  }
}

The Java Stream API is used in the transform. This API allows manipulation of the elements of the list in a pipeline.

The stream filter removes items that do not meet the filter criteria.

The stream map transforms each element into a new object.

The collector reduces the stream to a java.util.List.

This is done again for the second set of values in the transform.

The following action transform changes each value in the mod_log action into a String. This transform does not change the values in the unmod_log action.

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "mod_log" : {
        "transform": {                                                                
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'msg': ctx.payload.aggregations.theatres.buckets.stream()
              .map(t-> formatter.format(t.money.value) + ' for the play ' + t.key)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The output of the payload was transformed to: {{ctx.payload.msg}}"
        }
      },
      "unmod_log" : {                                                                 
        "logging" : {
          "text" : "The output of the payload was not transformed and this value should not exist: {{ctx.payload.msg}}"
        }
      }
    }
  }
}

This example uses the streaming API in a very similar manner. The differences below are subtle and worth calling out.

The location of the transform is no longer at the top level, but is within an individual action.

A second action that does not transform the payload is given for reference.

The following example shows scripted watch and action transforms within the context of a complete watch. This watch also uses a scripted condition.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "high_threshold": 4000, "low_threshold": 1000 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": {
                      "field" : "cost",
                      "script": {
                       "source": "doc.cost.value * doc.number.value"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()
          .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold ||
                               theatre.money.value > ctx.metadata.high_threshold)
      """
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value > ctx.metadata.high_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value < ctx.metadata.low_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "log_money_makers" : {
        "condition": {
          "script" : "return ctx.payload.money_makers.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.money_makers.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}"
        }
      },
      "log_duds" : {
        "condition": {
          "script" : "return ctx.payload.duds.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.duds.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}"
        }
      }
    }
  }
}

The following example shows the use of metadata and transforming dates into a readable format.

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "min_hits": 10 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.hits.total > ctx.metadata.min_hits
      """
    },
    "transform" : {
      "script" :
      """
        def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone());
        return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate),
                'aggregations': ctx.payload.aggregations]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets"
        }
      }
    }
  }
}