Persistent Queues

edit

By default, Logstash uses in-memory bounded queues between pipeline stages (inputs → pipeline workers) to buffer events. The size of these in-memory queues is fixed and not configurable. If Logstash experiences a temporary machine failure, the contents of the in-memory queue will be lost. Temporary machine failures are scenarios where Logstash or its host machine are terminated abnormally but are capable of being restarted.

In order to protect against data loss during abnormal termination, Logstash has a persistent queue feature which will store the message queue on disk. Persistent queues provide durability of data within Logstash.

Persistent queues are also useful for Logstash deployments that need large buffers. Instead of deploying and managing a message broker, such as Redis, RabbitMQ, or Apache Kafka, to facilitate a buffered publish-subscriber model, you can enable persistent queues to buffer events on disk and remove the message broker.

In summary, the benefits of enabling persistent queues are as follows:

  • Absorbs bursts of events without needing an external buffering mechanism like Redis or Apache Kafka.
  • Provides an at-least-once delivery guarantee against message loss during a normal shutdown as well as when Logstash is terminated abnormally. If Logstash is restarted while events are in-flight, Logstash will attempt to deliver messages stored in the persistent queue until delivery succeeds at least once.

    You must set queue.checkpoint.writes: 1 explicitly to guarantee maximum durability for all input events. See Controlling Durability.

Limitations of Persistent Queues

edit

The following are problems not solved by the persistent queue feature:

  • Input plugins that do not use a request-response protocol cannot be protected from data loss. For example: tcp, udp, zeromq push+pull, and many other inputs do not have a mechanism to acknowledge receipt to the sender. Plugins such as beats and http, which do have an acknowledgement capability, are well protected by this queue.
  • It does not handle permanent machine failures such as disk corruption, disk failure, and machine loss. The data persisted to disk is not replicated.

How Persistent Queues Work

edit

The queue sits between the input and filter stages in the same process:

input → queue → filter + output

When an input has events ready to process, it writes them to the queue. When the write to the queue is successful, the input can send an acknowledgement to its data source.

When processing events from the queue, Logstash acknowledges events as completed, within the queue, only after filters and outputs have completed. The queue keeps a record of events that have been processed by the pipeline. An event is recorded as processed (in this document, called "acknowledged" or "ACKed") if, and only if, the event has been processed completely by the Logstash pipeline.

What does acknowledged mean? This means the event has been handled by all configured filters and outputs. For example, if you have only one output, Elasticsearch, an event is ACKed when the Elasticsearch output has successfully sent this event to Elasticsearch.

During a normal shutdown (CTRL+C or SIGTERM), Logstash will stop reading from the queue and will finish processing the in-flight events being processed by the filters and outputs. Upon restart, Logstash will resume processing the events in the persistent queue as well as accepting new events from inputs.

If Logstash is abnormally terminated, any in-flight events will not have been ACKed and will be reprocessed by filters and outputs when Logstash is restarted. Logstash processes events in batches, so it is possible that for any given batch, some of that batch may have been successfully completed, but not recorded as ACKed, when an abnormal termination occurs.

For more details specific behaviors of queue writes and acknowledgement, see Controlling Durability.

Configuring Persistent Queues

edit

To configure persistent queues, specify options in the Logstash settings file. Settings are applied to every pipeline.

When you set values for capacity and sizing settings, remember that the value you set is applied per pipeline rather than a total to be shared among all pipelines.

If you want to define values for a specific pipeline, use pipelines.yml.

  • queue.type: Specify persisted to enable persistent queues. By default, persistent queues are disabled (default: queue.type: memory).
  • path.queue: The directory path where the data files will be stored. By default, the files are stored in path.data/queue.
  • queue.page_capacity: The maximum size of a queue page in bytes. The queue data consists of append-only files called "pages". The default size is 64mb. Changing this value is unlikely to have performance benefits.
  • queue.drain: Specify true if you want Logstash to wait until the persistent queue is drained before shutting down. The amount of time it takes to drain the queue depends on the number of events that have accumulated in the queue. Therefore, you should avoid using this setting unless the queue, even when full, is relatively small and can be drained quickly.
  • queue.max_events: The maximum number of events that are allowed in the queue. The default is 0 (unlimited).
  • queue.max_bytes: The total capacity of the queue in number of bytes. The default is 1024mb (1gb). Make sure the capacity of your disk drive is greater than the value you specify here.

    If you are using persistent queues to protect against data loss, but don’t require much buffering, you can set queue.max_bytes to a smaller value, such as 10mb, to produce smaller queues and improve queue performance.

If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criteria is reached first. See Handling Back Pressure for behavior when these queue limits are reached.

You can also control when the checkpoint file gets updated by setting queue.checkpoint.writes. See Controlling Durability.

Example configuration:

queue.type: persisted
queue.max_bytes: 4gb

Handling Back Pressure

edit

When the queue is full, Logstash puts back pressure on the inputs to stall data flowing into Logstash. This mechanism helps Logstash control the rate of data flow at the input stage without overwhelming outputs like Elasticsearch.

Use queue.max_bytes setting to configure the total capacity of the queue on disk. The following example sets the total capacity of the queue to 8gb:

queue.type: persisted
queue.max_bytes: 8gb

With these settings specified, Logstash will buffer events on disk until the size of the queue reaches 8gb. When the queue is full of unACKed events, and the size limit has been reached, Logstash will no longer accept new events.

Each input handles back pressure independently. For example, when the beats input encounters back pressure, it no longer accepts new connections and waits until the persistent queue has space to accept more events. After the filter and output stages finish processing existing events in the queue and ACKs them, Logstash automatically starts accepting new events.

Controlling Durability

edit

Durability is a property of storage writes that ensures data will be available after it’s written.

When the persistent queue feature is enabled, Logstash will store events on disk. Logstash commits to disk in a mechanism called checkpointing.

To discuss durability, we need to introduce a few details about how the persistent queue is implemented.

First, the queue itself is a set of pages. There are two kinds of pages: head pages and tail pages. The head page is where new events are written. There is only one head page. When the head page is of a certain size (see queue.page_capacity), it becomes a tail page, and a new head page is created. Tail pages are immutable, and the head page is append-only. Second, the queue records details about itself (pages, acknowledgements, etc) in a separate file called a checkpoint file.

When recording a checkpoint, Logstash will:

  • Call fsync on the head page.
  • Atomically write to disk the current state of the queue.

The process of checkpointing is atomic, which means any update to the file is saved if successful.

If Logstash is terminated, or if there is a hardware-level failure, any data that is buffered in the persistent queue, but not yet checkpointed, is lost.

You can force Logstash to checkpoint more frequently by setting queue.checkpoint.writes. This setting specifies the maximum number of events that may be written to disk before forcing a checkpoint. The default is 1024. To ensure maximum durability and avoid losing data in the persistent queue, you can set queue.checkpoint.writes: 1 to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to 1 can severely impact performance.

Disk Garbage Collection

edit

On disk, the queue is stored as a set of pages where each page is one file. Each page can be at most queue.page_capacity in size. Pages are deleted (garbage collected) after all events in that page have been ACKed. If an older page has at least one event that is not yet ACKed, that entire page will remain on disk until all events in that page are successfully processed. Each page containing unprocessed events will count against the queue.max_bytes byte size.