Use a data stream
After you set up a data stream, you can do the following:
- Add documents to a data stream
- Search a data stream
- Get statistics for a data stream
- Manually roll over a data stream
- Open closed backing indices
- Reindex with a data stream
- Update documents in a data stream by query
- Delete documents in a data stream by query
- Update or delete documents in a backing index
Add documents to a data stream
To add an individual document, use the index API. Ingest pipelines are supported.
resp = client.index( index="my-data-stream", document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
response = client.index( index: 'my-data-stream', body: { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ) puts response
const response = await client.index({ index: "my-data-stream", document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
POST /my-data-stream/_doc/ { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead. Only an op_type of create is supported.
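As a rough illustration of the request format that does accept a document ID, the sketch below builds the `PUT /<target>/_create/<_id>` request line. The helper name `create_request_line` is hypothetical and not part of any client library; it only formats the path described above.

```python
from urllib.parse import quote


def create_request_line(target: str, doc_id: str) -> str:
    """Return the request line for adding a document with an explicit ID.

    Hypothetical helper: it formats PUT /<target>/_create/<_id>, the form
    that implies an op_type of create.
    """
    if not target or not doc_id:
        raise ValueError("target and doc_id are required")
    # Percent-encode both parts so characters such as "/" cannot alter the path.
    return f"PUT /{quote(target, safe='')}/_create/{quote(doc_id, safe='')}"


print(create_request_line("my-data-stream", "bfspvnIBr7VVZlfp2lqX"))
```

The request body is the same document you would send to `/<target>/_doc/`; only the path changes.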
To add multiple documents with a single request, use the bulk API. Only create actions are supported.
resp = client.bulk( index="my-data-stream", refresh=True, operations=[ { "create": {} }, { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }, { "create": {} }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, { "create": {} }, { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" } ], ) print(resp)
response = client.bulk( index: 'my-data-stream', refresh: true, body: [ { create: {} }, { "@timestamp": '2099-03-08T11:04:05.000Z', user: { id: 'vlb44hny' }, message: 'Login attempt failed' }, { create: {} }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' }, { create: {} }, { "@timestamp": '2099-03-09T11:07:08.000Z', user: { id: 'l7gk7f82' }, message: 'Logout successful' } ] ) puts response
const response = await client.bulk({ index: "my-data-stream", refresh: "true", operations: [ { create: {}, }, { "@timestamp": "2099-03-08T11:04:05.000Z", user: { id: "vlb44hny", }, message: "Login attempt failed", }, { create: {}, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, { create: {}, }, { "@timestamp": "2099-03-09T11:07:08.000Z", user: { id: "l7gk7f82", }, message: "Logout successful", }, ], }); console.log(response);
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
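If you assemble the bulk body yourself rather than through a client, a minimal sketch like the following may help. It pairs every document with a create action and emits newline-delimited JSON. The function name `build_create_bulk_body` is hypothetical, and the `@timestamp` check is an extra safeguard added for this example, not something this section prescribes.

```python
import json


def build_create_bulk_body(documents):
    """Serialize documents as an NDJSON bulk body that uses only create actions."""
    lines = []
    for doc in documents:
        # Extra safeguard (assumption of this sketch): reject documents
        # without the @timestamp field that data streams rely on.
        if "@timestamp" not in doc:
            raise ValueError("document is missing @timestamp")
        lines.append(json.dumps({"create": {}}))
        lines.append(json.dumps(doc))
    # Every line, including the last one, ends with a newline.
    return "".join(line + "\n" for line in lines)


body = build_create_bulk_body(
    [
        {"@timestamp": "2099-03-08T11:04:05.000Z", "message": "Login attempt failed"},
        {"@timestamp": "2099-03-08T11:06:07.000Z", "message": "Login successful"},
    ]
)
print(body, end="")
```

The resulting string can be sent as the body of a request to `/my-data-stream/_bulk`.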
Search a data stream
The following search APIs support data streams:
Get statistics for a data stream
Use the data stream stats API to get statistics for one or more data streams:
resp = client.indices.data_streams_stats( name="my-data-stream", human=True, ) print(resp)
response = client.indices.data_streams_stats( name: 'my-data-stream', human: true ) puts response
const response = await client.indices.dataStreamsStats({ name: "my-data-stream", human: "true", }); console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true
Manually roll over a data stream
Use the rollover API to manually roll over a data stream. You have two options when manually rolling over:
- To immediately trigger a rollover:
resp = client.indices.rollover( alias="my-data-stream", ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream' ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", }); console.log(response);
POST /my-data-stream/_rollover/
- Or to postpone the rollover until the next indexing event occurs:
resp = client.indices.rollover( alias="my-data-stream", lazy=True, ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream', lazy: true ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", lazy: "true", }); console.log(response);
POST /my-data-stream/_rollover?lazy
Use the second option to avoid creating empty backing indices in data streams that are not updated often.
Open closed backing indices
You cannot search a closed backing index, even by searching its data stream. You also cannot update or delete documents in a closed index.
To re-open a closed backing index, submit an open index API request directly to the index:
resp = client.indices.open( index=".ds-my-data-stream-2099.03.07-000001", ) print(resp)
response = client.indices.open( index: '.ds-my-data-stream-2099.03.07-000001' ) puts response
const response = await client.indices.open({ index: ".ds-my-data-stream-2099.03.07-000001", }); console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/
To re-open all closed backing indices for a data stream, submit an open index API request to the stream:
resp = client.indices.open( index="my-data-stream", ) print(resp)
response = client.indices.open( index: 'my-data-stream' ) puts response
const response = await client.indices.open({ index: "my-data-stream", }); console.log(response);
POST /my-data-stream/_open/
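When you address a backing index directly, it can help to check which data stream a name belongs to. The sketch below assumes the naming pattern `.ds-<data-stream>-<yyyy.MM.dd>-<generation>` that the examples in this section follow; both the pattern and the helper name `parse_backing_index` are assumptions of this example, and names produced by other versions or by custom setups may differ.

```python
import re

# Assumed pattern: .ds-<data-stream>-<yyyy.MM.dd>-<generation>
BACKING_INDEX = re.compile(
    r"^\.ds-(?P<stream>.+)-(?P<date>\d{4}\.\d{2}\.\d{2})-(?P<generation>\d{6,})$"
)


def parse_backing_index(name):
    """Split a backing index name into (stream, date, generation), or return None."""
    match = BACKING_INDEX.match(name)
    if match is None:
        return None
    return match["stream"], match["date"], int(match["generation"])


print(parse_backing_index(".ds-my-data-stream-2099.03.07-000001"))
```

A name that does not fit the assumed pattern, such as the data stream name itself, yields `None`.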
Reindex with a data stream
Use the reindex API to copy documents from an existing index, alias, or data stream to a data stream. Because data streams are append-only, a reindex into a data stream must use an op_type of create. A reindex cannot update existing documents in a data stream.
resp = client.reindex( source={ "index": "archive" }, dest={ "index": "my-data-stream", "op_type": "create" }, ) print(resp)
response = client.reindex( body: { source: { index: 'archive' }, dest: { index: 'my-data-stream', op_type: 'create' } } ) puts response
const response = await client.reindex({ source: { index: "archive", }, dest: { index: "my-data-stream", op_type: "create", }, }); console.log(response);
POST /_reindex { "source": { "index": "archive" }, "dest": { "index": "my-data-stream", "op_type": "create" } }
Update documents in a data stream by query
Use the update by query API to update documents in a data stream that match a provided query:
resp = client.update_by_query( index="my-data-stream", query={ "match": { "user.id": "l7gk7f82" } }, script={ "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } }, ) print(resp)
response = client.update_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'l7gk7f82' } }, script: { source: 'ctx._source.user.id = params.new_id', params: { new_id: 'XgdX0NoX' } } } ) puts response
const response = await client.updateByQuery({ index: "my-data-stream", query: { match: { "user.id": "l7gk7f82", }, }, script: { source: "ctx._source.user.id = params.new_id", params: { new_id: "XgdX0NoX", }, }, }); console.log(response);
POST /my-data-stream/_update_by_query { "query": { "match": { "user.id": "l7gk7f82" } }, "script": { "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } } }
Delete documents in a data stream by query
Use the delete by query API to delete documents in a data stream that match a provided query:
resp = client.delete_by_query( index="my-data-stream", query={ "match": { "user.id": "vlb44hny" } }, ) print(resp)
response = client.delete_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'vlb44hny' } } } ) puts response
const response = await client.deleteByQuery({ index: "my-data-stream", query: { match: { "user.id": "vlb44hny", }, }, }); console.log(response);
POST /my-data-stream/_delete_by_query { "query": { "match": { "user.id": "vlb44hny" } } }
Update or delete documents in a backing index
If needed, you can update or delete documents in a data stream by sending requests to the backing index containing the document. You’ll need:
- The document ID
- The name of the backing index containing the document
- If updating the document, its sequence number and primary term
To get this information, use a search request:
resp = client.search( index="my-data-stream", seq_no_primary_term=True, query={ "match": { "user.id": "yWIumJd7" } }, ) print(resp)
response = client.search( index: 'my-data-stream', body: { seq_no_primary_term: true, query: { match: { 'user.id' => 'yWIumJd7' } } } ) puts response
const response = await client.search({ index: "my-data-stream", seq_no_primary_term: true, query: { match: { "user.id": "yWIumJd7", }, }, }); console.log(response);
GET /my-data-stream/_search { "seq_no_primary_term": true, "query": { "match": { "user.id": "yWIumJd7" } } }
Response:
{ "took": 20, "timed_out": false, "_shards": { "total": 3, "successful": 3, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 0.2876821, "hits": [ { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "_seq_no": 0, "_primary_term": 1, "_score": 0.2876821, "_source": { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "yWIumJd7" }, "message": "Login successful" } } ] } }
- _index: Backing index containing the matching document
- _id: Document ID for the document
- _seq_no: Current sequence number for the document
- _primary_term: Primary term for the document
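To make the lookup concrete, here is a small sketch that pulls those four values out of a search response shaped like the one above. The function name `extract_update_targets` and the keys of the returned dictionaries are hypothetical choices for this example; the response is assumed to be already parsed into a Python dictionary.

```python
def extract_update_targets(search_response):
    """Collect the values needed to address each hit in its backing index."""
    targets = []
    for hit in search_response["hits"]["hits"]:
        targets.append(
            {
                "index": hit["_index"],
                "id": hit["_id"],
                # Present only when the search set seq_no_primary_term to true.
                "seq_no": hit.get("_seq_no"),
                "primary_term": hit.get("_primary_term"),
            }
        )
    return targets


# Trimmed copy of the example response shown above.
response = {
    "hits": {
        "hits": [
            {
                "_index": ".ds-my-data-stream-2099.03.08-000003",
                "_id": "bfspvnIBr7VVZlfp2lqX",
                "_seq_no": 0,
                "_primary_term": 1,
            }
        ]
    }
}
print(extract_update_targets(response))
```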
To update the document, use an index API request with valid if_seq_no and if_primary_term arguments:
resp = client.index( index=".ds-my-data-stream-2099.03.08-000003", id="bfspvnIBr7VVZlfp2lqX", if_seq_no=0, if_primary_term=1, document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
const response = await client.index({ index: ".ds-my-data-stream-2099.03.08-000003", id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
PUT /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1 { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
To delete the document, use the delete API:
resp = client.delete( index=".ds-my-data-stream-2099.03.08-000003", id="bfspvnIBr7VVZlfp2lqX", ) print(resp)
response = client.delete( index: '.ds-my-data-stream-2099.03.08-000003', id: 'bfspvnIBr7VVZlfp2lqX' ) puts response
const response = await client.delete({ index: ".ds-my-data-stream-2099.03.08-000003", id: "bfspvnIBr7VVZlfp2lqX", }); console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
To delete or update multiple documents with a single request, use the bulk API's delete, index, and update actions. For index actions, include valid if_seq_no and if_primary_term arguments.
resp = client.bulk( refresh=True, operations=[ { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" } ], ) print(resp)
response = client.bulk( refresh: true, body: [ { index: { _index: '.ds-my-data-stream-2099.03.08-000003', _id: 'bfspvnIBr7VVZlfp2lqX', if_seq_no: 0, if_primary_term: 1 } }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ] ) puts response
const response = await client.bulk({ refresh: "true", operations: [ { index: { _index: ".ds-my-data-stream-2099.03.08-000003", _id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, }, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, ], }); console.log(response);
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
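For several documents at once, the bulk body can be generated from the values gathered by a prior search. The following is a sketch under stated assumptions: `build_conditional_index_bulk` is a hypothetical helper, and each target is assumed to be a dictionary with `index`, `id`, `seq_no`, and `primary_term` keys taken from the search hits.

```python
import json


def build_conditional_index_bulk(updates):
    """Serialize (target, new_source) pairs as NDJSON index actions.

    Each action carries if_seq_no and if_primary_term, so the write is
    rejected if the document changed after it was read.
    """
    lines = []
    for target, new_source in updates:
        action = {
            "index": {
                "_index": target["index"],
                "_id": target["id"],
                "if_seq_no": target["seq_no"],
                "if_primary_term": target["primary_term"],
            }
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps(new_source))
    # Every line, including the last one, ends with a newline.
    return "".join(line + "\n" for line in lines)


target = {
    "index": ".ds-my-data-stream-2099.03.08-000003",
    "id": "bfspvnIBr7VVZlfp2lqX",
    "seq_no": 0,
    "primary_term": 1,
}
new_source = {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    "user": {"id": "8a4f500d"},
    "message": "Login successful",
}
print(build_conditional_index_bulk([(target, new_source)]), end="")
```

Because the action names the backing index rather than the data stream, the request goes to `/_bulk` and not to `/my-data-stream/_bulk`.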