Persistent Queues

This functionality is in beta and is subject to change. Deployment in production is at your own risk.

By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. The size of these in-memory queues is fixed and not configurable. If Logstash experiences a temporary machine failure, the contents of the in-memory queue are lost. A temporary machine failure is a scenario in which Logstash or its host machine is terminated abnormally but can be restarted.

To protect against data loss during abnormal termination, Logstash has a persistent queue feature that stores the message queue on disk. Persistent queues provide durability of data within Logstash.

Persistent queues are also useful for Logstash deployments that need large buffers. Instead of deploying and managing a message broker, such as Redis, RabbitMQ, or Apache Kafka, to facilitate a buffered publish-subscribe model, you can enable persistent queues to buffer events on disk and remove the message broker.

In summary, the two benefits of enabling persistent queues are as follows:

  • Provides protection from in-flight message loss when the Logstash process is abnormally terminated.
  • Absorbs bursts of events without needing an external buffering mechanism like Redis or Apache Kafka.

Limitations of Persistent Queues

The following are problems not solved by the persistent queue feature:

  • Input plugins that do not use a request-response protocol cannot be protected from data loss. For example, tcp, udp, zeromq push+pull, and many other inputs do not have a mechanism to acknowledge receipt to the sender. Plugins such as beats and http, which do have an acknowledgement capability, are well protected by this queue.
  • It does not handle permanent machine failures such as disk corruption, disk failure, and machine loss. The data persisted to disk is not replicated.

How Persistent Queues Work

The queue sits between the input and filter stages in the same process:

input → queue → filter + output

When an input has events ready to process, it writes them to the queue. When the write to the queue is successful, the input can send an acknowledgement to its data source.

When processing events from the queue, Logstash acknowledges events as completed, within the queue, only after filters and outputs have completed. The queue keeps a record of events that have been processed by the pipeline. An event is recorded as processed (in this document, called "acknowledged" or "ACKed") if, and only if, the event has been processed completely by the Logstash pipeline.

What does acknowledged mean? This means the event has been handled by all configured filters and outputs. For example, if you have only one output, Elasticsearch, an event is ACKed when the Elasticsearch output has successfully sent this event to Elasticsearch.

During a normal shutdown (CTRL+C or SIGTERM), Logstash will stop reading from the queue and will finish processing the in-flight events being processed by the filters and outputs. Upon restart, Logstash will resume processing the events in the persistent queue as well as accepting new events from inputs.

If Logstash is abnormally terminated, any in-flight events will not have been ACKed and will be reprocessed by filters and outputs when Logstash is restarted. Logstash processes events in batches, so when an abnormal termination occurs, part of a batch may have been successfully completed but not yet recorded as ACKed.
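
The ACK-and-replay behavior described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Logstash's implementation: the ToyQueue class, its method names, and the in-memory list standing in for disk storage are all hypothetical. It shows why events that reached an output before an abnormal termination, but were not yet ACKed, are processed a second time after restart.

```python
class ToyQueue:
    """Toy model (hypothetical): events stay queued until explicitly ACKed."""

    def __init__(self):
        self.unacked = []  # events written but not yet ACKed, in arrival order

    def write(self, event):
        self.unacked.append(event)

    def read_batch(self, size):
        # Reading does not remove events; only ack() does.
        return self.unacked[:size]

    def ack(self, batch):
        self.unacked = [e for e in self.unacked if e not in batch]


queue = ToyQueue()
for event in ["e1", "e2", "e3", "e4"]:
    queue.write(event)

delivered = []  # stands in for the output destination

# The first batch completes normally and is ACKed.
batch = queue.read_batch(2)
delivered.extend(batch)
queue.ack(batch)

# The second batch reaches the output, but the process "crashes" before the ACK.
batch = queue.read_batch(2)
delivered.extend(batch)
# (no ack here: this models the abnormal termination)

# After restart, the un-ACKed events are read and processed again.
replayed = queue.read_batch(2)
delivered.extend(replayed)
queue.ack(replayed)

print(delivered)
```

In this model the destination receives e3 and e4 twice, which suggests that outputs may need to tolerate reprocessed events.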

For more details on the specific behaviors of queue writes and acknowledgement, see Controlling Durability.

Configuring Persistent Queues

To configure persistent queues, you can specify the following options in the Logstash settings file:

  • queue.type: Specify persisted to enable persistent queues. By default, persistent queues are disabled (default: queue.type: memory).
  • path.queue: The directory path where the data files will be stored. By default, the files are stored in path.data/queue.
  • queue.page_capacity: The maximum size of a queue page in bytes. The queue data consists of append-only files called "pages". The default size is 250mb. Changing this value is unlikely to have performance benefits.
  • queue.max_events: The maximum number of events that are allowed in the queue. The default is 0 (unlimited). This value is used internally for the Logstash test suite.
  • queue.max_bytes: The total capacity of the queue in number of bytes. The default is 1024mb (1gb). Make sure the capacity of your disk drive is greater than the value you specify here.

If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criterion is reached first. See Handling Back Pressure for behavior when these queue limits are reached.
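
The "whichever is reached first" rule can be expressed as a short check. The function below is a hypothetical illustration, not Logstash code; in particular, treating a limit as reached when the current value is equal to or greater than it is an assumption.

```python
GB = 1024 ** 3


def queue_is_full(event_count, byte_size, max_events, max_bytes):
    """Return True once either configured limit has been reached.

    Assumption: a limit counts as reached when the current value is equal to
    or greater than it. max_events == 0 means "unlimited", matching the
    description of queue.max_events.
    """
    events_limit_hit = max_events > 0 and event_count >= max_events
    bytes_limit_hit = byte_size >= max_bytes
    return events_limit_hit or bytes_limit_hit


# Neither limit reached.
print(queue_is_full(500, 2 * GB, max_events=1000, max_bytes=4 * GB))
# Event limit reached before the byte limit.
print(queue_is_full(1000, 2 * GB, max_events=1000, max_bytes=4 * GB))
# No event limit configured; byte limit reached.
print(queue_is_full(500, 4 * GB, max_events=0, max_bytes=4 * GB))
```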

You can also specify options that control when the checkpoint file gets updated (queue.checkpoint.acks, queue.checkpoint.writes). See Controlling Durability.

Example configuration:

queue.type: persisted
queue.max_bytes: 4gb

Handling Back Pressure

When the queue is full, Logstash puts back pressure on the inputs to stall data flowing into Logstash. This mechanism helps Logstash control the rate of data flow at the input stage without overwhelming outputs like Elasticsearch.

Use the queue.max_bytes setting to configure the total capacity of the queue on disk. The following example sets the total capacity of the queue to 8gb:

queue.type: persisted
queue.max_bytes: 8gb

With these settings specified, Logstash will buffer events on disk until the size of the queue reaches 8gb. When the queue is full of unACKed events, and the size limit has been reached, Logstash will no longer accept new events.

Each input handles back pressure independently. For example, when the beats input encounters back pressure, it no longer accepts new connections and waits until the persistent queue has space to accept more events. After the filter and output stages finish processing existing events in the queue and ACK them, Logstash automatically starts accepting new events.
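
As a rough illustration of back pressure, the toy model below rejects writes while the queue is full of unACKed data and accepts them again once earlier events are ACKed. The BoundedToyQueue class and its methods are hypothetical, and the byte accounting is simplified to the payload lengths; it does not model pages or real inputs.

```python
class BoundedToyQueue:
    """Toy model (hypothetical): a byte-limited queue of unACKed payloads."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.unacked = []  # payloads written but not yet ACKed

    def used_bytes(self):
        return sum(len(payload) for payload in self.unacked)

    def try_write(self, payload):
        # A full queue refuses the write; the input has to stall and retry.
        if self.used_bytes() + len(payload) > self.max_bytes:
            return False
        self.unacked.append(payload)
        return True

    def ack_oldest(self, count):
        # ACKing frees capacity for new writes.
        del self.unacked[:count]


queue = BoundedToyQueue(max_bytes=10)
first = queue.try_write(b"aaaa")           # accepted, 4 of 10 bytes used
second = queue.try_write(b"bbbb")          # accepted, 8 of 10 bytes used
third_rejected = queue.try_write(b"cccc")  # refused, would need 12 bytes

queue.ack_oldest(1)                        # downstream finishes one event
third_retry = queue.try_write(b"cccc")     # accepted after space is freed

print(first, second, third_rejected, third_retry)
```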

Controlling Durability

Durability is a property of storage writes that ensures data will be available after it’s written.

When the persistent queue feature is enabled, Logstash will store events on disk. Logstash commits to disk in a mechanism called checkpointing.

To discuss durability, we need to introduce a few details about how the persistent queue is implemented.

First, the queue itself is a set of pages. There are two kinds of pages: head pages and tail pages. The head page is where new events are written. There is only one head page. When the head page reaches a certain size (see queue.page_capacity), it becomes a tail page, and a new head page is created. Tail pages are immutable, and the head page is append-only. Second, the queue records details about itself (pages, acknowledgements, etc.) in a separate file called a checkpoint file.

When recording a checkpoint, Logstash will:

  • Call fsync on the head page.
  • Atomically write to disk the current state of the queue.
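
One common way to write a small state file atomically is to write a temporary file, fsync it, and then rename it over the target. The sketch below shows that general pattern only; it is an assumption that anything similar applies here, and the function names and the JSON state layout are hypothetical rather than Logstash's actual checkpoint format.

```python
import json
import os
import tempfile


def write_checkpoint(path, state):
    """Replace the file at `path` with `state` in a single atomic step."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temporary file in the same directory so the final rename
    # stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".checkpoint-")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # force the bytes to disk before renaming
        os.replace(tmp_path, path)  # readers see the old file or the new one
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def read_checkpoint(path):
    with open(path) as handle:
        return json.load(handle)
```

For example, calling write_checkpoint twice on the same path leaves only the second state on disk, with no moment at which a reader could observe a half-written file.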

The following settings are available to let you tune durability:

  • queue.checkpoint.writes: Logstash will checkpoint after this many writes into the queue. Currently, one event counts as one write, but this may change in future releases.
  • queue.checkpoint.acks: Logstash will checkpoint after this many events are acknowledged. This configuration controls the durability at the processing (filter + output) part of Logstash.

Disk writes have a resource cost. Tuning the above values trades durability against performance: lower values checkpoint more often, which improves durability but costs more disk writes, and higher values do the reverse. For instance, if you want the strongest durability for all input events, you can set queue.checkpoint.writes: 1.

The process of checkpointing is atomic, which means an update to the checkpoint file is either saved in full, if it succeeds, or not applied at all.

If Logstash is terminated, or if there is a hardware level failure, any data that is buffered in the persistent queue, but not yet checkpointed, is lost. To avoid this possibility, you can set queue.checkpoint.writes: 1, but keep in mind that this setting can severely impact performance.
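
To make the trade-off concrete, the toy model below counts how many written events would not be covered by a checkpoint if the process stopped at a given moment. The CheckpointedLog class is hypothetical and assumes, as stated above, that one event counts as one write; it does not touch the disk.

```python
class CheckpointedLog:
    """Toy model (hypothetical): checkpoint after every N writes."""

    def __init__(self, checkpoint_writes):
        self.checkpoint_writes = checkpoint_writes
        self.written = 0  # events appended so far
        self.durable = 0  # events covered by the most recent checkpoint

    def write(self, event):
        self.written += 1
        if self.written - self.durable >= self.checkpoint_writes:
            self.durable = self.written  # stands in for fsync + checkpoint

    def lost_on_crash(self):
        # Events written since the last checkpoint are at risk.
        return self.written - self.durable


relaxed = CheckpointedLog(checkpoint_writes=4)
strict = CheckpointedLog(checkpoint_writes=1)
for n in range(10):
    relaxed.write(f"event-{n}")
    strict.write(f"event-{n}")

print(relaxed.lost_on_crash(), strict.lost_on_crash())
```

In this model, a checkpoint interval of 4 leaves the last 2 of 10 events at risk, while an interval of 1 leaves none, at the cost of one checkpoint per event.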

Disk Garbage Collection

On disk, the queue is stored as a set of pages where each page is one file. Each page can be at most queue.page_capacity in size. Pages are deleted (garbage collected) after all events in that page have been ACKed. If an older page has at least one event that is not yet ACKed, that entire page will remain on disk until all events in that page are successfully processed. Each page containing unprocessed events will count against the queue.max_bytes byte size.
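
The page-level garbage collection rule can be sketched with a toy model. The PagedQueue class is hypothetical: pages are sized by event count rather than bytes, and only full (tail) pages are collected, which is an assumption made to keep the example simple.

```python
class PagedQueue:
    """Toy model (hypothetical): a page is deleted only when fully ACKed."""

    def __init__(self, events_per_page):
        self.events_per_page = events_per_page
        self.pages = []  # each page: {"events": [...], "acked": set()}

    def write(self, event):
        # Start a new head page when there is none or the current one is full.
        if not self.pages or len(self.pages[-1]["events"]) >= self.events_per_page:
            self.pages.append({"events": [], "acked": set()})
        self.pages[-1]["events"].append(event)

    def ack(self, event):
        for page in self.pages:
            if event in page["events"]:
                page["acked"].add(event)
        # Garbage collect every page that is full and completely ACKed.
        self.pages = [p for p in self.pages if not self._collectable(p)]

    def _collectable(self, page):
        is_tail = len(page["events"]) >= self.events_per_page
        return is_tail and page["acked"] == set(page["events"])


queue = PagedQueue(events_per_page=3)
for name in ["e1", "e2", "e3", "e4", "e5", "e6", "e7"]:
    queue.write(name)  # pages: [e1 e2 e3] [e4 e5 e6] [e7]

for name in ["e1", "e2", "e4", "e5", "e6"]:
    queue.ack(name)
pages_while_e3_pending = len(queue.pages)  # first page kept because of e3

queue.ack("e3")
pages_after_e3 = len(queue.pages)          # first page can now be deleted

print(pages_while_e3_pending, pages_after_e3)
```

A single unACKed event (e3 here) keeps its whole page on disk, even though later pages have already been deleted.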