kafka
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
The only required configuration is the topic name. The default codec is json, so events are persisted on the broker in JSON format. If you select the plain codec, Logstash encodes each event with a timestamp and hostname in addition to the message. If you want only the message itself to pass through, configure the output like this:
    output {
      kafka {
        codec => plain {
          format => "%{message}"
        }
      }
    }
For more information see http://kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: http://kafka.apache.org/documentation.html#producerconfigs
Synopsis
This plugin supports the following configuration options:
Required configuration options:
kafka { topic_id => ... }
Available configuration options:
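Setting | Input type | Required | Default value |
---|---|---|---|
batch_num_messages | number | No | 200 |
broker_list | string | No | "localhost:9092" |
client_id | string | No | "" |
codec | codec | No | "json" |
compressed_topics | string | No | "" |
compression_codec | string, one of ["none", "gzip", "snappy"] | No | "none" |
key_serializer_class | string | No | "kafka.serializer.StringEncoder" |
message_send_max_retries | number | No | 3 |
partition_key_format | string | No | nil |
partitioner_class | string | No | "kafka.producer.DefaultPartitioner" |
producer_type | string, one of ["sync", "async"] | No | "sync" |
queue_buffering_max_messages | number | No | 10000 |
queue_buffering_max_ms | number | No | 5000 |
queue_enqueue_timeout_ms | number | No | -1 |
request_required_acks | string, one of [-1, 0, 1] | No | 0 |
request_timeout_ms | number | No | 10000 |
retry_backoff_ms | number | No | 100 |
serializer_class | string | No | "kafka.serializer.StringEncoder" |
topic_id | string | Yes | |
topic_metadata_refresh_interval_ms | number | No | 600000 |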
Details
batch_num_messages
- Value type is number
- Default value is 200
The number of messages to send in one batch when using async mode. The producer waits until either this number of messages is ready to send or queue_buffering_max_ms is reached.
broker_list
- Value type is string
- Default value is "localhost:9092"
This list is used only for bootstrapping: the producer uses it to fetch metadata (topics, partitions and replicas). The socket connections for sending the actual data are established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
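As a rough sketch of the format described above, a configuration that bootstraps from two brokers might look like the following. The host names and the topic name are placeholders chosen for illustration, not values from this documentation.

```
output {
  kafka {
    # Placeholder hosts: a subset of the cluster's brokers is enough for bootstrapping
    broker_list => "kafka1.example.com:9092,kafka2.example.com:9092"
    # Placeholder topic name
    topic_id => "logstash-events"
  }
}
```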
client_id
- Value type is string
- Default value is ""
The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
codec
- Value type is codec
- Default value is "json"
The codec used for output data. Output codecs are a convenient method for encoding your data before it leaves the output, without needing a separate filter in your Logstash pipeline.
compressed_topics
- Value type is string
- Default value is ""
Controls which topics are compressed. If compression_codec is anything other than none (Kafka's NoCompressionCodec), compression is enabled only for the topics listed here, if any. If the list of compressed topics is empty, the specified compression codec is enabled for all topics. If compression_codec is none, compression is disabled for all topics.
compression_codec
- Value can be any of: none, gzip, snappy
- Default value is "none"
The compression codec for all data generated by this producer.
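To illustrate how the two compression settings interact, here is a minimal sketch that turns on snappy compression and leaves compressed_topics at its empty default, so the codec applies to every topic this producer writes to. The topic name is a placeholder.

```
output {
  kafka {
    topic_id => "logstash-events"   # placeholder topic name
    compression_codec => "snappy"
    # compressed_topics is left at its default (""), so compression applies to all topics
  }
}
```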
exclude_tags (DEPRECATED)
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
- Value type is array
- Default value is []
Only handle events without any of these tags. Optional.
key_serializer_class
- Value type is string
- Default value is "kafka.serializer.StringEncoder"
The serializer class for keys (defaults to the same as for messages if nothing is given).
message_send_max_retries
- Value type is number
- Default value is 3
The number of times the producer automatically retries a failed send request. Note that a non-zero value can lead to duplicates when a network error causes a message to be sent but its acknowledgement to be lost.
partition_key_format
- Value type is string
- Default value is nil
Provides a way to specify a partition key as a string. To specify a partition key for Kafka, configure a format that will produce the key as a string. key_serializer_class defaults to kafka.serializer.StringEncoder to match. For example, to partition by host:
    output {
      kafka {
        partition_key_format => "%{host}"
      }
    }
partitioner_class
- Value type is string
- Default value is "kafka.producer.DefaultPartitioner"
The partitioner class for partitioning messages amongst partitions in the topic. The default partitioner is based on the hash of the key. If the key is null, the message is sent to a random partition in the broker.
NOTE: topic_metadata_refresh_interval_ms controls how long the producer keeps sending to that partition. It defaults to 10 minutes, so the producer continues to write to a single partition for 10 minutes before it switches to another.
producer_type
- Value can be any of: sync, async
- Default value is "sync"
Specifies whether messages are sent asynchronously in a background thread. Valid values are async for asynchronous send and sync for synchronous send. Setting the producer to async allows requests to be batched together (which is great for throughput), but opens the possibility that a failure of the client machine drops unsent data.
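The trade-off above can be sketched with an async configuration that batches on size or time, whichever limit is hit first. The topic name and the numeric values are illustrative assumptions, not recommendations.

```
output {
  kafka {
    topic_id => "logstash-events"      # placeholder topic name
    producer_type => "async"
    batch_num_messages => 500          # send once 500 messages are ready...
    queue_buffering_max_ms => 1000     # ...or after buffering for 1 second
  }
}
```

Larger batches generally favor throughput, at the cost of added delivery latency and more unsent data at risk if the client machine fails.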
queue_buffering_max_messages
- Value type is number
- Default value is 10000
The maximum number of unsent messages that can be queued up in the producer when using async mode before either the producer must be blocked or data must be dropped.
queue_buffering_max_ms
- Value type is number
- Default value is 5000
Maximum time, in milliseconds, to buffer data when using async mode. For example, a setting of 100 tries to batch together 100 ms of messages to send at once. This improves throughput but adds message delivery latency due to the buffering.
queue_enqueue_timeout_ms
- Value type is number
- Default value is -1
The amount of time to block before dropping messages when running in async mode and the buffer has reached queue_buffering_max_messages. If set to 0, events are enqueued immediately or dropped if the queue is full (the producer send call never blocks). If set to -1, the producer blocks indefinitely and never willingly drops a send.
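As a sketch of the behavior described above, the following async configuration bounds the queue and blocks for a limited time before dropping events. The topic name and numbers are illustrative assumptions.

```
output {
  kafka {
    topic_id => "logstash-events"          # placeholder topic name
    producer_type => "async"
    queue_buffering_max_messages => 20000  # at most 20000 unsent messages are queued
    queue_enqueue_timeout_ms => 500        # when the queue is full, block up to 500 ms, then drop
  }
}
```

Keeping the default of -1 instead would make the producer block rather than drop when the queue is full.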
request_required_acks
- Value can be any of: -1, 0, 1
- Default value is 0
This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader. For more information, see http://kafka.apache.org/documentation.html#producerconfigs
request_timeout_ms
- Value type is number
- Default value is 10000
The amount of time the broker will wait trying to meet the request_required_acks requirement before sending back an error to the client.
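A minimal sketch of a durability-oriented setup, combining request_required_acks with request_timeout_ms. In the Kafka producer documentation linked above, -1 means the leader waits for all in-sync replicas to acknowledge, 1 means only the leader acknowledges, and 0 means the producer does not wait at all. The topic name and timeout are illustrative assumptions.

```
output {
  kafka {
    topic_id => "logstash-events"   # placeholder topic name
    request_required_acks => -1     # wait for all in-sync replicas
    request_timeout_ms => 15000     # broker returns an error if acks are not met within 15 s
  }
}
```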
retry_backoff_ms
- Value type is number
- Default value is 100
Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
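The retry settings work together: message_send_max_retries sets how many attempts are made, and retry_backoff_ms sets the pause before each one. A sketch with illustrative values and a placeholder topic name:

```
output {
  kafka {
    topic_id => "logstash-events"    # placeholder topic name
    message_send_max_retries => 5    # retry a failed send up to 5 times
    retry_backoff_ms => 500          # wait 500 ms before refreshing metadata and retrying
  }
}
```

As noted under message_send_max_retries, any non-zero retry count can produce duplicates if an acknowledgement is lost.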
serializer_class
- Value type is string
- Default value is "kafka.serializer.StringEncoder"
The serializer class for messages. The default here, kafka.serializer.StringEncoder, encodes a string as bytes. (Kafka's own default encoder, by contrast, takes a byte[] and returns the same byte[].)
tags (DEPRECATED)
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
- Value type is array
- Default value is []
Only handle events with all of these tags. Optional.
topic_id
- This is a required setting.
- Value type is string
- There is no default value for this setting.
The topic to produce the messages to.
topic_metadata_refresh_interval_ms
- Value type is number
- Default value is 600000
The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available, and so on). It also polls regularly (default: every 10 minutes, that is 600000 ms). If you set this to a negative value, metadata is refreshed only on failure. If you set this to zero, the metadata is refreshed after each message sent (not recommended). Important note: the refresh happens only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed.
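As an illustration, the sketch below shortens the refresh interval to one minute. Per the note under partitioner_class, this also shortens how long the producer keeps writing to a single partition when events have no key. The topic name and interval are illustrative assumptions.

```
output {
  kafka {
    topic_id => "logstash-events"                 # placeholder topic name
    topic_metadata_refresh_interval_ms => 60000   # poll for metadata every 60 s instead of every 10 min
  }
}
```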
type (DEPRECATED)
- DEPRECATED WARNING: This configuration item is deprecated and may not be available in future versions.
- Value type is string
- Default value is ""
The type to act on. If a type is given, then this output will only act on messages with the same type. See any input plugin's type attribute for more. Optional.