Indexing documents
NEST exposes the index and bulk APIs of Elasticsearch as methods, to enable indexing of single or multiple documents. In addition, the client provides some convenient shorthand methods for the typical indexing approaches.
Single documents
A single document can be indexed, either synchronously or asynchronously, using the IndexDocument and IndexDocumentAsync methods, respectively. These methods are a simple way to index a single document that doesn’t require any additional request parameters.
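As a minimal sketch, assuming a Person POCO and an ElasticClient instance named client, indexing a single document might look like the following; the target index is inferred from the client's connection settings.

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

// index the document synchronously; the index is inferred from the
// connection settings (e.g. a default index configured on the client)
var indexResponse = client.IndexDocument(person);

// or index the document asynchronously
var indexResponseAsync = await client.IndexDocumentAsync(person);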
Single documents with parameters
If you need to set additional parameters when indexing, you can use the Index method with either the fluent or object initializer syntax. The Index method exposes a way to set additional parameters such as the target index name, the id to assign to the document, routing parameters, etc., allowing more control over indexing.
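As a sketch of both syntaxes, assuming the same Person document and client as above; the "people" index name and the routing value are illustrative only.

// fluent syntax
var fluentIndexResponse = client.Index(person, i => i
    .Index("people")
    .Id(person.Id)
    .Routing("1")
);

// object initializer syntax
var indexRequest = new IndexRequest<Person>(person, "people")
{
    Routing = "1"
};
var initializerIndexResponse = client.Index(indexRequest);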
Multiple documents with IndexMany
Multiple documents can be indexed using the IndexMany and IndexManyAsync methods, again either synchronously or asynchronously, respectively. These methods are specific to the NEST client and provide a convenient shortcut for indexing multiple documents using the _bulk endpoint.
IndexMany and IndexManyAsync send all documents in a single HTTP request, so for very large document collections this is not a recommended approach - consider using the BulkAllObservable helper instead.
var people = new[]
{
    new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman" },
    new Person { Id = 2, FirstName = "Stuart", LastName = "Cam" },
    new Person { Id = 3, FirstName = "Russ", LastName = "Cam" }
};

var indexManyResponse = client.IndexMany(people);

if (indexManyResponse.Errors)
{
    foreach (var itemWithError in indexManyResponse.ItemsWithErrors)
    {
        Console.WriteLine($"Failed to index document {itemWithError.Id}: {itemWithError.Error}");
    }
}

// Alternatively, documents can be indexed asynchronously
var indexManyAsyncResponse = await client.IndexManyAsync(people);
Multiple documents with Bulk
If you require more control over indexing many documents, you can use the Bulk and BulkAsync methods and use the descriptors to customise the bulk calls.
As with the IndexMany methods, documents are sent using the bulk API in a single HTTP request. This does mean that consideration should be given to the overall size of the HTTP request. For indexing a large number of documents, it may be sensible to perform multiple separate Bulk calls, or to use the BulkAllObservable helper, which takes care of a lot of the complexity.
var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people)
);

// Alternatively, documents can be indexed asynchronously similar to IndexManyAsync
var asyncBulkIndexResponse = await client.BulkAsync(b => b
    .Index("people")
    .IndexMany(people)
);
- synchronous method that returns an IBulkResponse, the same as IndexMany, and can be inspected in the same way for errors
- asynchronous method that returns a Task<IBulkResponse> that can be awaited
Control over how each bulk index operation is configured can be achieved by passing a descriptor to the IndexMany method on Bulk. Here’s an example of specifying a different index and pipeline for each document, based on properties of the document to be indexed.
var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people, (descriptor, person) => descriptor
        .Index(person.Id % 2 == 0 ? "even-index" : "odd-index")
        .Pipeline(person.FirstName.StartsWith("M") ? "startswith_m_pipeline" : "does_not_start_with_m_pipeline")
    )
);
- configure an explicit index for a document, based on its Id
- specify an ingest pipeline to use when indexing the document
Multiple documents with the BulkAllObservable helper
Using the BulkAllObservable helper allows you to focus on the overall objective of indexing, without having to concern yourself with retry, backoff or chunking mechanics.
Multiple documents can be indexed using the BulkAll method and the Wait() extension method. This helper exposes functionality to automatically retry and back off in the event of an indexing failure, and to control the number of documents indexed in a single HTTP request. In the example below, each request will contain 1000 documents, chunked from the original input. For a large number of documents this could result in many HTTP requests, each containing 1000 documents (the last request may contain fewer, depending on the total number).
The helper lazily enumerates the provided IEnumerable<T> of documents, allowing you to index a large number of documents easily.
var bulkAllObservable = client.BulkAll(people, b => b
    .Index("people")
    .BackOffTime("30s")
    .BackOffRetries(2)
    .RefreshOnCompleted()
    .MaxDegreeOfParallelism(Environment.ProcessorCount)
    .Size(1000)
)
.Wait(TimeSpan.FromMinutes(15), next =>
{
    // do something e.g. write number of pages to console
});
- how long to wait between retries
- how many retries are attempted if a failure occurs
- items per bulk request
- perform the indexing and wait up to 15 minutes; whilst the BulkAll calls are asynchronous, this is a blocking operation
The internal implementation of BulkAllObservable is asynchronous, using the Observer Design Pattern to enable observers to be registered to take action when each bulk response is returned, when an error has occurred, and when the BulkAllObservable has finished. Whilst the internal implementation is asynchronous, you typically want to wait until all bulk indexing has finished before continuing. The Wait method is a convenient shorthand for this, using a ManualResetEvent to block the current thread until bulk indexing has finished or an error has occurred.
Advanced bulk indexing
The BulkAllObservable helper exposes a number of methods to further control the process, such as

- BufferToBulk to customize individual operations within the bulk request before it is dispatched to the server
- RetryDocumentPredicate to decide if a document that failed to be indexed should be retried
- DroppedDocumentCallback to determine what to do in the event a document is not indexed, even after retrying
The following example demonstrates some of these methods, in addition to using a BulkAllObserver to subscribe to the bulk indexing process and take some action on each successful bulk response, when an error occurs, and when the process has finished.
An observer such as BulkAllObserver should not throw exceptions from its interface implementations, such as OnNext and OnError. Any exceptions thrown should be expected to go unhandled. In light of this, any exception that occurs during the bulk indexing process should be captured and thrown outside of the observer, as demonstrated in the example below. Take a look at the Observer Design Pattern best practices on handling exceptions.
var bulkAllObservable = client.BulkAll(people, b => b
    .BufferToBulk((descriptor, buffer) =>
    {
        foreach (var person in buffer)
        {
            descriptor.Index<Person>(bi => bi
                .Index(person.Id % 2 == 0 ? "even-index" : "odd-index")
                .Document(person)
            );
        }
    })
    .RetryDocumentPredicate((bulkResponseItem, person) =>
    {
        return bulkResponseItem.Error.Index == "even-index" && person.FirstName == "Martijn";
    })
    .DroppedDocumentCallback((bulkResponseItem, person) =>
    {
        Console.WriteLine($"Unable to index: {bulkResponseItem} {person}");
    }));

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new BulkAllObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

bulkAllObservable.Subscribe(observer);

waitHandle.WaitOne();

exceptionDispatchInfo?.Throw();
- Customise each bulk operation before it is dispatched
- Index each document into either even-index or odd-index
- Decide if a document should be retried in the event of a failure
- If a document cannot be indexed, this delegate is called
- Subscribe to the observable, which will initiate the bulk indexing process
- Block the current thread until a signal is received
- If an exception was captured during the bulk indexing process, throw it