kafka
This input reads events from a Kafka topic. It uses the high-level consumer API provided by Kafka to read messages from the broker, and it tracks what has been consumed using ZooKeeper. The default input codec is json.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka input plugin:
Kafka Client Version | Logstash Version | Plugin Version | Security Features | Why? |
---|---|---|---|---|
0.8 | 2.0.0 - 2.x.x | <3.0.0 | | Legacy, 0.8 is still popular |
0.9 | 2.0.0 - 2.3.x | 3.x.x | Basic Auth, SSL | |
We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
You must configure topic_id, white_list, or black_list. By default, the input connects to a ZooKeeper instance running on localhost. All the broker information is read from ZooKeeper state.
Ideally, you should have as many threads as partitions for a perfect balance. More threads than partitions means that some threads will be idle.
For more information see http://kafka.apache.org/documentation.html#theconsumer
Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
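As a minimal sketch of the requirement above (one of topic_id, white_list, or black_list must be set), the following pipeline reads a single topic through a local ZooKeeper. The topic name example_topic is a hypothetical placeholder, not something defined by the plugin.

```
input {
  kafka {
    # Hypothetical topic name; replace it with a topic that exists on your brokers.
    topic_id => "example_topic"
    # The default ZooKeeper address, written out explicitly for clarity.
    zk_connect => "localhost:2181"
  }
}
```

Every other option keeps its default here, so events are decoded with the json codec and consumed as part of the logstash consumer group.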
Synopsis
This plugin supports the following configuration options:
Required configuration options:
kafka { }
Available configuration options:
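Setting | Input type | Required | Default value |
---|---|---|---|
add_field | hash | No | {} |
auto_offset_reset | string, one of ["largest", "smallest"] | No | "largest" |
black_list | string | No | nil |
codec | codec | No | "json" |
consumer_id | string | No | nil |
consumer_restart_on_error | boolean | No | true |
consumer_restart_sleep_ms | number | No | 0 |
consumer_threads | number | No | 1 |
consumer_timeout_ms | number | No | -1 |
decoder_class | string | No | "kafka.serializer.DefaultDecoder" |
decorate_events | boolean | No | false |
fetch_message_max_bytes | number | No | 1048576 |
group_id | string | No | "logstash" |
key_decoder_class | string | No | "kafka.serializer.DefaultDecoder" |
queue_size | number | No | 20 |
rebalance_backoff_ms | number | No | 2000 |
rebalance_max_retries | number | No | 4 |
reset_beginning | boolean | No | false |
tags | array | No | |
topic_id | string | No | nil |
type | string | No | |
white_list | string | No | nil |
zk_connect | string | No | "localhost:2181" |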
Details
auto_offset_reset
- Value can be any of: largest, smallest
- Default value is "largest"
If the consumer does not already have an established offset, or the offset is invalid, start with the earliest message present in the log (smallest) or after the last message in the log (largest).
black_list
- Value type is string
- Default value is nil
Blacklist of topics to exclude from consumption.
codec
- Value type is codec
- Default value is "json"
The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
consumer_id
- Value type is string
- Default value is nil
A unique id for the consumer; generated automatically if not set.
consumer_restart_on_error
- Value type is boolean
- Default value is true
Whether to restart the consumer loop on error.
consumer_restart_sleep_ms
- Value type is number
- Default value is 0
Time in milliseconds to wait before restarting the consumer after an error.
consumer_threads
- Value type is number
- Default value is 1
Number of threads to read from the partitions. Ideally, you should have as many threads as partitions for a perfect balance. More threads than partitions means that some threads will be idle. Fewer threads means that a single thread may consume from more than one partition.
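To illustrate the balance described above, here is a sketch that assumes a hypothetical topic named example_topic with four partitions. Setting consumer_threads to the same number gives each thread one partition.

```
input {
  kafka {
    # Hypothetical topic, assumed for this example to have 4 partitions.
    topic_id => "example_topic"
    # One thread per partition: no thread sits idle and no thread reads two partitions.
    consumer_threads => 4
  }
}
```

If several Logstash instances share the same group_id, the partitions are divided among all of their threads, so the per-instance value would normally be smaller.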
consumer_timeout_ms
- Value type is number
- Default value is -1
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval, in milliseconds.
decoder_class
- Value type is string
- Default value is "kafka.serializer.DefaultDecoder"
The serializer class for messages. The default decoder takes a byte[] and returns the same byte[].
decorate_events
- Value type is boolean
- Default value is false
Option to add Kafka metadata, such as the topic and message size, to the event. This adds a field named kafka to the Logstash event containing the following attributes:
- msg_size: The complete serialized size of this message in bytes (including crc, header attributes, etc.)
- topic: The topic this message is associated with
- consumer_group: The consumer group used to read in this event
- partition: The partition this message is associated with
- key: A ByteBuffer containing the message key
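The attributes listed above can be referenced later in the pipeline with Logstash field-reference syntax. The following sketch tags events by their source topic; the topic name example_topic and the tag from_example_topic are hypothetical.

```
input {
  kafka {
    # Hypothetical topic name used only for illustration.
    topic_id => "example_topic"
    # Adds the kafka field (msg_size, topic, consumer_group, partition, key) to each event.
    decorate_events => true
  }
}

filter {
  # [kafka][topic] is only present because decorate_events is enabled above.
  if [kafka][topic] == "example_topic" {
    mutate {
      add_tag => ["from_example_topic"]
    }
  }
}
```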
fetch_message_max_bytes
- Value type is number
- Default value is 1048576
The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes are read into memory for each partition, so this setting helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows; otherwise, the producer can send messages larger than the consumer can fetch.
group_id
- Value type is string
- Default value is "logstash"
A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
key_decoder_class
- Value type is string
- Default value is "kafka.serializer.DefaultDecoder"
The serializer class for keys (defaults to the same default as for messages).
queue_size
- Value type is number
- Default value is 20
Size of the internal Logstash queue used to hold events in memory after they have been read from Kafka.
rebalance_backoff_ms
- Value type is number
- Default value is 2000
Backoff time between retries during rebalance.
rebalance_max_retries
- Value type is number
- Default value is 4
When a new consumer joins a consumer group, the set of consumers attempts to "rebalance" the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place, the rebalance fails and is retried. This setting controls the maximum number of attempts before giving up.
reset_beginning
- Value type is boolean
- Default value is false
Reset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in ZooKeeper. This is destructive! Must be used in conjunction with auto_offset_reset => smallest.
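A sketch of the combination described above, which re-reads a topic from its earliest available message. The topic name example_topic is hypothetical. Because the stored offsets for the group are cleared, this is best treated as a one-off setting rather than a permanent part of the pipeline.

```
input {
  kafka {
    # Hypothetical topic name used only for illustration.
    topic_id => "example_topic"
    # The group whose offsets in ZooKeeper will be cleared (the default group is shown).
    group_id => "logstash"
    # Destructive: discards the offsets stored for this group.
    reset_beginning => true
    # Required alongside reset_beginning so consumption starts at the earliest message.
    auto_offset_reset => "smallest"
  }
}
```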
tags
- Value type is array
- There is no default value for this setting.
Add any number of arbitrary tags to your event.
This can help with processing later.
type
- Value type is string
- There is no default value for this setting.
Add a type field to all events handled by this input.
Types are used mainly for filter activation.
The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.
If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
white_list
- Value type is string
- Default value is nil
Whitelist of topics to include for consumption.
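As a rough sketch, a whitelist can take the place of topic_id when several topics should be consumed by one input. This example assumes that the value is interpreted as a regular-expression topic filter by the underlying Kafka consumer, and the example_logs_ prefix is a hypothetical naming scheme.

```
input {
  kafka {
    # Assumption: the value is matched against topic names as a regular expression.
    # Intended to match hypothetical topics such as example_logs_web and example_logs_db.
    white_list => "example_logs_.*"
  }
}
```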
zk_connect
- Value type is string
- Default value is "localhost:2181"
Specifies the ZooKeeper connection string in the form hostname:port, where hostname and port are the host and port of a ZooKeeper server. You can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.
The server may also have a ZooKeeper chroot path as part of its ZooKeeper connection string, which puts its data under some path in the global ZooKeeper namespace. If so, the consumer should use the same chroot path in its connection string. For example, to give a chroot path of /chroot/path, you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
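The connection string format above might look like this in a pipeline. The hostnames zk1.example.com through zk3.example.com and the topic name example_topic are hypothetical; the chroot path is the one used in the description.

```
input {
  kafka {
    # Hypothetical topic name used only for illustration.
    topic_id => "example_topic"
    # Three hypothetical ZooKeeper hosts followed by a single chroot path.
    # The chroot path appears once, after the last host:port pair.
    zk_connect => "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/chroot/path"
  }
}
```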