Bulk: indexing multiple documents
Bulk requests allow sending multiple document-related operations to Elasticsearch in one request. When you have multiple documents to ingest, this is more efficient than sending each document with a separate request.
A bulk request can contain several kinds of operations:
- create a document, indexing it after ensuring it doesn’t already exist,
- index a document, creating it if needed and replacing it if it exists,
- update a document that already exists in place, either with a script or a partial document,
- delete a document.
See the Elasticsearch API documentation for a full explanation of bulk requests.
Indexing application objects
A BulkRequest contains a collection of operations, each operation being a type with several variants. To create this request, it is convenient to use a builder object for the main request, and the fluent DSL for each operation.
The example below shows how to index a list of application objects.
List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op
        .index(idx -> idx
            .index("products")
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}
- Adds an operation (remember that list properties are additive).
- Selects the index operation variant for this bulk operation.
- Sets the properties for the index operation, similar to single document indexing: index name, identifier and document.
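The example above only uses index operations. The other operation kinds listed at the top of this page can be mixed in the same request. The following is a minimal sketch, not part of the original example set: the "products" index, the Product objects and the identifiers are placeholders for illustration.

// A hedged sketch combining create, index and delete operations in one
// bulk request. newProduct, updatedProduct and the "obsolete-sku" id are
// hypothetical values.
BulkRequest request = BulkRequest.of(b -> b
    .operations(op -> op
        // create: indexes the document only if the id does not already exist
        .create(c -> c
            .index("products")
            .id(newProduct.getSku())
            .document(newProduct)
        )
    )
    .operations(op -> op
        // index: creates the document or replaces an existing one
        .index(idx -> idx
            .index("products")
            .id(updatedProduct.getSku())
            .document(updatedProduct)
        )
    )
    .operations(op -> op
        // delete: removes the document with the given id
        .delete(d -> d
            .index("products")
            .id("obsolete-sku")
        )
    )
);

BulkResponse response = esClient.bulk(request);

As in the previous example, the response items should be inspected for per-operation failures.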
Indexing raw JSON data
The document property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. However, data that is ingested in bulk is often available as JSON text (e.g. files on disk), and parsing this JSON just to re-serialize it to send the bulk request would be a waste of resources. So documents in bulk operations can also be of type BinaryData, which is sent verbatim (without parsing) to the Elasticsearch server.
In the example below we will use the Java API Client's BinaryData to read JSON files from a log directory and send them in a bulk request.
// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(
            IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}
Streaming ingestion with the Bulk Ingester
The BulkIngester simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently grouped in bulk requests. You only have to add() bulk operations to the ingester and it will take care of grouping and sending them in bulk according to its configuration.
The ingester will send a bulk request when one of the following criteria is met:
- the number of operations exceeds a maximum (defaults to 1000)
- the bulk request size in bytes exceeds a maximum (defaults to 5 MiB)
- a delay since the last request has expired (periodic flush, no default)
Additionally, you can define a maximum number of concurrent requests waiting to be executed by Elasticsearch (defaults to 1). When that maximum is reached and the maximum number of operations has been collected, adding a new operation to the ingester will block. This avoids overloading the Elasticsearch server by putting backpressure on the client application.
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(
            IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

ingester.close();
- Sets the Elasticsearch client used to send bulk requests.
- Sets the maximum number of operations to collect before sending a bulk request.
- Sets the flush interval.
- Adds a bulk operation to the ingester.
- Closes the ingester to flush the pending operations and release resources.
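The example above only sets the operation count and the flush interval. As a hedged sketch, the remaining limits described earlier (request size in bytes and concurrent request count) can be configured on the same builder; the maxSize and maxConcurrentRequests setter names and the values shown are assumptions and may differ across client versions.

// A minimal sketch, assuming the builder exposes maxSize and
// maxConcurrentRequests settings (verify against your client version).
BulkIngester<Void> tunedIngester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(1000)                 // flush after 1000 operations...
    .maxSize(5 * 1024 * 1024)            // ...or after 5 MiB of payload...
    .flushInterval(1, TimeUnit.SECONDS)  // ...or at least every second
    .maxConcurrentRequests(1)            // at most one in-flight bulk request
);

With such a configuration, add() blocks once a request is in flight and the next batch is full, which is the backpressure behaviour described above.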
Additionally, the bulk ingester accepts a listener so that your application can be notified of bulk requests that are sent and their result. To allow correlating bulk operations to application context, the add() method optionally accepts a context parameter. The type of this context parameter is used as the generic parameter of the BulkIngester object. You may have noticed the Void type in BulkIngester<Void> above: this is because we did not register a listener, and therefore did not care about context values.
The following example shows how you can use context values to implement a bulk ingestion listener: as before, it sends JSON log files in bulk, but it also tracks bulk request errors and failed operations. When an operation fails, depending on the error type you may want to re-add it to the ingester.
BulkListener<String> listener = new BulkListener<String>() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener)
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(
            IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName()
    );
}

ingester.close();
- Creates a listener where context values are strings for the ingested file name.
- Registers the listener on the bulk ingester.
- Sets the file name as the context value for a bulk operation.
The bulk ingester also exposes statistics that allow monitoring the ingestion process and tuning its configuration:
- number of operations added,
- number of calls to add() that were blocked because the maximum number of concurrent requests was reached (contention),
- number of bulk requests sent,
- number of bulk requests that were blocked because the maximum number of concurrent requests was reached.
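As a hedged sketch, these counters could be logged after closing the ingester; the accessor names used below are assumptions derived from the list above and may differ in your client version.

// Accessor names are assumptions; verify against your client version.
logger.info("Operations added:      " + ingester.operationsCount());
logger.info("Blocked add() calls:   " + ingester.operationContentionsCount());
logger.info("Bulk requests sent:    " + ingester.requestCount());
logger.info("Blocked bulk requests: " + ingester.requestContentionsCount());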
The source code for the examples above can be found in the Java API Client tests.