Use a data stream
After you set up a data stream, you can do the following:
- Add documents to a data stream
- Search a data stream
- Get statistics for a data stream
- Manually roll over a data stream
- Open closed backing indices
- Reindex with a data stream
- Update documents in a data stream by query
- Delete documents in a data stream by query
- Update or delete documents in a backing index
Add documents to a data stream
To add an individual document, use the index API. Ingest pipelines are supported.

POST /my-data-stream/_doc/
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead. Only an op_type of create is supported.
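As a minimal sketch of the request format described above, the following Ruby snippet only assembles the method, path, and body for a create request with an explicit document ID. It does not contact a cluster. The helper name create_request and the reuse of the example ID are assumptions made for illustration.

```ruby
require 'json'
require 'erb'

# Hypothetical helper: describes the HTTP request that adds a document with
# an explicit ID to a data stream through the _create endpoint.
def create_request(stream, id, doc)
  {
    method: 'PUT',
    # The ID is percent-encoded so that it is safe to place in a URL path.
    path: "/#{stream}/_create/#{ERB::Util.url_encode(id)}",
    body: JSON.generate(doc)
  }
end

req = create_request(
  'my-data-stream',
  'bfspvnIBr7VVZlfp2lqX', # illustrative ID, not taken from a real cluster
  {
    '@timestamp' => '2099-03-08T11:06:07.000Z',
    'user' => { 'id' => '8a4f500d' },
    'message' => 'Login successful'
  }
)

puts "#{req[:method]} #{req[:path]}"
puts req[:body]
```

The resulting method, path, and body can be passed to any HTTP client.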
To add multiple documents with a single request, use the bulk API. Only create actions are supported.
response = client.bulk(
  index: 'my-data-stream',
  refresh: true,
  body: [
    { create: {} },
    {
      "@timestamp": '2099-03-08T11:04:05.000Z',
      user: { id: 'vlb44hny' },
      message: 'Login attempt failed'
    },
    { create: {} },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: { id: '8a4f500d' },
      message: 'Login successful'
    },
    { create: {} },
    {
      "@timestamp": '2099-03-09T11:07:08.000Z',
      user: { id: 'l7gk7f82' },
      message: 'Logout successful'
    }
  ]
)
puts response

PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
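The bulk request body is newline-delimited JSON: each document line is preceded by its own create action line. The sketch below shows one way to assemble such a body in plain Ruby. The helper name bulk_create_body is an assumption for illustration, and the snippet only builds the string without sending it.

```ruby
require 'json'

# Hypothetical helper: builds a newline-delimited bulk body in which every
# document is preceded by a `create` action line.
def bulk_create_body(docs)
  lines = docs.flat_map do |doc|
    [JSON.generate({ 'create' => {} }), JSON.generate(doc)]
  end
  # The bulk API expects the body to end with a newline.
  lines.join("\n") + "\n"
end

docs = [
  { '@timestamp' => '2099-03-08T11:04:05.000Z',
    'user' => { 'id' => 'vlb44hny' },
    'message' => 'Login attempt failed' },
  { '@timestamp' => '2099-03-08T11:06:07.000Z',
    'user' => { 'id' => '8a4f500d' },
    'message' => 'Login successful' }
]

body = bulk_create_body(docs)
puts body
```

Generating the body from an array keeps action lines and document lines paired, which is easy to get wrong when writing the payload by hand.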
Search a data stream
The following search APIs support data streams:
- Search
- Async search
- Multi search
- Field capabilities
- EQL search
Get statistics for a data stream
Use the data stream stats API to get statistics for one or more data streams:

response = client.indices.data_streams_stats(
  name: 'my-data-stream',
  human: true
)
puts response

GET /_data_stream/my-data-stream/_stats?human=true
Manually roll over a data stream
Use the rollover API to manually roll over a data stream:

response = client.indices.rollover(
  alias: 'my-data-stream'
)
puts response

POST /my-data-stream/_rollover/
Open closed backing indices
You cannot search a closed backing index, even by searching its data stream. You also cannot update or delete documents in a closed index.
To re-open a closed backing index, submit an open index API request directly to the index:

response = client.indices.open(
  index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response

POST /.ds-my-data-stream-2099.03.07-000001/_open/
To re-open all closed backing indices for a data stream, submit an open index API request to the stream:

response = client.indices.open(
  index: 'my-data-stream'
)
puts response

POST /my-data-stream/_open/
Reindex with a data stream
Use the reindex API to copy documents from an existing index, alias, or data stream to a data stream. Because data streams are append-only, a reindex into a data stream must use an op_type of create. A reindex cannot update existing documents in a data stream.

response = client.reindex(
  body: {
    source: { index: 'archive' },
    dest: { index: 'my-data-stream', op_type: 'create' }
  }
)
puts response

POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}
Update documents in a data stream by query
Use the update by query API to update documents in a data stream that match a provided query:

response = client.update_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: { "user.id": 'l7gk7f82' }
    },
    script: {
      source: 'ctx._source.user.id = params.new_id',
      params: { new_id: 'XgdX0NoX' }
    }
  }
)
puts response

POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
Delete documents in a data stream by query
Use the delete by query API to delete documents in a data stream that match a provided query:

response = client.delete_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: { "user.id": 'vlb44hny' }
    }
  }
)
puts response

POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
Update or delete documents in a backing index
If needed, you can update or delete documents in a data stream by sending requests to the backing index containing the document. You’ll need:
- The document ID
- The name of the backing index containing the document
- If updating the document, its sequence number and primary term
To get this information, use a search request:

response = client.search(
  index: 'my-data-stream',
  body: {
    seq_no_primary_term: true,
    query: {
      match: { "user.id": 'yWIumJd7' }
    }
  }
)
puts response

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

Response:
{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",
        "_id": "bfspvnIBr7VVZlfp2lqX",
        "_seq_no": 0,
        "_primary_term": 1,
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}
In each hit of this response:
- _index: Backing index containing the matching document
- _id: Document ID for the document
- _seq_no: Current sequence number for the document
- _primary_term: Primary term for the document
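The following Ruby sketch shows how these four values might be pulled out of a parsed search response before building a follow-up request. The helper name hit_locators is an assumption for illustration, and the response here is a trimmed copy of the example above rather than live output.

```ruby
require 'json'

# Trimmed copy of the example search response, parsed into a Hash.
response = JSON.parse(<<~BODY)
  {
    "hits": {
      "hits": [
        {
          "_index": ".ds-my-data-stream-2099.03.08-000003",
          "_id": "bfspvnIBr7VVZlfp2lqX",
          "_seq_no": 0,
          "_primary_term": 1
        }
      ]
    }
  }
BODY

# Hypothetical helper: collects, for every hit, the values needed to address
# the document in its backing index.
def hit_locators(response)
  response.dig('hits', 'hits').map do |hit|
    {
      index: hit['_index'],
      id: hit['_id'],
      seq_no: hit['_seq_no'],
      primary_term: hit['_primary_term']
    }
  end
end

locators = hit_locators(response)
locators.each { |loc| puts loc.inspect }
```

The _seq_no and _primary_term fields appear in a hit only when the search request sets seq_no_primary_term to true.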
To update the document, use an index API request with valid if_seq_no and if_primary_term arguments:

PUT /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
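The if_seq_no and if_primary_term arguments make the write conditional: the request succeeds only if the document has not changed since those values were read. A minimal sketch of building the request path from the values found by a search is shown below. The helper name conditional_update_path is an assumption for illustration, and the backing index name follows the search response shown earlier.

```ruby
require 'uri'

# Hypothetical helper: builds the path for an index API request that only
# succeeds if the document still has the given sequence number and primary term.
def conditional_update_path(index, id, seq_no, primary_term)
  query = URI.encode_www_form(if_seq_no: seq_no, if_primary_term: primary_term)
  "/#{index}/_doc/#{id}?#{query}"
end

path = conditional_update_path(
  '.ds-my-data-stream-2099.03.08-000003',
  'bfspvnIBr7VVZlfp2lqX',
  0,
  1
)
puts "PUT #{path}"
```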
To delete the document, use the delete API:

response = client.delete(
  index: '.ds-my-data-stream-2099.03.08-000003',
  id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response

DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
To delete or update multiple documents with a single request, use the bulk API's delete, index, and update actions. For index actions, include valid if_seq_no and if_primary_term arguments.

response = client.bulk(
  refresh: true,
  body: [
    {
      index: {
        _index: '.ds-my-data-stream-2099.03.08-000003',
        _id: 'bfspvnIBr7VVZlfp2lqX',
        if_seq_no: 0,
        if_primary_term: 1
      }
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: { id: '8a4f500d' },
      message: 'Login successful'
    }
  ]
)
puts response

PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
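As a sketch of how index and delete actions can be mixed in one bulk body, the Ruby snippet below assembles the newline-delimited payload for a list of operations against a backing index. The helper name bulk_backing_index_body, the operation hash layout, and the second document ID are assumptions for illustration. The snippet only builds the string and does not send it.

```ruby
require 'json'

# Hypothetical helper: builds a bulk body that targets backing indices directly.
# Index actions carry if_seq_no and if_primary_term plus a document line;
# delete actions consist of the action line only.
def bulk_backing_index_body(operations)
  lines = operations.flat_map do |op|
    meta = { '_index' => op[:index], '_id' => op[:id] }
    case op[:action]
    when :index
      meta = meta.merge('if_seq_no' => op[:seq_no],
                        'if_primary_term' => op[:primary_term])
      [JSON.generate({ 'index' => meta }), JSON.generate(op[:doc])]
    when :delete
      [JSON.generate({ 'delete' => meta })]
    else
      raise ArgumentError, "unsupported action: #{op[:action]}"
    end
  end
  # The bulk API expects the body to end with a newline.
  lines.join("\n") + "\n"
end

operations = [
  { action: :index,
    index: '.ds-my-data-stream-2099.03.08-000003',
    id: 'bfspvnIBr7VVZlfp2lqX',
    seq_no: 0,
    primary_term: 1,
    doc: { '@timestamp' => '2099-03-08T11:06:07.000Z',
           'user' => { 'id' => '8a4f500d' },
           'message' => 'Login successful' } },
  { action: :delete,
    index: '.ds-my-data-stream-2099.03.08-000003',
    id: 'hypothetical-doc-id' } # placeholder ID, not from the example data
]

body = bulk_backing_index_body(operations)
puts body
```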