Create transform API

Instantiates a transform.

Request

PUT _transform/<transform_id>

Prerequisites

Requires the following privileges:

  • cluster: manage_transform (the transform_admin built-in role grants this privilege)
  • source indices: read, view_index_metadata
  • destination index: read, create_index, index. If a retention_policy is configured, the delete privilege is also required.
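
For example, a role that grants these privileges might look like the following sketch, shown here with the Python client. The role name, index names, and connection details are placeholders, not part of the API reference; add the delete privilege on the destination index only if you configure a retention_policy.

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Sketch of a role that covers the privileges listed above.
resp = client.security.put_role(
    name="my-transform-role",
    cluster=["manage_transform"],
    indices=[
        {
            "names": ["my-source-index"],
            "privileges": ["read", "view_index_metadata"]
        },
        {
            "names": ["my-dest-index"],
            "privileges": ["read", "create_index", "index"]
        }
    ],
)
print(resp)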

Description

This API defines a transform, which copies data from source indices, transforms it, and persists it into an entity-centric destination index. If you choose to use the pivot method for your transform, the entities are defined by the set of group_by fields in the pivot object. If you choose to use the latest method, the entities are defined by the unique_key field values in the latest object.

You can also think of the destination index as a two-dimensional tabular data structure (known as a data frame). The ID for each document in the data frame is generated from a hash of the entity, so there is a unique row per entity. For more information, see Transforming data.

When the transform is created, a series of validations occur to ensure its success. For example, there is a check for the existence of the source indices and a check that the destination index is not part of the source index pattern. You can use the defer_validation parameter to skip these checks.

Deferred validations are always run when the transform is started, with the exception of privilege checks.

  • The transform remembers the roles that the user who created it had at the time of creation and uses those same roles when it runs. If those roles do not have the required privileges on the source and destination indices, the transform fails when it attempts unauthorized operations. If you provide secondary authorization headers, those credentials are used instead.
  • You must use Kibana or this API to create a transform. Do not add a transform directly into any .transform-internal* indices using the Elasticsearch index API. If Elasticsearch security features are enabled, do not give users any privileges on .transform-internal* indices. If you used transforms prior to 7.5, also do not give users any privileges on .data-frame-internal* indices.

You must choose either the latest or pivot method for your transform; you cannot use both in a single transform.

Path parameters

<transform_id>
(Required, string) Identifier for the transform. This identifier can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and underscores. It has a 64-character limit and must start and end with alphanumeric characters.
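
As an informal illustration only (not part of the API), the naming rules above can be encoded as a regular expression. The following Python sketch checks candidate identifiers against the constraints described in this section:

import re

# 1-64 characters, lowercase alphanumerics, hyphens, and underscores,
# starting and ending with an alphanumeric character.
TRANSFORM_ID_PATTERN = re.compile(r"^(?=.{1,64}$)[a-z0-9](?:[a-z0-9_-]*[a-z0-9])?$")

print(bool(TRANSFORM_ID_PATTERN.match("ecommerce_transform1")))  # True
print(bool(TRANSFORM_ID_PATTERN.match("-starts-with-a-hyphen")))  # False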

Query parameters

defer_validation
(Optional, Boolean) When true, deferrable validations are not run. This behavior may be desired if the source index does not exist until after the transform is created.
timeout
(Optional, time) Period to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to 30s.
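
For example, a minimal sketch of a request that skips deferrable validations (useful when the source index will only exist after the transform is created) and sets a custom timeout might look like this with the Python client. The transform and index names are placeholders:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

resp = client.transform.put_transform(
    transform_id="my_deferred_transform",
    defer_validation=True,  # skip checks such as source index existence
    timeout="60s",          # wait up to 60 seconds for a response
    source={"index": "my-future-source-index"},
    latest={"unique_key": ["customer_id"], "sort": "order_date"},
    dest={"index": "my-future-dest-index"},
)
print(resp)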

Request body

description
(Optional, string) Free text description of the transform.
dest

(Required, object) The destination for the transform.

Properties of dest
index
(Required, string) The destination index for the transform.

In the case of a pivot transform, the mappings of the destination index are deduced based on the source fields when possible. If alternate mappings are required, use the Create index API prior to starting the transform.

In the case of a latest transform, the mappings are never deduced. If dynamic mappings for the destination index are undesirable, use the Create index API prior to starting the transform.

aliases
(Optional, array of objects) The aliases that the destination index for the transform should have. Aliases are manipulated using the stored credentials of the transform: if both primary and secondary credentials were supplied at creation time, the secondary credentials are used; otherwise, the primary credentials are used.

The destination index is added to the aliases regardless of whether it was created by the transform or pre-created by the user.

Properties of aliases
alias
(Required, string) The name of the alias.
move_on_creation
(Optional, boolean) Whether the destination index should be the only index in this alias. If true, all other indices are removed from the alias before the destination index is added. Defaults to false.
pipeline
(Optional, string) The unique identifier for an ingest pipeline.
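
The following sketch shows a dest object that uses both aliases and an ingest pipeline in a minimal request with the Python client. The transform, index, alias, and pipeline names are placeholders, and the ingest pipeline is assumed to already exist:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

resp = client.transform.put_transform(
    transform_id="my_transform_with_aliases",
    source={"index": "kibana_sample_data_ecommerce"},
    latest={"unique_key": ["customer_id"], "sort": "order_date"},
    dest={
        "index": "my-dest-index-000001",
        "aliases": [
            # Alias that should only ever point at the latest destination
            # index: other indices are removed from it first.
            {"alias": "my-dest-latest", "move_on_creation": True},
            # Alias that keeps any other indices it already points to.
            {"alias": "my-dest-all", "move_on_creation": False}
        ],
        "pipeline": "my-ingest-pipeline"  # assumed to exist already
    },
)
print(resp)
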
frequency
(Optional, time units) The interval between checks for changes in the source indices when the transform is running continuously. The minimum value is 1s and the maximum is 1h. The default value is 1m.
latest

(Required*, object) The latest method transforms the data by finding the latest document for each unique key.

Properties of latest
sort
(Required, string) Specifies the date field that is used to identify the latest documents.
unique_key
(Required, array of strings) Specifies an array of one or more fields that are used to group the data.
_meta
(Optional, object) Defines optional transform metadata.
pivot

(Required*, object) The pivot method transforms the data by aggregating and grouping it. These objects define the group by fields and the aggregation to reduce the data.

Properties of pivot
aggregations or aggs

(Required, object) Defines how to aggregate the grouped data. Only a subset of Elasticsearch aggregations is supported; supported aggregations include, for example, avg, cardinality, max, min, sum, and value_count.

group_by

(Required, object) Defines how to group the data. More than one grouping can be defined per pivot (see the sketch after the missing_bucket note below). Supported groupings include the terms, histogram, and date_histogram composite aggregation sources.

The grouping properties can optionally have a missing_bucket property. If it’s true, documents without a value in the respective group_by field are included. Defaults to false.
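
As a sketch of more than one grouping in a single pivot, the following request (Python client, placeholder transform and destination index names) groups by a terms source with missing_bucket enabled and by a date_histogram source, then sums a numeric field:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

resp = client.transform.put_transform(
    transform_id="my_pivot_by_customer_and_day",
    source={"index": "kibana_sample_data_ecommerce"},
    pivot={
        "group_by": {
            "customer_id": {
                "terms": {
                    "field": "customer_id",
                    "missing_bucket": True  # include documents without a customer_id
                }
            },
            "day": {
                "date_histogram": {
                    "field": "order_date",
                    "calendar_interval": "1d"
                }
            }
        },
        "aggregations": {
            "total_spent": {
                "sum": {"field": "taxful_total_price"}
            }
        }
    },
    dest={"index": "my-pivot-dest-index"},
)
print(resp)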

retention_policy

(Optional, object) Defines a retention policy for the transform. Data that meets the defined criteria is deleted from the destination index.

Properties of retention_policy
time

(Required, object) Specifies that the transform uses a time field to set the retention policy. Data is deleted if the field specified by time.field exists and contains data older than max_age.

Properties of time
field
(Required, string) The date field that is used to calculate the age of the document. Set time.field to an existing date field.
max_age
(Required, time units) Specifies the maximum age of a document in the destination index. Documents that are older than the configured value are removed from the destination index.
settings

(Optional, object) Defines optional transform settings.

Properties of settings
align_checkpoints
(Optional, boolean) Specifies whether the transform checkpoint ranges should be optimized for performance. Such optimization can align checkpoint ranges with the date histogram interval when a date histogram is specified as a group source in the transform config. As a result, fewer document updates are performed in the destination index, which improves overall performance. The default value is true, which means the checkpoint ranges are optimized if possible.
dates_as_epoch_millis
(Optional, boolean) Defines whether dates in the output are written as ISO formatted strings (default) or as milliseconds since the epoch. epoch_millis was the default for transforms created before version 7.11. For compatible output, set this to true. The default value is false.
deduce_mappings
(Optional, boolean) Specifies whether the transform should deduce the destination index mappings from the transform config. The default value is true, which means the destination index mappings will be deduced if possible.
docs_per_second
(Optional, float) Specifies a limit on the number of input documents per second. This setting throttles the transform by adding a wait time between search requests. The default value is null, which disables throttling.
max_page_search_size
(Optional, integer) Defines the initial page size to use for the composite aggregation for each checkpoint. If circuit breaker exceptions occur, the page size is dynamically adjusted to a lower value. The minimum value is 10 and the maximum is 65,536. The default value is 500.
num_failure_retries
(Optional, integer) Defines the number of retries on a recoverable failure before the transform task is marked as failed. The minimum value is 0 and the maximum is 100. -1 can be used to denote infinity. In this case, the transform never gives up on retrying a recoverable failure. The default value is the cluster-level setting num_transform_failure_retries.
unattended
(Optional, boolean) If true, the transform runs in unattended mode. In unattended mode, the transform retries indefinitely in case of an error, which means the transform never fails. Setting the number of retries to anything other than infinite fails validation. Defaults to false.
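
A sketch of a transform that overrides several of these settings with the Python client; the transform and destination index names are placeholders and the values are only illustrative:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

resp = client.transform.put_transform(
    transform_id="my_tuned_transform",
    source={"index": "kibana_sample_data_ecommerce"},
    latest={"unique_key": ["customer_id"], "sort": "order_date"},
    dest={"index": "my-tuned-dest-index"},
    settings={
        "max_page_search_size": 1000,  # larger composite aggregation pages
        "docs_per_second": 500,        # throttle input to 500 documents per second
        "num_failure_retries": 10      # fail after 10 recoverable failures
    },
)
print(resp)
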
source

(Required, object) The source of the data for the transform.

Properties of source
index

(Required, string or array) The source indices for the transform. It can be a single index, an index pattern (for example, "my-index-*"), an array of indices (for example, ["my-index-000001", "my-index-000002"]), or an array of index patterns (for example, ["my-index-*", "my-other-index-*"]). For remote indices, use the syntax "remote_name:index_name".

If any indices are in remote clusters, the master node and at least one transform node must have the remote_cluster_client node role.

query
(Optional, object) A query clause that retrieves a subset of data from the source index. See Query DSL.
runtime_mappings
(Optional, object) Definitions of search-time runtime fields that can be used by the transform. For search runtime fields, all data nodes, including remote nodes, must be 7.12 or later.
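
The following sketch defines a search-time runtime field on the source and aggregates on it in a pivot, using the Python client. The runtime field, its script, the query, and the destination index name are illustrative assumptions built on the sample e-commerce data set used elsewhere on this page:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

resp = client.transform.put_transform(
    transform_id="my_runtime_field_transform",
    source={
        "index": "kibana_sample_data_ecommerce",
        "query": {"term": {"currency": {"value": "EUR"}}},
        "runtime_mappings": {
            # Illustrative runtime field: price with a hypothetical 10% markup.
            "price_with_markup": {
                "type": "double",
                "script": {
                    "source": "emit(doc['taxful_total_price'].value * 1.1)"
                }
            }
        }
    },
    pivot={
        "group_by": {
            "customer_id": {"terms": {"field": "customer_id"}}
        },
        "aggregations": {
            "avg_price_with_markup": {
                "avg": {"field": "price_with_markup"}
            }
        }
    },
    dest={"index": "my-runtime-dest-index"},
)
print(resp)
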
sync

(Optional, object) Defines the properties transforms require to run continuously.

Properties of sync
time

(Required, object) Specifies that the transform uses a time field to synchronize the source and destination indices.

Properties of time
delay
(Optional, time units) The time delay between the current time and the latest input data time. The default value is 60s.
field

(Required, string) The date field that is used to identify new documents in the source.

It is strongly recommended to use a field that contains the ingest timestamp. If you use a different field, you might need to set the delay such that it accounts for data transmission delays.

Examples

The following transform uses the pivot method. The same request is shown first with the Python client, then with the JavaScript client, and then in Console syntax:

resp = client.transform.put_transform(
    transform_id="ecommerce_transform1",
    source={
        "index": "kibana_sample_data_ecommerce",
        "query": {
            "term": {
                "geoip.continent_name": {
                    "value": "Asia"
                }
            }
        }
    },
    pivot={
        "group_by": {
            "customer_id": {
                "terms": {
                    "field": "customer_id",
                    "missing_bucket": True
                }
            }
        },
        "aggregations": {
            "max_price": {
                "max": {
                    "field": "taxful_total_price"
                }
            }
        }
    },
    description="Maximum priced ecommerce data by customer_id in Asia",
    dest={
        "index": "kibana_sample_data_ecommerce_transform1",
        "pipeline": "add_timestamp_pipeline"
    },
    frequency="5m",
    sync={
        "time": {
            "field": "order_date",
            "delay": "60s"
        }
    },
    retention_policy={
        "time": {
            "field": "order_date",
            "max_age": "30d"
        }
    },
)
print(resp)

The same request with the JavaScript client:

const response = await client.transform.putTransform({
  transform_id: "ecommerce_transform1",
  source: {
    index: "kibana_sample_data_ecommerce",
    query: {
      term: {
        "geoip.continent_name": {
          value: "Asia",
        },
      },
    },
  },
  pivot: {
    group_by: {
      customer_id: {
        terms: {
          field: "customer_id",
          missing_bucket: true,
        },
      },
    },
    aggregations: {
      max_price: {
        max: {
          field: "taxful_total_price",
        },
      },
    },
  },
  description: "Maximum priced ecommerce data by customer_id in Asia",
  dest: {
    index: "kibana_sample_data_ecommerce_transform1",
    pipeline: "add_timestamp_pipeline",
  },
  frequency: "5m",
  sync: {
    time: {
      field: "order_date",
      delay: "60s",
    },
  },
  retention_policy: {
    time: {
      field: "order_date",
      max_age: "30d",
    },
  },
});
console.log(response);

The same request in Console syntax:

PUT _transform/ecommerce_transform1
{
  "source": {
    "index": "kibana_sample_data_ecommerce",
    "query": {
      "term": {
        "geoip.continent_name": {
          "value": "Asia"
        }
      }
    }
  },
  "pivot": {
    "group_by": {
      "customer_id": {
        "terms": {
          "field": "customer_id",
          "missing_bucket": true
        }
      }
    },
    "aggregations": {
      "max_price": {
        "max": {
          "field": "taxful_total_price"
        }
      }
    }
  },
  "description": "Maximum priced ecommerce data by customer_id in Asia",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform1",
    "pipeline": "add_timestamp_pipeline"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "order_date",
      "max_age": "30d"
    }
  }
}

When the transform is created, you receive the following results:

{
  "acknowledged" : true
}

The following transform uses the latest method. Again, the request is shown with the Python client, the JavaScript client, and in Console syntax:

resp = client.transform.put_transform(
    transform_id="ecommerce_transform2",
    source={
        "index": "kibana_sample_data_ecommerce"
    },
    latest={
        "unique_key": [
            "customer_id"
        ],
        "sort": "order_date"
    },
    description="Latest order for each customer",
    dest={
        "index": "kibana_sample_data_ecommerce_transform2"
    },
    frequency="5m",
    sync={
        "time": {
            "field": "order_date",
            "delay": "60s"
        }
    },
)
print(resp)

The same request with the JavaScript client:

const response = await client.transform.putTransform({
  transform_id: "ecommerce_transform2",
  source: {
    index: "kibana_sample_data_ecommerce",
  },
  latest: {
    unique_key: ["customer_id"],
    sort: "order_date",
  },
  description: "Latest order for each customer",
  dest: {
    index: "kibana_sample_data_ecommerce_transform2",
  },
  frequency: "5m",
  sync: {
    time: {
      field: "order_date",
      delay: "60s",
    },
  },
});
console.log(response);

The same request in Console syntax:

PUT _transform/ecommerce_transform2
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "latest": {
    "unique_key": ["customer_id"],
    "sort": "order_date"
  },
  "description": "Latest order for each customer",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform2"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  }
}