Azure Event Hubs plugin

  • Plugin version: v1.5.0
  • Released on: 2024-10-25
  • Changelog

For other versions, see the Versioned plugin docs.

Getting help

For questions about the plugin, open a topic in the Discuss forums. For bugs or feature requests, open an issue in GitHub. For the list of Elastic supported plugins, please consult the Elastic Support Matrix.

Description

This plugin consumes events from Azure Event Hubs, a highly scalable data streaming platform and event ingestion service. Event producers send events to the Azure Event Hub, and this plugin consumes those events for use with Logstash.

Many Azure services integrate with Azure Event Hubs. Azure Monitor, for example, integrates with Azure Event Hubs to provide infrastructure metrics.

This plugin requires outbound connections to ports tcp/443, tcp/9093, tcp/5671, and tcp/5672, as noted in the Microsoft Event Hub documentation.

Event Hub connection string

The plugin uses the connection string to access Azure Event Hubs. Find the connection string here: Azure Portal -> Event Hub -> Shared access policies. The event_hub_connections option passes the Event Hub connection strings for the basic configuration.

Sample connection string:

Endpoint=sb://logstash.servicebus.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=mm6AbDcEfj8lk7sjsbzoTJ10qAkiSaG663YykEAG2eg=;EntityPath=insights-operational-logs

Blob Storage and connection string

An Azure Blob Storage account is an essential part of Azure-to-Logstash configuration. A Blob Storage account is a central location that enables multiple instances of Logstash to work together to process events. It records the offset (location) of processed events. On restart, Logstash resumes processing exactly where it left off.

Configuration notes:

  • A Blob Storage account is highly recommended for use with this plugin, and is likely required for production servers.
  • The storage_connection option passes the blob storage connection string.
  • Configure all Logstash instances to use the same storage_connection to get the benefits of shared processing.

Sample Blob Storage connection string:

DefaultEndpointsProtocol=https;AccountName=logstash;AccountKey=ETOPnkd/hDAWidkEpPZDiXffQPku/SZdXhPSLnfqdRTalssdEuPkZwIcouzXjCLb/xPZjzhmHfwRCGo0SBSw==;EndpointSuffix=core.windows.net

Find the connection string to Blob Storage here: Azure Portal -> Blob Storage account -> Access keys.
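
Because both connection strings embed access keys, you may prefer not to hard-code them in the pipeline file. The following is a minimal sketch, assuming the strings have been exported as environment variables and that Logstash's ${VAR} substitution is used to read them. The variable names EVENT_HUB_CONNECTION and STORAGE_CONNECTION are hypothetical.

input {
   azure_event_hubs {
      # EVENT_HUB_CONNECTION and STORAGE_CONNECTION are hypothetical environment variable names
      event_hub_connections => ["${EVENT_HUB_CONNECTION}"]
      storage_connection => "${STORAGE_CONNECTION}"
   }
}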

Best practices

Here are some guidelines to help you avoid data conflicts that can cause lost events.

Create a Logstash consumer group

Create a new consumer group specifically for Logstash. Do not use the $Default consumer group or any other consumer group that might already be in use. Reusing consumer groups among unrelated consumers can cause unexpected behavior and possibly lost events. All Logstash instances should use the same consumer group so that they can work together to process events.

Avoid overwriting offset with multiple Event Hubs

The offsets (position) of the Event Hubs are stored in the configured Azure Blob store. The Azure Blob store uses paths like a file system to store the offsets. If the paths between multiple Event Hubs overlap, then the offsets may be stored incorrectly.

To avoid duplicate file paths, use the advanced configuration model and make sure that at least one of these options is different per Event Hub:

  • storage_connection
  • storage_container (defaults to Event Hub name if not defined)
  • consumer_group

Set number of threads correctly

By default, 16 threads service all event hubs. While this may be sufficient for most use cases, you may be able to improve throughput by tuning this number. When servicing a large number of partitions across one or more event hubs, a higher value may improve performance. The maximum number of threads is not strictly bound by the total number of partitions being serviced, but setting the value much higher than that may leave some threads idle.

The number of threads must be greater than or equal to the number of event hubs plus one.

Threads are currently available only as a global setting across all event hubs in a single azure_event_hubs input definition. However, if your configuration includes multiple azure_event_hubs inputs, the threads setting applies independently to each.

Example: Single event hub

If you’re collecting activity logs from one event hub instance, then only 2 threads are required.

  • Event hubs = 1
  • Minimum threads = 2 (1 Event Hub + 1)

Example: Multiple event hubs

If you are collecting activity logs from more than one event hub instance, then at least 1 thread per event hub is required, plus one additional thread.

  • Event hubs = 4
  • Minimum threads = 5 (4 Event Hubs + 1)

When you are using multiple partitions per event hub, you may want to assign more threads. A good base level is (1 + number of event hubs * number of partitions). That is, one thread for each partition across all event hubs, plus one.
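
As a worked instance of that formula, the sketch below assumes two Event Hubs with four partitions each, which gives 1 + 2 * 4 = 9 threads. The partition count is an assumption for illustration only; check the partition count of your own Event Hubs.

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1", "Endpoint=sb://example2...;EntityPath=event_hub_name2"]
      # 1 + (2 Event Hubs * 4 partitions each) = 9; the partition count is assumed
      threads => 9
   }
}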

Configuration models

This plugin supports two configuration models: basic and advanced. Basic configuration is recommended for most use cases, and is illustrated in the examples throughout this topic.

Basic configuration (default)

Basic configuration is the default and supports consuming from multiple Event Hubs. All Event Hubs share the same configuration, except for the connection string.

You supply a list of Event Hub connection strings, complete with the Event Hub EntityPath that defines the Event Hub name. All other configuration settings are shared.

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://example1...EntityPath=insights-logs-errors", "Endpoint=sb://example2...EntityPath=insights-metrics-pt1m"]
      threads => 8
      decorate_events => true
      consumer_group => "logstash"
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   }
}

Advanced configuration

The advanced configuration model accommodates deployments where different Event Hubs require different configurations. Options can be configured per Event Hub. You provide a list of Event Hub names through the event_hubs option. Under each name, specify the configuration for that Event Hub. Options can be defined globally or expressed per Event Hub.

If the same configuration option appears in both the global and event_hub sections, the more specific (event_hub) setting takes precedence.

Advanced configuration is not necessary or recommended for most use cases.

input {
   azure_event_hubs {
      config_mode => "advanced"
      threads => 8
      decorate_events => true
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
      event_hubs => [
         {"insights-operational-logs" => {
            event_hub_connection => "Endpoint=sb://example1..."
            initial_position => "beginning"
            consumer_group => "iam_team"
         }},
         {"insights-metrics-pt1m" => {
            event_hub_connection => "Endpoint=sb://example2..."
            initial_position => "end"
            consumer_group => "db_team"
         }}
      ]
   }
}

In this example, threads, storage_connection, and decorate_events are applied globally. The two Event Hubs each have their own settings for consumer_group and initial_position.
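
The precedence rule for options that appear in both places can be sketched as follows. Here consumer_group is set globally and overridden for one Event Hub only, so the first Event Hub is read with "logstash" and the second with "db_team". The Event Hub names and consumer group names are placeholders.

input {
   azure_event_hubs {
      config_mode => "advanced"
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
      # Global value: applies to every Event Hub that does not set its own consumer_group
      consumer_group => "logstash"
      # event_hub_name1 inherits "logstash"; event_hub_name2 overrides it with "db_team"
      event_hubs => [
         {"event_hub_name1" => {
            event_hub_connection => "Endpoint=sb://example1..."
         }},
         {"event_hub_name2" => {
            event_hub_connection => "Endpoint=sb://example2..."
            consumer_group => "db_team"
         }}
      ]
   }
}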

Azure Event Hubs Configuration Options

This plugin supports the following configuration options plus the Common options described later.

Setting                       Input type                               Required
config_mode                   string (basic or advanced)               No
event_hubs                    array                                    Yes, when config_mode => advanced
event_hub_connections         array                                    Yes, when config_mode => basic
event_hub_connection          string                                   Yes, when config_mode => advanced
checkpoint_interval           number                                   No
consumer_group                string                                   No
decorate_events               boolean                                  No
initial_position              string (beginning, end, or look_back)    No
initial_position_look_back    number                                   No, unless initial_position => look_back
max_batch_size                number                                   No
storage_connection            string                                   No
storage_container             string                                   No
threads                       number                                   No

Also see Common options for a list of options supported by all input plugins.

All Event Hubs options are common to both basic and advanced configurations, with the following exceptions. The basic configuration uses event_hub_connections to support multiple connections. The advanced configuration uses event_hubs and event_hub_connection (singular).

config_mode

  • Value type is string
  • Valid entries are basic or advanced
  • Default value is basic

Sets configuration to either Basic configuration (default) or Advanced configuration.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1", "Endpoint=sb://example2...;EntityPath=event_hub_name2"]
}

event_hubs

  • Value type is array
  • No default value
  • Ignored for basic configuration
  • Required for advanced configuration

Defines the Event Hubs to be read. An array of hashes where each entry is a hash of the Event Hub name and its configuration options.

azure_event_hubs {
   config_mode => "advanced"
   event_hubs => [
      { "event_hub_name1" => {
         event_hub_connection => "Endpoint=sb://example1..."
      }},
      { "event_hub_name2" => {
         event_hub_connection => "Endpoint=sb://example2..."
         storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
         storage_container => "my_container"
      }}
   ]
   consumer_group => "logstash" # shared across all Event Hubs
}

event_hub_connections

  • Value type is array
  • No default value
  • Required for basic configuration

List of connection strings that identify the Event Hubs to be read. Connection strings include the EntityPath for the Event Hub.

The event_hub_connections option is defined per Event Hub. All other configuration options are shared among Event Hubs.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1", "Endpoint=sb://example2...;EntityPath=event_hub_name2"]
}

event_hub_connection

  • Value type is string
  • No default value
  • Valid only for advanced configuration

Connection string that identifies the Event Hub to be read. Advanced configuration options can be set per Event Hub. This option modifies event_hub_name, and should be nested under it. (See sample.) This option accepts only one connection string.

azure_event_hubs {
   config_mode => "advanced"
   event_hubs => [
     { "event_hub_name1" => {
        event_hub_connection => "Endpoint=sb://example1...;EntityPath=event_hub_name1"
     }}
   ]
}

checkpoint_interval

  • Value type is number
  • Default value is 5 seconds
  • Set to 0 to disable.

Interval in seconds to write checkpoints during batch processing. Checkpoints tell Logstash where to resume processing after a restart. Checkpoints are automatically written at the end of each batch, regardless of this setting.

Writing checkpoints too frequently can slow down processing unnecessarily.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   checkpoint_interval => 5
}

consumer_group

  • Value type is string
  • Default value is $Default

Consumer group used to read the Event Hub(s). Create a consumer group specifically for Logstash. Then ensure that all instances of Logstash use that consumer group so that they can work together properly.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   consumer_group => "logstash"
}

decorate_events

  • Value type is boolean
  • Default value is false

Adds metadata about the Event Hub, including Event Hub name, consumer_group, processor_host, partition, offset, sequence, timestamp, and event_size.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   decorate_events => true
}
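
Once decorate_events is enabled, the metadata can be copied into regular fields if you want it to appear in the output. The sketch below assumes the plugin stores the values under [@metadata][azure_event_hubs] (for example, [@metadata][azure_event_hubs][name]). That path is not stated in this topic, so verify it against your plugin version before relying on it. The target field names are hypothetical.

filter {
   mutate {
      # The [@metadata][azure_event_hubs] paths are an assumption; the target field names are illustrative
      add_field => {
         "event_hub_name" => "%{[@metadata][azure_event_hubs][name]}"
         "event_hub_partition" => "%{[@metadata][azure_event_hubs][partition]}"
      }
   }
}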

initial_position

  • Value type is string
  • Valid arguments are beginning, end, look_back
  • Default value is beginning

When first reading from an Event Hub, start from this position:

  • beginning reads all pre-existing events in the Event Hub
  • end does not read any pre-existing events in the Event Hub
  • look_back reads pre-existing events starting a set number of seconds before the end. You control the number of seconds using the initial_position_look_back option.

Note: If storage_connection is set, the initial_position value is used only the first time Logstash reads from the Event Hub.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "beginning"
}

initial_position_look_back

  • Value type is number
  • Default value is 86400
  • Used only if initial_position is set to look_back

Number of seconds to look back to find the initial position for pre-existing events. This option is used only if initial_position is set to look_back. If storage_connection is set, this configuration applies only the first time Logstash reads from the Event Hub.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   initial_position => "look_back"
   initial_position_look_back => 86400
}

max_batch_size

  • Value type is number
  • Default value is 125

Maximum number of events retrieved and processed together. A checkpoint is created after each batch. Increasing this value may help with performance, but requires more memory.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   max_batch_size => 125
}

storage_connection

  • Value type is string
  • No default value

Connection string for blob account storage. Blob account storage persists the offsets between restarts, and ensures that multiple instances of Logstash process different partitions. When this value is set, restarts resume where processing left off. When this value is not set, the initial_position value is used on every restart.

We strongly recommend that you define this value for production environments.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
}

storage_container

  • Value type is string
  • Defaults to the Event Hub name if not defined

Name of the storage container used to persist offsets and allow multiple instances of Logstash to work together.

azure_event_hubs {
   event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   storage_container => "my_container"
}

To avoid overwriting offsets, you can use different storage containers. This is particularly important if you are monitoring two Event Hubs with the same name. You can use the advanced configuration model to configure different storage containers.

azure_event_hubs {
   config_mode => "advanced"
   consumer_group => "logstash"
   storage_connection => "DefaultEndpointsProtocol=https;AccountName=example...."
   event_hubs => [
      {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example1..."
         storage_container => "insights-operational-logs-1"
      }},
      {"insights-operational-logs" => {
         event_hub_connection => "Endpoint=sb://example2..."
         storage_container => "insights-operational-logs-2"
      }}
   ]
}

threads

  • Value type is number
  • Minimum value is 2
  • Default value is 16

Total number of threads used to process events. The value you set here applies to all Event Hubs. Even with advanced configuration, this value is a global setting, and can’t be set per event hub.

azure_event_hubs {
   threads => 16
}

The number of threads should be at least the number of Event Hubs plus one. See Best practices for more information.

Common options

These configuration options are supported by all input plugins:

Setting          Input type    Required
add_field        hash          No
codec            codec         No
enable_metric    boolean       No
id               string        No
tags             array         No
type             string        No

add_field

  • Value type is hash
  • Default value is {}

Add a field to an event.

codec

  • Value type is codec
  • Default value is "plain"

The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

enable_metric

  • Value type is boolean
  • Default value is true

Disable or enable metric logging for this specific plugin instance. By default, we record all the metrics we can, but you can disable metrics collection for a specific plugin.

id

  • Value type is string
  • There is no default value for this setting.

Add a unique ID to the plugin configuration. If no ID is specified, Logstash will generate one. It is strongly recommended to set this ID in your configuration. This is particularly useful when you have two or more plugins of the same type, for example, if you have 2 azure_event_hubs inputs. Adding a named ID in this case will help in monitoring Logstash when using the monitoring APIs.

input {
  azure_event_hubs {
    id => "my_plugin_id"
  }
}

Variable substitution in the id field only supports environment variables and does not support the use of values from the secret store.

tags

  • Value type is array
  • There is no default value for this setting.

Add any number of arbitrary tags to your event.

This can help with processing later.

type

  • Value type is string
  • There is no default value for this setting.

Add a type field to all events handled by this input.

Types are used mainly for filter activation.

The type is stored as part of the event itself, so you can also use the type to search for it in Kibana.

If you try to set a type on an event that already has one (for example when you send an event from a shipper to an indexer) then a new input will not override the existing type. A type set at the shipper stays with that event for its life even when sent to another Logstash server.
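
The common options can be combined with the plugin-specific settings in the same input block. The following is a minimal sketch with illustrative values for id, type, tags, and add_field. The json codec is an assumption that suits JSON payloads and is not required by the plugin.

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://example1...;EntityPath=event_hub_name1"]
      # All values below are illustrative; choose names that fit your pipeline
      id => "azure_activity_logs"
      type => "azure_event_hub"
      tags => ["azure", "activity-log"]
      add_field => { "environment" => "production" }
      # Assumes the events carry JSON payloads
      codec => "json"
   }
}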