WARNING: Version 5.x has passed its EOL date.
This documentation is no longer being maintained and may be removed. If you are running this version, we strongly advise you to upgrade. For the latest information, see the current release documentation.
Request pipelines
Every request is executed in the context of a RequestPipeline when using the default ITransport implementation.
When calling Request() or RequestAsync() on an ITransport, the whole coordination of the request is deferred to a new RequestPipeline instance created in a using block.
var pipeline = new RequestPipeline(
    settings,
    DateTimeProvider.Default,
    new MemoryStreamFactory(),
    new SearchRequestParameters());

// The pipeline is disposable, which is what allows it to live in a using block
pipeline.GetType().Should().Implement<IDisposable>();
An ITransport does not instantiate a RequestPipeline directly; it uses a pluggable IRequestPipelineFactory to create them
var requestPipelineFactory = new RequestPipelineFactory();
var requestPipeline = requestPipelineFactory.Create(
    settings,
    DateTimeProvider.Default,
    new MemoryStreamFactory(),
    new SearchRequestParameters());

requestPipeline.Should().BeOfType<RequestPipeline>();
requestPipeline.GetType().Should().Implement<IDisposable>();
You can pass your own IRequestPipelineFactory implementation to the transport when instantiating a client, allowing you to have requests executed in your own custom request pipeline
var transport = new Transport<IConnectionSettingsValues>(
    settings,
    requestPipelineFactory,
    DateTimeProvider.Default,
    new MemoryStreamFactory());

var client = new ElasticClient(transport);
Let’s now have a look at some of the characteristics of the request pipeline
Sniffing on first usage
Here we have set up three pipelines with three different connection pools
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));
Let's see how they behave on first usage
singleNodePipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
staticPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeTrue();
We see that only the Sniffing connection pool supports sniffing on first usage, since it is the only one that supports reseeding. Sniffing on startup can, however, be disabled on ConnectionSettings for a sniffing connection pool.
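Disabling the startup sniff might look like the following configuration fragment. It is a minimal sketch rather than part of the original examples: the node address http://localhost:9200 is a placeholder, and the fragment builds the settings directly instead of going through the CreatePipeline test helper used on this page.

```csharp
using System;
using Elasticsearch.Net;
using Nest;

// Placeholder node address (an assumption); replace with the nodes of your own cluster.
var uris = new[] { new Uri("http://localhost:9200") };

// A sniffing pool supports reseeding, so by default it sniffs on first usage.
var pool = new SniffingConnectionPool(uris);

// Opt out of the sniff on startup for clients created from these settings.
var settings = new ConnectionSettings(pool)
    .SniffOnStartup(false);

var client = new ElasticClient(settings);
```

With settings like these, a request pipeline created for the client should no longer report that its first pool usage needs sniffing.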
Wait for first sniff
All threads wait for the sniff on startup to finish, for up to the request timeout period. A SemaphoreSlim is used to block threads until the sniff finishes, and waiting threads release the SemaphoreSlim appropriately.
We can demonstrate this with the following example. First, let’s configure a custom IConnection implementation that simply returns a known 200 response after one second
var inMemoryConnection = new WaitingInMemoryConnection( TimeSpan.FromSeconds(1), responseBody);
Next, we create a Sniffing connection pool using our custom connection and a timeout for how long a request can take before the client times out
var sniffingPipeline = CreatePipeline(
    uris => new SniffingConnectionPool(uris),
    connection: inMemoryConnection,
    settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
Now, with a SemaphoreSlim in place that allows only one thread to enter at a time, start three tasks that will initiate a sniff on startup.
The first task will successfully sniff on startup, with the remaining two waiting tasks exiting without exception. The SemaphoreSlim is also released, ready for when sniffing needs to take place again
var semaphoreSlim = new SemaphoreSlim(1, 1);

var task1 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task2 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task3 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));

// None of the three tasks should throw
var exception = Record.Exception(() => Task.WaitAll(task1, task2, task3));
exception.Should().BeNull();

// The semaphore has been released and can be entered again
semaphoreSlim.CurrentCount.Should().Be(1);
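The waiting behaviour described above can be sketched in isolation. The FirstUsageGate class below is hypothetical and is not the client's implementation; it only illustrates the general pattern of one thread doing the first-usage work while the others wait on a SemaphoreSlim for a bounded time, re-check, and exit. Throwing a TimeoutException when the wait expires is an assumption made for this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of the pattern; not a type from the client.
public sealed class FirstUsageGate
{
    // Only one thread may run the first-usage work at a time.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private volatile bool _initialised;
    private int _initialisations;

    public int Initialisations => _initialisations;
    public int AvailableSlots => _semaphore.CurrentCount;

    public void EnsureInitialised(TimeSpan timeout, Action initialise)
    {
        if (_initialised) return;

        // Block until the semaphore is free or the timeout elapses.
        if (!_semaphore.Wait(timeout))
            throw new TimeoutException("Timed out waiting for the first-usage work to finish.");

        try
        {
            // A thread that had to wait re-checks the flag and exits without repeating the work.
            if (_initialised) return;

            initialise();
            Interlocked.Increment(ref _initialisations);
            _initialised = true;
        }
        finally
        {
            // Always release, so the semaphore is ready for the next time it is needed.
            _semaphore.Release();
        }
    }
}

public static class FirstUsageGateDemo
{
    public static FirstUsageGate Run()
    {
        var gate = new FirstUsageGate();
        Action slowWork = () => Thread.Sleep(100); // stands in for a slow sniff

        var task1 = Task.Run(() => gate.EnsureInitialised(TimeSpan.FromSeconds(2), slowWork));
        var task2 = Task.Run(() => gate.EnsureInitialised(TimeSpan.FromSeconds(2), slowWork));
        var task3 = Task.Run(() => gate.EnsureInitialised(TimeSpan.FromSeconds(2), slowWork));

        Task.WaitAll(task1, task2, task3);
        return gate;
    }
}
```

After FirstUsageGateDemo.Run() returns, the work has run once and the semaphore count is back to one, which mirrors the assertions in the example above.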
Sniff on connection failure
Only a connection pool that supports reseeding will opt in to SniffsOnConnectionFailure.
In this case, it is only the Sniffing connection pool
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));

singleNodePipeline.SniffsOnConnectionFailure.Should().BeFalse();
staticPipeline.SniffsOnConnectionFailure.Should().BeFalse();
sniffingPipeline.SniffsOnConnectionFailure.Should().BeTrue();
You can however disable this behaviour on ConnectionSettings
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnConnectionFault(false));
sniffingPipeline.SniffsOnConnectionFailure.Should().BeFalse();
Sniff on stale cluster
A connection pool that supports reseeding will sniff after a period of time to ensure that its understanding of the state of the cluster is not stale.
Let’s set up three request pipelines with different connection pools and a date time provider that will allow us to artificially change the time
var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
Only the request pipeline with the Sniffing connection pool will sniff when its understanding of the cluster is stale
singleNodePipeline.SniffsOnStaleCluster.Should().BeFalse();
staticPipeline.SniffsOnStaleCluster.Should().BeFalse();
sniffingPipeline.SniffsOnStaleCluster.Should().BeTrue();
To begin with, all request pipelines have a fresh view of cluster state i.e. not stale
singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();
sniffingPipeline.StaleClusterState.Should().BeFalse();
Now, if we go two hours into the future
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
Those connection pools that do not support reseeding never go stale
singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();
but the request pipeline using the Sniffing connection pool, which supports reseeding, signals that its understanding of the cluster state is out of date
sniffingPipeline.StaleClusterState.Should().BeTrue();
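The staleness check can be pictured as a comparison between the current time and the time of the last refresh. The sketch below uses hypothetical ManualClock and ClusterView types that are not part of the client, and the one-hour maximum age in the usage note is an assumption chosen for illustration.

```csharp
using System;

// Hypothetical stand-ins for illustration; these are not types from the client.
public sealed class ManualClock
{
    private DateTime _now = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    public DateTime Now() => _now;

    // Move the clock forward artificially, as a testable date time provider would.
    public void Advance(TimeSpan by) => _now = _now.Add(by);
}

public sealed class ClusterView
{
    private readonly ManualClock _clock;
    private readonly bool _supportsReseeding;
    private readonly TimeSpan _maxAge;
    private DateTime _lastUpdate;

    public ClusterView(ManualClock clock, bool supportsReseeding, TimeSpan maxAge)
    {
        _clock = clock;
        _supportsReseeding = supportsReseeding;
        _maxAge = maxAge;
        _lastUpdate = clock.Now();
    }

    // A view that cannot be reseeded is never reported as stale.
    public bool IsStale =>
        _supportsReseeding && _clock.Now() - _lastUpdate > _maxAge;

    // Would be called after a successful refresh of the node list.
    public void MarkRefreshed() => _lastUpdate = _clock.Now();
}
```

Creating one ClusterView with supportsReseeding set to false and one with it set to true, both with a maximum age of one hour, and then advancing the clock by two hours leaves the first view fresh and marks the second as stale until MarkRefreshed() is called.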
Retrying
A request pipeline also checks whether the overall time across multiple retries exceeds the request timeout. See Retries for more details; here we assert that our request pipeline exposes this properly
var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);

singleNodePipeline.IsTakingTooLong.Should().BeFalse();
staticPipeline.IsTakingTooLong.Should().BeFalse();
sniffingPipeline.IsTakingTooLong.Should().BeFalse();
Now, go two hours into the future
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
Pipelines whose connection pools do not support reseeding now report that they are taking too long
singleNodePipeline.IsTakingTooLong.Should().BeTrue();
staticPipeline.IsTakingTooLong.Should().BeTrue();
and so does the pipeline using the Sniffing connection pool, since this check depends on elapsed time rather than on reseeding
sniffingPipeline.IsTakingTooLong.Should().BeTrue();
The request pipeline exposes the DateTime it started; we assert that it started two hours in the past
(dateTime.Now() - singleNodePipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - staticPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - sniffingPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
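The idea of comparing elapsed time against a timeout can be sketched on its own. The RetryBudget class below is hypothetical and deliberately simplified; it is not the client's RequestPipeline, and the real check may apply additional settings or margins that are not shown here.

```csharp
using System;

// Hypothetical, simplified sketch; not the client's RequestPipeline.
public sealed class RetryBudget
{
    private readonly Func<DateTime> _now;
    private readonly TimeSpan _requestTimeout;

    public RetryBudget(Func<DateTime> now, TimeSpan requestTimeout)
    {
        _now = now;
        _requestTimeout = requestTimeout;

        // Record the start once; every retry is measured against it.
        StartedOn = now();
    }

    public DateTime StartedOn { get; }

    // Total time spent so far across all attempts.
    public TimeSpan Elapsed => _now() - StartedOn;

    // True once the overall elapsed time reaches the timeout.
    public bool IsTakingTooLong => Elapsed >= _requestTimeout;
}
```

Passing a delegate that reads a mutable DateTime variable makes it possible to move time forward in a test: a budget with a one-minute timeout reports false at first and true after the variable is advanced by two hours.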