Use a data stream

edit

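After you set up a data stream, you can do the following:

- Add documents to a data stream
- Search a data stream
- Get statistics for a data stream
- Manually roll over a data stream
- Open closed backing indices
- Reindex with a data stream
- Update documents in a data stream by query
- Delete documents in a data stream by query
- Update or delete documents in a backing index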

Add documents to a data stream

edit

To add an individual document, use the index API. Ingest pipelines are supported.

resp = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-03-08T11:06:07.000Z',
    user: {
      id: '8a4f500d'
    },
    message: 'Login successful'
  }
)
puts response
const response = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
POST /my-data-stream/_doc/
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead. Only an op_type of create is supported.

To add multiple documents with a single request, use the bulk API. Only create actions are supported.

resp = client.bulk(
    index="my-data-stream",
    refresh=True,
    operations=[
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:04:05.000Z",
            "user": {
                "id": "vlb44hny"
            },
            "message": "Login attempt failed"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-09T11:07:08.000Z",
            "user": {
                "id": "l7gk7f82"
            },
            "message": "Logout successful"
        }
    ],
)
print(resp)
response = client.bulk(
  index: 'my-data-stream',
  refresh: true,
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:04:05.000Z',
      user: {
        id: 'vlb44hny'
      },
      message: 'Login attempt failed'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-09T11:07:08.000Z',
      user: {
        id: 'l7gk7f82'
      },
      message: 'Logout successful'
    }
  ]
)
puts response
const response = await client.bulk({
  index: "my-data-stream",
  refresh: "true",
  operations: [
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:04:05.000Z",
      user: {
        id: "vlb44hny",
      },
      message: "Login attempt failed",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-09T11:07:08.000Z",
      user: {
        id: "l7gk7f82",
      },
      message: "Logout successful",
    },
  ],
});
console.log(response);
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

Search a data stream

edit

The following search APIs support data streams:

- Search
- Async search
- Multi search
- Field capabilities
- EQL search

Get statistics for a data stream

edit

Use the data stream stats API to get statistics for one or more data streams:

resp = client.indices.data_streams_stats(
    name="my-data-stream",
    human=True,
)
print(resp)
response = client.indices.data_streams_stats(
  name: 'my-data-stream',
  human: true
)
puts response
const response = await client.indices.dataStreamsStats({
  name: "my-data-stream",
  human: "true",
});
console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true

Manually roll over a data stream

edit

Use the rollover API to manually roll over a data stream. You have two options when manually rolling over:

  1. To immediately trigger a rollover:

    resp = client.indices.rollover(
        alias="my-data-stream",
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream'
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
    });
    console.log(response);
    POST /my-data-stream/_rollover/
  2. Or to postpone the rollover until the next indexing event occurs:

    resp = client.indices.rollover(
        alias="my-data-stream",
        lazy=True,
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream',
      lazy: true
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
      lazy: "true",
    });
    console.log(response);
    POST /my-data-stream/_rollover?lazy

    Use the second option to avoid having empty backing indices in data streams that do not get updated often.

Open closed backing indices

edit

You cannot search a closed backing index, even by searching its data stream. You also cannot update or delete documents in a closed index.

To re-open a closed backing index, submit an open index API request directly to the index:

resp = client.indices.open(
    index=".ds-my-data-stream-2099.03.07-000001",
)
print(resp)
response = client.indices.open(
  index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response
const response = await client.indices.open({
  index: ".ds-my-data-stream-2099.03.07-000001",
});
console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/

To re-open all closed backing indices for a data stream, submit an open index API request to the stream:

resp = client.indices.open(
    index="my-data-stream",
)
print(resp)
response = client.indices.open(
  index: 'my-data-stream'
)
puts response
const response = await client.indices.open({
  index: "my-data-stream",
});
console.log(response);
POST /my-data-stream/_open/

Reindex with a data stream

edit

Use the reindex API to copy documents from an existing index, alias, or data stream to a data stream. Because data streams are append-only, a reindex into a data stream must use an op_type of create. A reindex cannot update existing documents in a data stream.

resp = client.reindex(
    source={
        "index": "archive"
    },
    dest={
        "index": "my-data-stream",
        "op_type": "create"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'archive'
    },
    dest: {
      index: 'my-data-stream',
      op_type: 'create'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "archive",
  },
  dest: {
    index: "my-data-stream",
    op_type: "create",
  },
});
console.log(response);
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

Update documents in a data stream by query

edit

Use the update by query API to update documents in a data stream that match a provided query:

resp = client.update_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "l7gk7f82"
        }
    },
    script={
        "source": "ctx._source.user.id = params.new_id",
        "params": {
            "new_id": "XgdX0NoX"
        }
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'l7gk7f82'
      }
    },
    script: {
      source: 'ctx._source.user.id = params.new_id',
      params: {
        new_id: 'XgdX0NoX'
      }
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "l7gk7f82",
    },
  },
  script: {
    source: "ctx._source.user.id = params.new_id",
    params: {
      new_id: "XgdX0NoX",
    },
  },
});
console.log(response);
POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

Delete documents in a data stream by query

edit

Use the delete by query API to delete documents in a data stream that match a provided query:

resp = client.delete_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "vlb44hny"
        }
    },
)
print(resp)
response = client.delete_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'vlb44hny'
      }
    }
  }
)
puts response
const response = await client.deleteByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "vlb44hny",
    },
  },
});
console.log(response);
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

Update or delete documents in a backing index

edit

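If needed, you can update or delete documents in a data stream by sending requests to the backing index containing the document. You’ll need:

- The document ID
- The name of the backing index containing the document
- If updating the document, its sequence number and primary term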

To get this information, use a search request:

resp = client.search(
    index="my-data-stream",
    seq_no_primary_term=True,
    query={
        "match": {
            "user.id": "yWIumJd7"
        }
    },
)
print(resp)
response = client.search(
  index: 'my-data-stream',
  body: {
    seq_no_primary_term: true,
    query: {
      match: {
        'user.id' => 'yWIumJd7'
      }
    }
  }
)
puts response
const response = await client.search({
  index: "my-data-stream",
  seq_no_primary_term: true,
  query: {
    match: {
      "user.id": "yWIumJd7",
    },
  },
});
console.log(response);
GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

Response:

{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",      
        "_id": "bfspvnIBr7VVZlfp2lqX",              
        "_seq_no": 0,                               
        "_primary_term": 1,                         
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}

_index: Backing index containing the matching document

_id: Document ID for the document

_seq_no: Current sequence number for the document

_primary_term: Primary term for the document

To update the document, use an index API request with valid if_seq_no and if_primary_term arguments:

resp = client.index(
    index=".ds-my-data-stream-2099.03.08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
    if_seq_no="0",
    if_primary_term="1",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
const response = await client.index({
  index: ".ds-my-data-stream-2099.03.08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
  if_seq_no: 0,
  if_primary_term: 1,
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
PUT /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

To delete the document, use the delete API:

resp = client.delete(
    index=".ds-my-data-stream-2099.03.08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
)
print(resp)
response = client.delete(
  index: '.ds-my-data-stream-2099.03.08-000003',
  id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response
const response = await client.delete({
  index: ".ds-my-data-stream-2099.03.08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
});
console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

To delete or update multiple documents with a single request, use the bulk API's delete, index, and update actions. For index actions, include valid if_seq_no and if_primary_term arguments.

resp = client.bulk(
    refresh=True,
    operations=[
        {
            "index": {
                "_index": ".ds-my-data-stream-2099.03.08-000003",
                "_id": "bfspvnIBr7VVZlfp2lqX",
                "if_seq_no": 0,
                "if_primary_term": 1
            }
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        }
    ],
)
print(resp)
response = client.bulk(
  refresh: true,
  body: [
    {
      index: {
        _index: '.ds-my-data-stream-2099.03.08-000003',
        _id: 'bfspvnIBr7VVZlfp2lqX',
        if_seq_no: 0,
        if_primary_term: 1
      }
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    }
  ]
)
puts response
const response = await client.bulk({
  refresh: "true",
  operations: [
    {
      index: {
        _index: ".ds-my-data-stream-2099.03.08-000003",
        _id: "bfspvnIBr7VVZlfp2lqX",
        if_seq_no: 0,
        if_primary_term: 1,
      },
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
  ],
});
console.log(response);
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }