kafka

Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.

The only required configuration is the topic name. The default codec is json, so events will be persisted on the broker in JSON format. If you select the plain codec, Logstash will encode your messages with not only the message itself but also a timestamp and hostname. If you want only the message to pass through, configure the output like this (the topic name is a placeholder):

    output {
      kafka {
        topic_id => "your_topic"
        codec => plain {
          format => "%{message}"
        }
      }
    }

For more information, see http://kafka.apache.org/documentation.html#theproducer

Kafka producer configuration: http://kafka.apache.org/documentation.html#producerconfigs

Synopsis

This plugin supports the following configuration options:

Required configuration options:

    kafka {
        topic_id => ...
    }

Available configuration options:

Setting                              Input type                                   Required  Default value
batch_num_messages                   number                                       No        200
broker_list                          string                                       No        "localhost:9092"
client_id                            string                                       No        ""
codec                                codec                                        No        "json"
compressed_topics                    string                                       No        ""
compression_codec                    string, one of ["none", "gzip", "snappy"]    No        "none"
key_serializer_class                 string                                       No        "kafka.serializer.StringEncoder"
message_send_max_retries             number                                       No        3
partition_key_format                 string                                       No        nil
partitioner_class                    string                                       No        "kafka.producer.DefaultPartitioner"
producer_type                        string, one of ["sync", "async"]             No        "sync"
queue_buffering_max_messages         number                                       No        10000
queue_buffering_max_ms               number                                       No        5000
queue_enqueue_timeout_ms             number                                       No        -1
request_required_acks                string, one of [-1, 0, 1]                    No        0
request_timeout_ms                   number                                       No        10000
retry_backoff_ms                     number                                       No        100
send_buffer_bytes                    number                                       No        102400
serializer_class                     string                                       No        "kafka.serializer.StringEncoder"
topic_id                             string                                       Yes
topic_metadata_refresh_interval_ms   number                                       No        600000
workers                              number                                       No        1

Details

batch_num_messages

  • Value type is number
  • Default value is 200

The number of messages to send in one batch when using async mode. The producer will wait until either this many messages are ready to send or queue.buffering.max.ms is reached.

broker_list

  • Value type is string
  • Default value is "localhost:9092"

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
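
As a minimal sketch (the hostnames and topic name below are placeholders):

    output {
      kafka {
        topic_id => "your_topic"
        broker_list => "kafka1.example.com:9092,kafka2.example.com:9092"
      }
    }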

client_id

  • Value type is string
  • Default value is ""

The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.

codec

  • Value type is codec
  • Default value is "json"

The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.

compressed_topics

  • Value type is string
  • Default value is ""

This parameter allows you to turn on compression for particular topics. If the compression codec is anything other than NoCompressionCodec and this list is non-empty, compression is enabled only for the listed topics. If the list of compressed topics is empty, the specified compression codec is enabled for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics.

compression_codec

  • Value can be any of: none, gzip, snappy
  • Default value is "none"

This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy.
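
For example, a sketch that enables gzip compression for two specific topics only (the topic names are placeholders):

    output {
      kafka {
        topic_id => "your_topic"
        compression_codec => "gzip"
        compressed_topics => "logs,metrics"
      }
    }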

exclude_tags (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is array
  • Default value is []

Only handle events without any of these tags. Optional.

key_serializer_class

  • Value type is string
  • Default value is "kafka.serializer.StringEncoder"

The serializer class for keys. It defaults to the same serializer as for messages if nothing is given.

message_send_max_retries

  • Value type is number
  • Default value is 3

This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Note that setting a non-zero value here can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
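
A sketch combining this setting with retry_backoff_ms, described below (the values are illustrative, not recommendations):

    output {
      kafka {
        topic_id => "your_topic"
        message_send_max_retries => 5
        retry_backoff_ms => 200
      }
    }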

partition_key_format

  • Value type is string
  • Default value is nil

Provides a way to specify a partition key as a string. Configure a format that will produce the key as a string; key_serializer_class defaults to kafka.serializer.StringEncoder to match. For example, to partition by host (the topic name is a placeholder):

    output {
      kafka {
        topic_id => "your_topic"
        partition_key_format => "%{host}"
      }
    }

partitioner_class

  • Value type is string
  • Default value is "kafka.producer.DefaultPartitioner"

The partitioner class for partitioning messages amongst partitions in the topic. The default partitioner is based on the hash of the key. If the key is null, the message is sent to a random partition in the broker. NOTE: topic_metadata_refresh_interval_ms controls how long the producer will keep writing to the same partition. It defaults to 10 minutes (600000 ms), so the producer will continue to write to a single partition for 10 minutes before it switches.

producer_type

  • Value can be any of: sync, async
  • Default value is "sync"

This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. Setting the producer to async allows batching together of requests (which is great for throughput), but opens the possibility that a failure of the client machine will drop unsent data.
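
For example, a sketch of an async producer that sends a batch once 500 messages are queued or 1000 ms have elapsed, whichever comes first (the values and topic name are illustrative):

    output {
      kafka {
        topic_id => "your_topic"
        producer_type => "async"
        batch_num_messages => 500
        queue_buffering_max_ms => 1000
      }
    }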

queue_buffering_max_messages

  • Value type is number
  • Default value is 10000

The maximum number of unsent messages that can be queued up by the producer when using async mode before either the producer must be blocked or data must be dropped.

queue_buffering_max_ms

  • Value type is number
  • Default value is 5000

Maximum time to buffer data when using async mode. For example, a setting of 100 will try to batch together 100 ms worth of messages to send at once. This improves throughput but adds message delivery latency due to the buffering.

queue_enqueue_timeout_ms

  • Value type is number
  • Default value is -1

The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.

request_required_acks

  • Value can be any of: -1, 0, 1
  • Default value is 0

This value controls when a produce request is considered completed. Specifically, it controls how many other brokers must have committed the data to their log and acknowledged this to the leader. For more information, see http://kafka.apache.org/documentation.html#producerconfigs
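
For example, a durability-oriented sketch that waits for acknowledgement from all in-sync replicas (the topic name is a placeholder):

    output {
      kafka {
        topic_id => "your_topic"
        request_required_acks => -1
      }
    }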

request_timeout_ms

  • Value type is number
  • Default value is 10000

The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.

retry_backoff_ms

  • Value type is number
  • Default value is 100

Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.

send_buffer_bytes

  • Value type is number
  • Default value is 102400

Socket write buffer size.

serializer_class

  • Value type is string
  • Default value is "kafka.serializer.StringEncoder"

The serializer class for messages. Kafka's default encoder takes a byte[] and returns the same byte[]; this plugin defaults to kafka.serializer.StringEncoder.

tags (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is array
  • Default value is []

Only handle events with all of these tags. Optional.

topic_id

  • This is a required setting.
  • Value type is string
  • There is no default value for this setting.

The topic to produce the messages to.
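
Since this is the only required setting, the minimal configuration is just a topic (the name is a placeholder):

    output {
      kafka {
        topic_id => "your_topic"
      }
    }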

topic_metadata_refresh_interval_ms

  • Value type is number
  • Default value is 600000

The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available, and so on). It will also poll regularly (default: every 10 minutes, i.e. 600000 ms). If you set this to a negative value, metadata will only be refreshed on failure. If you set it to zero, the metadata will be refreshed after every message sent (not recommended). Important note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed.
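
For example, a sketch that refreshes metadata only on failure (any negative value has the same effect; the topic name is a placeholder):

    output {
      kafka {
        topic_id => "your_topic"
        topic_metadata_refresh_interval_ms => -1
      }
    }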

type (DEPRECATED)

  • DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
  • Value type is string
  • Default value is ""

The type to act on. If a type is given, then this output will only act on messages with the same type. See any input plugin’s type attribute for more. Optional.

workers

  • Value type is number
  • Default value is 1

The number of workers to use for this output. Note that this setting may not be useful for all outputs.