Azure Event Hubs plugin
editAzure Event Hubs plugin
edit- Plugin version: v1.5.0
- Released on: 2024-10-25
- Changelog
For other versions, see the Versioned plugin docs.
Getting help
editFor questions about the plugin, open a topic in the Discuss forums. For bugs or feature requests, open an issue in Github. For the list of Elastic supported plugins, please consult the Elastic Support Matrix.
Description
editThis plugin consumes events from Azure Event Hubs, a highly scalable data streaming platform and event ingestion service. Event producers send events to the Azure Event Hub, and this plugin consumes those events for use with Logstash.
Many Azure services integrate with the Azure Event Hubs. Azure Monitor, for example, integrates with Azure Event Hubs to provide infrastructure metrics.
This plugin requires outbound connections to ports tcp/443
, tcp/9093
, tcp/5671
, and tcp/5672
,
as noted in the Microsoft Event Hub documentation.
Event Hub connection string
editThe plugin uses the connection string to access Azure Events Hubs. Find the
connection string here: Azure Portal-> Event Hub ->
Shared access polices
. The event_hub_connections option passes the Event Hub
connection strings for the basic configuration.
Sample connection string:
Endpoint=sb://logstash.servicebus.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;EntityPath=insights-operational-logs
Blob Storage and connection string
editAzure Blob Storage account is an essential part of Azure-to-Logstash configuration. A Blob Storage account is a central location that enables multiple instances of Logstash to work together to process events. It records the offset (location) of processed events. On restart, Logstash resumes processing exactly where it left off.
Configuration notes:
- A Blob Storage account is highly recommended for use with this plugin, and is likely required for production servers.
-
The
storage_connection
option passes the blob storage connection string. -
Configure all Logstash instances to use the same
storage_connection
to get the benefits of shared processing.
Sample Blob Storage connection string:
DefaultEndpointsProtocol=https;AccountName=logstash;AccountKey=ETOPnkd/hDAWidkEpPZDiXffQPku/SZdXhPSLnfqdRTalssdEuPkZwIcouzXjCLb/xPZjzhmHfwRCGo0SBSw==;EndpointSuffix=core.windows.net
Find the connection string to Blob Storage here:
Azure Portal-> Blob Storage account -> Access keys
.
Best practices
editHere are some guidelines to help you avoid data conflicts that can cause lost events.
Create a Logstash consumer group
editCreate a new consumer group specifically for Logstash. Do not use the $default or any other consumer group that might already be in use. Reusing consumer groups among non-related consumers can cause unexpected behavior and possibly lost events. All Logstash instances should use the same consumer group so that they can work together for processing events.
Avoid overwriting offset with multiple Event Hubs
editThe offsets (position) of the Event Hubs are stored in the configured Azure Blob store. The Azure Blob store uses paths like a file system to store the offsets. If the paths between multiple Event Hubs overlap, then the offsets may be stored incorrectly.
To avoid duplicate file paths, use the advanced configuration model and make sure that at least one of these options is different per Event Hub:
- storage_connection
- storage_container (defaults to Event Hub name if not defined)
- consumer_group
Set number of threads correctly
editBy default, the number of threads used to service all event hubs is 16
. And while this
may be sufficient for most use cases, throughput may be improved by refining this number.
When servicing a large number of partitions across one or more event hubs, setting a higher
value may result in improved performance. The maximum number of threads is not strictly bound
by the total number of partitions being serviced, but setting the value much higher than
that may mean that some threads are idle.
The number of threads must be greater than or equal to the number of Event hubs plus one.
Threads are currently available only as a global setting across all event hubs in a single azure_event_hubs
input definition. However if your configuration includes multiple azure_event_hubs
inputs, the threads setting applies
independently to each.
Example: Single event hub
If you’re collecting activity logs from one event hub instance, then only 2 threads are required.
- Event hubs = 1
- Minimum threads = 2 (1 Event Hub + 1)
Example: Multiple event hubs
If you are collecting activity logs from more than event hub instance, then at least 1 thread per event hub is required.
- Event hubs = 4
- Minimum threads = 5 (4 Event Hubs + 1)
When you are using multiple partitions per event hub, you may want to assign more threads.
A good base level is (1 + number of event hubs * number of partitions
).
That is, one thread for each partition across all event hubs.
Configuration models
editThis plugin supports two configuration models: basic and advanced. Basic configuration is recommended for most use cases, and is illustrated in the examples throughout this topic.
Basic configuration (default)
editBasic configuration is the default and supports consuming from multiple Event Hubs. All Events Hubs, except for the connection string, share the same configuration.
You supply a list of Event Hub connection strings, complete with the Event Hub EntityPath that defines the Event Hub name. All other configuration settings are shared.
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...EntityPath=insights-logs-errors", "Endpoint=sb://example2...EntityPath=insights-metrics-pt1m"] threads => 8 decorate_events => true consumer_group => "logstash" storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." } }
Advanced configuration
editThe advanced configuration model accommodates deployments where different Event
Hubs require different configurations. Options can be configured per Event Hub.
You provide a list of Event Hub names through the event_hubs
option. Under
each name, specify the configuration for that Event Hub. Options can be defined
globally or expressed per Event Hub.
If the same configuration option appears in both the global and event_hub
sections, the more specific (event_hub) setting takes precedence.
Advanced configuration is not necessary or recommended for most use cases.
input { azure_event_hubs { config_mode => "advanced" threads => 8 decorate_events => true storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." event_hubs => [ {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example1..." initial_position => "beginning" consumer_group => "iam_team" }}, {"insights-metrics-pt1m" => { event_hub_connection => "Endpoint=sb://example2..." initial_position => "end" consumer_group => "db_team" }} ] } }
In this example, storage_connection
and decorate_events
are applied globally.
The two Event Hubs each have their own settings for consumer_groups
and initial_position
.
Azure Event Hubs Configuration Options
editThis plugin supports the following configuration options plus the Common options described later.
Setting | Input type | Required |
---|---|---|
string, ( |
No |
|
Yes, when |
||
Yes, when |
||
Yes, when |
||
No |
||
No |
||
No |
||
string, ( |
No |
|
No, unless |
||
No |
||
No |
||
No |
||
No |
Also see Common options for a list of options supported by all input plugins.
All Event Hubs options are common to both basic and advanced
configurations, with the following exceptions. The basic configuration uses
event_hub_connections
to support multiple connections. The advanced
configuration uses event_hubs
and event_hub_connection
(singular).
config_mode
edit- Value type is string
-
Valid entries are
basic
oradvanced
-
Default value is
basic
Sets configuration to either Basic configuration (default) or Advanced configuration.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1" , "Endpoint=sb://example2...;EntityPath=event_hub_name2" ] }
event_hubs
edit- Value type is array
- No default value
- Ignored for basic configuration
- Required for advanced configuration
Defines the Event Hubs to be read. An array of hashes where each entry is a hash of the Event Hub name and its configuration options.
azure_event_hubs { config_mode => "advanced" event_hubs => [ { "event_hub_name1" => { event_hub_connection => "Endpoint=sb://example1..." }}, { "event_hub_name2" => { event_hub_connection => "Endpoint=sb://example2..." storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." storage_container => "my_container" }} ] consumer_group => "logstash" # shared across all Event Hubs }
event_hub_connections
edit- Value type is array
- No default value
- Required for basic configuration
List of connection strings that identifies the Event Hubs to be read. Connection strings include the EntityPath for the Event Hub.
The event_hub_connections
option is defined
per Event Hub. All other configuration options are shared among Event Hubs.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1" , "Endpoint=sb://example2...;EntityPath=event_hub_name2" ] }
event_hub_connection
edit- Value type is string
- No default value
- Valid only for advanced configuration
Connection string that identifies the Event Hub to be read. Advanced
configuration options can be set per Event Hub. This option modifies
event_hub_name
, and should be nested under it. (See sample.) This option
accepts only one connection string.
azure_event_hubs { config_mode => "advanced" event_hubs => [ { "event_hub_name1" => { event_hub_connection => "Endpoint=sb://example1...;EntityPath=event_hub_name1" }} ] }
checkpoint_interval
edit- Value type is number
-
Default value is
5
seconds -
Set to
0
to disable.
Interval in seconds to write checkpoints during batch processing. Checkpoints tell Logstash where to resume processing after a restart. Checkpoints are automatically written at the end of each batch, regardless of this setting.
Writing checkpoints too frequently can slow down processing unnecessarily.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] checkpoint_interval => 5 }
consumer_group
edit- Value type is string
-
Default value is
$Default
Consumer group used to read the Event Hub(s). Create a consumer group specifically for Logstash. Then ensure that all instances of Logstash use that consumer group so that they can work together properly.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] consumer_group => "logstash" }
decorate_events
edit- Value type is boolean
-
Default value is
false
Adds metadata about the Event Hub, including Event Hub name, consumer_group, processor_host, partition, offset, sequence, timestamp, and event_size.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] decorate_events => true }
initial_position
edit- Value type is string
-
Valid arguments are
beginning
,end
,look_back
-
Default value is
beginning
When first reading from an Event Hub, start from this position:
-
beginning
reads all pre-existing events in the Event Hub -
end
does not read any pre-existing events in the Event Hub -
look_back
readsend
minus a number of seconds worth of pre-existing events. You control the number of seconds using theinitial_position_look_back
option.
Note: If storage_connection
is set, the initial_position
value is used only
the first time Logstash reads from the Event Hub.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] initial_position => "beginning" }
initial_position_look_back
edit- Value type is number
-
Default value is
86400
-
Used only if
initial_position
is set tolook-back
Number of seconds to look back to find the initial position for pre-existing
events. This option is used only if initial_position
is set to look_back
. If
storage_connection
is set, this configuration applies only the first time Logstash
reads from the Event Hub.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] initial_position => "look_back" initial_position_look_back => 86400 }
max_batch_size
edit- Value type is number
-
Default value is
125
Maximum number of events retrieved and processed together. A checkpoint is created after each batch. Increasing this value may help with performance, but requires more memory.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] max_batch_size => 125 }
storage_connection
edit- Value type is string
- No default value
Connection string for blob account storage. Blob account storage persists the
offsets between restarts, and ensures that multiple instances of Logstash
process different partitions.
When this value is set, restarts resume where processing left off.
When this value is not set, the initial_position
value is used on every restart.
We strongly recommend that you define this value for production environments.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." }
storage_container
edit- Value type is string
- Defaults to the Event Hub name if not defined
Name of the storage container used to persist offsets and allow multiple instances of Logstash to work together.
azure_event_hubs { event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"] storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." storage_container => "my_container" }
To avoid overwriting offsets, you can use different storage containers. This is particularly important if you are monitoring two Event Hubs with the same name. You can use the advanced configuration model to configure different storage containers.
azure_event_hubs { config_mode => "advanced" consumer_group => "logstash" storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...." event_hubs => [ {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example1..." storage_container => "insights-operational-logs-1" }}, {"insights-operational-logs" => { event_hub_connection => "Endpoint=sb://example2..." storage_container => "insights-operational-logs-2" }} ] }
threads
edit- Value type is number
-
Minimum value is
2
-
Default value is
16
Total number of threads used to process events. The value you set here applies to all Event Hubs. Even with advanced configuration, this value is a global setting, and can’t be set per event hub.
azure_event_hubs { threads => 16 }
The number of threads should be the number of Event Hubs plus one or more. See Best practices for more information.
Common options
editThese configuration options are supported by all input plugins:
Setting | Input type | Required |
---|---|---|
No |
||
No |
||
No |
||
No |
||
No |
||
No |
codec
edit- Value type is codec
-
Default value is
"plain"
The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
enable_metric
edit- Value type is boolean
-
Default value is
true
Disable or enable metric logging for this specific plugin instance by default we record all the metrics we can, but you can disable metrics collection for a specific plugin.
id
edit- Value type is string
- There is no default value for this setting.
Add a unique ID
to the plugin configuration. If no ID is specified, Logstash will generate one.
It is strongly recommended to set this ID in your configuration. This is particularly useful
when you have two or more plugins of the same type, for example, if you have 2 azure_event_hubs inputs.
Adding a named ID in this case will help in monitoring Logstash when using the monitoring APIs.
input { azure_event_hubs { id => "my_plugin_id" } }
Variable substitution in the id
field only supports environment variables
and does not support the use of values from the secret store.
tags
edit- Value type is array
- There is no default value for this setting.
Add any number of arbitrary tags to your event.
This can help with processing later.
type
edit- Value type is string
- There is no default value for this setting.
Add a type
field to all events handled by this input.
Types are used mainly for filter activation.
The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.
If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.