kafka

This input reads events from a Kafka topic. It uses the high-level consumer API provided by Kafka to read messages from the broker, and it uses ZooKeeper to keep track of what has been consumed. The default input codec is json.

Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:

Kafka Client Version | Logstash Version | Plugin Version | Security Features | Why?
0.8                  | 2.0.0 - 2.x.x    | <3.0.0         |                   | Legacy, 0.8 is still popular
0.9                  | 2.0.0 - 2.3.x    | 3.x.x          | Basic Auth, SSL   |

We recommend that you use matching Kafka client and broker versions. During upgrades, upgrade brokers before clients, because brokers target backward compatibility. For example, the 0.9 broker is compatible with both the 0.8 and 0.9 consumer APIs, but not the other way around.

You must configure topic_id, white_list, or black_list. By default, the plugin connects to a ZooKeeper instance running on localhost, and all broker information is read from ZooKeeper state.

Ideally, you should have as many threads (consumer_threads) as there are partitions for a perfect balance; more threads than partitions means that some threads will be idle.

For more information, see http://kafka.apache.org/documentation.html#theconsumer

Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
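
A minimal pipeline sketch, assuming a ZooKeeper instance on the default localhost:2181 and a hypothetical topic named apache-logs whose two partitions are also an assumption. It uses only options documented on this page plus the standard stdout output; because the default codec is json, each message is assumed to be a JSON document.

```
input {
  kafka {
    # ZooKeeper connection string (this is also the default value)
    zk_connect => "localhost:2181"
    # Hypothetical topic name; topic_id, white_list, or black_list must be set
    topic_id => "apache-logs"
    # Hypothetical: one thread per partition for a topic with two partitions
    consumer_threads => 2
  }
}

output {
  # Print each decoded event for inspection
  stdout { codec => rubydebug }
}
```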

Synopsis

This plugin supports the following configuration options:

Required configuration options:

kafka {
}

Available configuration options:

Setting                   | Input type                             | Required | Default value
add_field                 | hash                                   | No       | {}
auto_offset_reset         | string, one of ["largest", "smallest"] | No       | "largest"
black_list                | string                                 | No       | nil
codec                     | codec                                  | No       | "json"
consumer_id               | string                                 | No       | nil
consumer_restart_on_error | boolean                                | No       | true
consumer_restart_sleep_ms | number                                 | No       | 0
consumer_threads          | number                                 | No       | 1
consumer_timeout_ms       | number                                 | No       | -1
decoder_class             | string                                 | No       | "kafka.serializer.DefaultDecoder"
decorate_events           | boolean                                | No       | false
fetch_message_max_bytes   | number                                 | No       | 1048576
group_id                  | string                                 | No       | "logstash"
key_decoder_class         | string                                 | No       | "kafka.serializer.DefaultDecoder"
queue_size                | number                                 | No       | 20
rebalance_backoff_ms      | number                                 | No       | 2000
rebalance_max_retries     | number                                 | No       | 4
reset_beginning           | boolean                                | No       | false
tags                      | array                                  | No       | no default
topic_id                  | string                                 | No       | nil
type                      | string                                 | No       | no default
white_list                | string                                 | No       | nil
zk_connect                | string                                 | No       | "localhost:2181"

Details

add_field

  • Value type is hash
  • Default value is {}

Add a field to an event.
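
For illustration, a sketch that adds a static field to every event read from Kafka. The topic name, the field name source_system, and its value are all hypothetical.

```
input {
  kafka {
    topic_id => "apache-logs"
    # Adds source_system: "kafka" to each event (hypothetical name and value)
    add_field => { "source_system" => "kafka" }
  }
}
```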

auto_offset_reset

  • Value can be any of: largest, smallest
  • Default value is "largest"

Determines where the consumer starts when it does not already have an established offset, or when its offset is invalid: smallest starts with the earliest message present in the log, and largest (the default) starts after the last message in the log.
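
A sketch of a consumer group that, when it has no established offset yet, starts from the oldest retained message instead of only reading new ones. The topic and group names are hypothetical.

```
input {
  kafka {
    topic_id => "apache-logs"
    group_id => "logstash-backfill"
    # Only takes effect when the group has no valid offset stored
    auto_offset_reset => "smallest"
  }
}
```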

black_list

  • Value type is string
  • Default value is nil

Blacklist of topics to exclude from consumption.
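
A sketch that consumes every topic except one, with black_list set in place of topic_id or white_list (this page does not say whether those settings can be combined, so the sketch sets only one). The excluded topic name is hypothetical, and a literal name is used because this page does not document a pattern syntax.

```
input {
  kafka {
    # Consume all topics except this hypothetical internal one
    black_list => "internal-metrics"
  }
}
```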

codec

  • Value type is codec
  • Default value is "json"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
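
If the messages on a topic are plain text lines rather than JSON, the default json codec is the wrong fit. A sketch that switches to the plain codec; the topic name is hypothetical.

```
input {
  kafka {
    topic_id => "syslog-raw"
    # Treat each message as plain text instead of parsing it as JSON
    codec => plain
  }
}
```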

consumer_id

  • Value type is string
  • Default value is nil

A unique id for the consumer; generated automatically if not set.

consumer_restart_on_error

  • Value type is boolean
  • Default value is true

Whether to restart the consumer loop after an error.

consumer_restart_sleep_ms

  • Value type is number
  • Default value is 0

Time, in milliseconds, to wait before restarting the consumer after an error.
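
A sketch that keeps the restart-on-error behavior but pauses before each restart, so that a persistent failure does not produce a tight restart loop. The topic name is hypothetical and the 5000 ms value is illustrative, not a recommendation.

```
input {
  kafka {
    topic_id => "apache-logs"
    # Restart the consumer loop after an error (the default)
    consumer_restart_on_error => true
    # Wait 5 seconds before each restart (illustrative value)
    consumer_restart_sleep_ms => 5000
  }
}
```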

consumer_threads

  • Value type is number
  • Default value is 1

Number of threads used to read from the partitions. Ideally, you should have as many threads as there are partitions for a perfect balance. More threads than partitions means that some threads will be idle. Fewer threads means a single thread could be consuming from more than one partition.
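
A sketch for a hypothetical topic assumed to have four partitions, giving each partition its own thread. Under the balancing described above, two Logstash instances in the same consumer group with consumer_threads => 2 each would also add up to four threads for the four partitions.

```
input {
  kafka {
    # Hypothetical topic, assumed to have 4 partitions
    topic_id => "apache-logs"
    # One thread per partition; more than 4 would leave some threads idle
    consumer_threads => 4
  }
}
```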

consumer_timeout_ms

  • Value type is number
  • Default value is -1

Throw a timeout exception to the consumer if no message is available for consumption within this many milliseconds. The default of -1 disables the timeout, so the consumer blocks until a message arrives.

decoder_class

  • Value type is string
  • Default value is "kafka.serializer.DefaultDecoder"

The decoder class used to deserialize messages. The default decoder takes a byte[] and returns the same byte[].

decorate_events

  • Value type is boolean
  • Default value is false

Option to add Kafka metadata, such as the topic and message size, to the event. When enabled, this adds a field named kafka to the Logstash event containing the following attributes:
  • msg_size: The complete serialized size of this message in bytes (including CRC, header attributes, etc.)
  • topic: The topic this message is associated with
  • consumer_group: The consumer group used to read in this event
  • partition: The partition this message is associated with
  • key: A ByteBuffer containing the message key
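
With decorate_events enabled, the kafka field can drive conditionals later in the pipeline. A sketch that tags events by their source topic; the topic and tag names are hypothetical, and the [kafka][topic] reference assumes the nested field layout listed above.

```
input {
  kafka {
    topic_id => "apache-logs"
    decorate_events => true
  }
}

filter {
  # [kafka][topic] is only present because decorate_events is true
  if [kafka][topic] == "apache-logs" {
    mutate { add_tag => ["apache"] }
  }
}
```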

fetch_message_max_bytes

  • Value type is number
  • Default value is 1048576

The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes are read into memory for each partition, so this setting helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows; otherwise, the producer may send messages larger than the consumer can fetch.
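
A sketch for a broker whose maximum message size has been raised to 2 MB (a hypothetical change on the Kafka side, made through the broker's message.max.bytes property). Because this many bytes can be buffered per partition, a consumer reading four partitions could hold roughly 8 MB of fetched data in memory. The topic name is hypothetical.

```
input {
  kafka {
    topic_id => "apache-logs"
    # 2 MB = 2 * 1024 * 1024 bytes; at least the broker's maximum message size
    fetch_message_max_bytes => 2097152
  }
}
```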

group_id

  • Value type is string
  • Default value is "logstash"

A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
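
A sketch of scaling out: the same input block deployed on two Logstash instances. Because both use the same group_id, they join one consumer group and the topic's partitions are split between them, rather than each instance receiving every message. The ZooKeeper hostnames, topic, and group name are hypothetical.

```
# Identical on both Logstash instances
input {
  kafka {
    zk_connect => "zk1:2181,zk2:2181,zk3:2181"
    topic_id => "apache-logs"
    # Same group id everywhere, so partitions are shared, not duplicated
    group_id => "logstash-apache"
  }
}
```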

key_decoder_class

  • Value type is string
  • Default value is "kafka.serializer.DefaultDecoder"

The decoder class for message keys (defaults to the same decoder as for messages).

queue_size

  • Value type is number
  • Default value is 20

Size of the internal Logstash queue that holds events in memory after they have been read from Kafka.

rebalance_backoff_ms

  • Value type is number
  • Default value is 2000

Backoff time between retries during rebalance.

rebalance_max_retries

  • Value type is number
  • Default value is 4

When a new consumer joins a consumer group, the set of consumers attempts to "rebalance" the load by assigning partitions to each consumer. If the set of consumers changes while this assignment is taking place, the rebalance fails and is retried. This setting controls the maximum number of attempts before giving up.
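
A sketch for a consumer group whose membership changes often (for example, several Logstash instances restarting at about the same time), giving rebalancing more attempts and a longer pause between them. The topic name is hypothetical and both values are illustrative.

```
input {
  kafka {
    topic_id => "apache-logs"
    # Up to 8 rebalance attempts instead of the default 4
    rebalance_max_retries => 8
    # Wait 5 seconds between attempts instead of the default 2 seconds
    rebalance_backoff_ms => 5000
  }
}
```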

reset_beginning

  • Value type is boolean
  • Default value is false

Reset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in ZooKeeper. This is destructive! Must be used in conjunction with auto_offset_reset => smallest.
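
A sketch of re-reading a topic from the start. Since reset_beginning clears the stored offsets of the configured group, this sketch uses a separate, hypothetical group_id so that the offsets of the regular consumer group are left alone, and it pairs the option with auto_offset_reset as required above. The topic name is also hypothetical.

```
input {
  kafka {
    topic_id => "apache-logs"
    # Dedicated group so the regular group's offsets are not cleared
    group_id => "logstash-replay"
    # Destructive: clears this group's offsets stored in ZooKeeper
    reset_beginning => true
    auto_offset_reset => "smallest"
  }
}
```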

tags

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.

topic_id

  • Value type is string
  • Default value is nil

The topic to consume messages from.

type

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example, when you send an event from a shipper to an indexer), a new input will not override the existing type. A type set at the shipper stays with that event for its life, even when sent to another Logstash server.
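
A sketch that sets a type on events from this input and uses it to activate a filter only for them. The topic, type value, and tag are hypothetical.

```
input {
  kafka {
    topic_id => "apache-logs"
    type => "kafka-apache"
  }
}

filter {
  # Runs only for events whose type is kafka-apache
  if [type] == "kafka-apache" {
    mutate { add_tag => ["from_kafka"] }
  }
}
```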

white_list

  • Value type is string
  • Default value is nil

Whitelist of topics to include for consumption.
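
A sketch that uses white_list in place of topic_id. The value is a single, hypothetical topic name; this page does not document whether the setting also accepts a pattern matching several topics, so treat any pattern syntax as an assumption to verify.

```
input {
  kafka {
    # Consume only the listed topic (hypothetical name)
    white_list => "apache-logs"
  }
}
```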

zk_connect

  • Value type is string
  • Default value is "localhost:2181"

Specifies the ZooKeeper connection string in the form hostname:port, where hostname and port are the host and port of a ZooKeeper server. You can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.

The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string, which puts its data under some path in the global ZooKeeper namespace. If so, the consumer should use the same chroot path in its connection string. For example, to give a chroot path of /chroot/path, you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
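
A sketch of a three-node ZooKeeper ensemble with a chroot path, following the connection-string form described above. The hostnames, the /kafka chroot, and the topic name are hypothetical; the chroot must match the one the Kafka brokers use.

```
input {
  kafka {
    # Three ZooKeeper hosts, with Kafka's data under the /kafka chroot
    zk_connect => "zk1:2181,zk2:2181,zk3:2181/kafka"
    topic_id => "apache-logs"
  }
}
```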