Watcher transform context
editWatcher transform context
editUse a Painless script as a watch transform to transform a payload into a new payload for further use in the watch. Transform scripts return an Object value of the new payload.
The following variables are available in all watcher contexts.
Variables
-
params
(Map
, read-only) - User-defined parameters passed in as part of the query.
-
ctx['watch_id']
(String
, read-only) - The id of the watch.
-
ctx['id']
(String
, read-only) - The server generated unique identifier for the run watch.
-
ctx['metadata']
(Map
, read-only) - Metadata can be added to the top level of the watch definition. This is user defined and is typically used to consolidate duplicate values in a watch.
-
ctx['execution_time']
(ZonedDateTime
, read-only) - The time the watch began execution.
-
ctx['trigger']['scheduled_time']
(ZonedDateTime
, read-only) - The scheduled trigger time for the watch. This is the time the watch should be executed.
-
ctx['trigger']['triggered_time']
(ZonedDateTime
, read-only) - The actual trigger time for the watch. This is the time the watch was triggered for execution.
-
ctx['payload']
(Map
, read-only) - The accessible watch data based upon the watch input.
Return
-
Object
- The new payload.
API
The standard Painless API is available.
Example
To run the examples, first follow the steps in context examples.
POST _watcher/watch/_execute { "watch" : { "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "transform" : { "script": """ return [ 'money_makers': ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value > 50000 }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()), 'duds' : ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value < 15000 }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()) ] """ }, "actions" : { "my_log" : { "logging" : { "text" : "The output of the payload was transformed to {{ctx.payload}}" } } } } }
The Java Stream API is used in the transform. This API allows manipulation of the elements of the list in a pipeline. |
|
The stream filter removes items that do not meet the filter criteria. |
|
The stream map transforms each element into a new object. |
|
The collector reduces the stream to a |
|
This is done again for the second set of values in the transform. |
The following action transform changes each value in the mod_log action into a String
.
This transform does not change the values in the unmod_log action.
POST _watcher/watch/_execute { "watch" : { "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "actions" : { "mod_log" : { "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'msg': ctx.payload.aggregations.theatres.buckets.stream() .map(t-> formatter.format(t.money.value) + ' for the play ' + t.key) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The output of the payload was transformed to: {{ctx.payload.msg}}" } }, "unmod_log" : { "logging" : { "text" : "The output of the payload was not transformed and this value should not exist: {{ctx.payload.msg}}" } } } } }
This example uses the streaming API in a very similar manner. The differences below are subtle and worth calling out.
The location of the transform is no longer at the top level, but is within an individual action. |
|
A second action that does not transform the payload is given for reference. |
The following example shows scripted watch and action transforms within the context of a complete watch. This watch also uses a scripted condition.
POST _watcher/watch/_execute { "watch" : { "metadata" : { "high_threshold": 4000, "low_threshold": 1000 }, "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost", "script": { "source": "doc.cost.value * doc.number.value" } } } } } } } } } }, "condition" : { "script" : """ return ctx.payload.aggregations.theatres.buckets.stream() .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold || theatre.money.value > ctx.metadata.high_threshold) """ }, "transform" : { "script": """ return [ 'money_makers': ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value > ctx.metadata.high_threshold }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()), 'duds' : ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value < ctx.metadata.low_threshold }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()) ] """ }, "actions" : { "log_money_makers" : { "condition": { "script" : "return ctx.payload.money_makers.size() > 0" }, "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'plays_value': ctx.payload.money_makers.stream() .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}" } }, "log_duds" : { "condition": { "script" : "return ctx.payload.duds.size() > 0" }, "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'plays_value': ctx.payload.duds.stream() .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}" } } } } }
The following example shows the use of metadata and transforming dates into a readable format.
POST _watcher/watch/_execute { "watch" : { "metadata" : { "min_hits": 10 }, "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "condition" : { "script" : """ return ctx.payload.hits.total > ctx.metadata.min_hits """ }, "transform" : { "script" : """ def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone()); return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate), 'aggregations': ctx.payload.aggregations] """ }, "actions" : { "my_log" : { "logging" : { "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets" } } } } }