New

The executive guide to generative AI

Read more

Request pipelines

edit

Every request is executed in the context of a RequestPipeline when using the default ITransport implementation.

When calling Request() or RequestAsync() on an ITransport, the whole coordination of the request is deferred to a new instance in a using block.

var pipeline = new RequestPipeline(
    settings,
    DateTimeProvider.Default,
    new MemoryStreamFactory(),
    new SearchRequestParameters());

pipeline.GetType().Should().Implement<IDisposable>();

An ITransport does not instantiate a RequestPipeline directly; it uses a pluggable IRequestPipelineFactory to create them

var requestPipelineFactory = new RequestPipelineFactory();
var requestPipeline = requestPipelineFactory.Create(
    settings,
    DateTimeProvider.Default, 
    new MemoryStreamFactory(),
    new SearchRequestParameters());

requestPipeline.Should().BeOfType<RequestPipeline>();
requestPipeline.GetType().Should().Implement<IDisposable>();

An IDateTimeProvider implementation

You can pass your own IRequestPipeline implementation to the transport when instantiating a client, allowing you to have requests executed in your own custom request pipeline

var transport = new Transport<IConnectionSettingsValues>(
    settings,
    requestPipelineFactory,
    DateTimeProvider.Default,
    new MemoryStreamFactory());

var client = new ElasticClient(transport);

Let’s now have a look at some of the characteristics of the request pipeline

Sniffing on first usage

edit

Here we have setup three pipelines with three different connection pools

var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));
Let's see how they behave on first usage
singleNodePipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
staticPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeTrue();

We see that only the Sniffing connection pool supports sniffing on first usage, since it supports reseeding. Sniffing on startup however can be disabled on ConnectionSettings for sniffing connection pool

sniffingPipeline = CreatePipeline(
    uris => new SniffingConnectionPool(uris),
    s => s.SniffOnStartup(false)); 

sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();

Disable sniffing on startup

Wait for first sniff

edit

All threads wait for the sniff on startup to finish, waiting the request timeout period. A SemaphoreSlim is used to block threads until the sniff finishes and waiting threads release the SemaphoreSlim appropriately.

We can demonstrate this with the following example. First, let’s configure a custom IConnection implementation that’s simply going to return a known 200 response after one second

var inMemoryConnection = new WaitingInMemoryConnection(
    TimeSpan.FromSeconds(1),
    responseBody);

Next, we create a Sniffing connection pool using our custom connection and a timeout for how long a request can take before the client times out

var sniffingPipeline = CreatePipeline(
    uris => new SniffingConnectionPool(uris),
    connection: inMemoryConnection,
    settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));

Now, with a SemaphoreSlim in place that allows only one thread to enter at a time, start three tasks that will initiate a sniff on startup.

The first task will successfully sniff on startup with the remaining two waiting tasks exiting without exception. The SemaphoreSlim is also released, ready for when sniffing needs to take place again

var semaphoreSlim = new SemaphoreSlim(1, 1);

var task1 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task2 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task3 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));

var exception = Record.Exception(() => Task.WaitAll(task1, task2, task3));

exception.Should().BeNull();
semaphoreSlim.CurrentCount.Should().Be(1);

Sniff on connection failure

edit

Only a connection pool that supports reseeding will opt in to SniffsOnConnectionFailure(). In this case, it is only the Sniffing connection pool

var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));

singleNodePipeline.SniffsOnConnectionFailure.Should().BeFalse();
staticPipeline.SniffsOnConnectionFailure.Should().BeFalse();
sniffingPipeline.SniffsOnConnectionFailure.Should().BeTrue();

You can however disable this behaviour on ConnectionSettings

sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnConnectionFault(false));
sniffingPipeline.SniffsOnConnectionFailure.Should().BeFalse();

Sniff on stale cluster

edit

A connection pool that supports reseeding will sniff after a period of time to ensure that its understanding of the state of the cluster is not stale.

Let’s set up three request pipelines with different connection pools and a date time provider that will allow us to artificially change the time

var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris =>
    new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);

var staticPipeline = CreatePipeline(uris =>
    new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);

var sniffingPipeline = CreatePipeline(uris =>
    new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);

On the request pipeline with the Sniffing connection pool will sniff when its understanding of the cluster is stale

singleNodePipeline.SniffsOnStaleCluster.Should().BeFalse();
staticPipeline.SniffsOnStaleCluster.Should().BeFalse();
sniffingPipeline.SniffsOnStaleCluster.Should().BeTrue();

To begin with, all request pipelines have a fresh view of cluster state i.e. not stale

singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();
sniffingPipeline.StaleClusterState.Should().BeFalse();

Now, if we go two hours into the future

dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));

Those connection pools that do not support reseeding never go stale

singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();

but the Request pipeline using the Sniffing connection pool that supports reseeding, signals that its understanding of the cluster state is out of date

sniffingPipeline.StaleClusterState.Should().BeTrue();

Retrying

edit

A request pipeline also checks whether the overall time across multiple retries exceeds the request timeout. See Retries for more details, here we assert that our request pipeline exposes this properly

var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris =>
    new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);

var staticPipeline = CreatePipeline(uris =>
    new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);

var sniffingPipeline = CreatePipeline(uris =>
    new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);

singleNodePipeline.IsTakingTooLong.Should().BeFalse();
staticPipeline.IsTakingTooLong.Should().BeFalse();
sniffingPipeline.IsTakingTooLong.Should().BeFalse();

go one hour into the future

dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));

Connection pools that do not support reseeding never go stale

singleNodePipeline.IsTakingTooLong.Should().BeTrue();
staticPipeline.IsTakingTooLong.Should().BeTrue();

the sniffing connection pool supports reseeding so the pipeline will signal the state is out of date

sniffingPipeline.IsTakingTooLong.Should().BeTrue();

request pipeline exposes the DateTime it started; we assert it started 2 hours in the past

(dateTime.Now() - singleNodePipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - staticPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - sniffingPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));