Streaming Input
This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.
The streaming input reads messages from a streaming data source, for example, a websocket server. This input uses the CEL engine and the mito library internally to parse and process the messages. Having support for CEL allows you to parse and process the messages in a more flexible way. It has many similarities with the cel input as to how the CEL programs are written, but differs in the way the messages are read and processed. Currently, websocket servers or API endpoints and the Crowdstrike Falcon streaming API are supported.
The websocket streaming input supports:

- Auth
  - Basic
  - Bearer
  - Custom
The streaming input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment, so reconnection will occur on input restart.
The Crowdstrike streaming input requires OAuth2.0 as described in the Crowdstrike documentation for the API. When using the Crowdstrike streaming type, the crowdstrike_app_id configuration field must be set. This field specifies the appId parameter sent to the Crowdstrike API. See the Crowdstrike documentation for details.
The stream_type configuration field specifies which type of streaming input to use, "websocket" or "crowdstrike". If it is not set, the input defaults to websocket streaming.
Execution
The execution environment provided for the input includes the functions, macros, and global variables provided by the mito library.
A single JSON object is provided as an input accessible through a state variable. state contains a response map field and may contain arbitrary other fields configured via the input's state configuration. If the CEL program saves cursor states between executions of the program, the configured state.cursor value will be replaced by the saved cursor prior to execution.

On start the state will be something like this:
{ "response": { ... }, "cursor": { ... }, ... }
The streaming input websocket handler creates a response field in the state map and attaches the websocket message to this field. All CEL programs written should act on this response field. Additional fields may be present at the root of the object and, if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array; see below.

If the cursor is present, the program should process or filter out responses based on its value. If the cursor is not present, all responses should be processed as per the program's logic.
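For example, a minimal program sketch that publishes only messages newer than the stored cursor could look like the following. The timestamp and last_timestamp field names are assumptions for illustration; adapt them to the messages your streaming source actually sends.

program: |
  bytes(state.response).decode_json().as(body,
    // Publish only if no cursor exists yet, or the message is newer than the stored cursor.
    // body.timestamp and last_timestamp are illustrative names, not part of the input's API.
    !has(state.cursor) || body.timestamp > state.cursor.last_timestamp ?
      {
        "events": [{"message": body.encode_json()}],
        "cursor": {"last_timestamp": body.timestamp}
      }
    :
      {"events": []}
  )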
After completion of a program’s execution it should return a single object with a structure looking like this:
{ "events": [ ... ], "cursor": { ... } }
The events field must be present and contains the events to be published. It may be a single object or an array of objects.
If cursor is present, its value is persisted over restarts and replaces the configured state.cursor value on subsequent executions. It may be a single object reflecting the cursor for the complete set of events, or a cursor value for each event returned.
Example configurations:
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message": inner_body.encode_json(),
      }
    })
filebeat.inputs:
# Read and process events from the Crowdstrike Falcon Hose API
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
  crowdstrike_app_id: my_app_id
  program: |
    state.response.decode_json().as(body,{
      "events": [body],
      ?"cursor": has(body.?metadata.offset) ?
        optional.of({"offset": body.metadata.offset})
      :
        optional.none(),
    })
Debug state logging
The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation. This will include any sensitive or secret information kept in the state object, and so DEBUG level logging should not be used in production when sensitive information is retained in the state object. See the redact configuration parameters for settings to exclude sensitive fields from DEBUG logs.
Authentication
The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, and authentication via a custom auth config. Unlike REST inputs, Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token, and custom auth contains any combination of custom header and value. These token/key values are added to the request headers and are not exposed to the state object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the Authorization header and prepend the token with Basic or Bearer respectively.
Example configurations with authentication:
filebeat.inputs:
- type: streaming
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream
filebeat.inputs:
- type: streaming
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "x-api-key" value: "dXNlcjpwYXNzd29yZA==" url: wss://localhost:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "Auth" value: "Bearer dXNlcjpwYXNzd29yZA==" url: wss://localhost:443/_stream
The Crowdstrike streaming input requires OAuth2.0 authentication using a client ID, client secret and a token URL. These values are not exposed to the state object. OAuth2.0 scopes and endpoint parameters are available via the auth.scopes and auth.endpoint_params config parameters.
filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
Input state
The streaming input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects. The state must contain a response map and may contain any object the user wishes to store in it. All objects are stored at runtime, except cursor, which has values that are persisted between restarts.
Configuration options
The streaming input supports the following configuration options plus the Common options described later.
stream_type
The flavor of streaming to use. This may be either "websocket", "crowdstrike", or unset. If the field is unset, websocket streaming is used.
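For example, to use the Crowdstrike flavor rather than the default websocket streaming:

filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  . . .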
program
The CEL program that is executed on each message received. This field should ideally be present, but if it is not, the default program given below is used.
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message": inner_body.encode_json(),
    }
  })
url_program
If present, this CEL program is executed before the streaming connection is established, using the state object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the streaming connection for processing. The program may use cursor values or other state-defined values to customize the URL at runtime.
url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message": inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })
state
state is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the state variable. Except for the state.cursor field, state does not persist over restarts.
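For example, a value configured under state is available to the program from its first execution and can serve as a fallback until a cursor has been stored. This is a sketch; the initial_start_time and since names are illustrative only.

filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  state:
    initial_start_time: "2022-01-01T00:00:00Z"
  program: |
    bytes(state.response).decode_json().as(body,{
      "events": [{
        "message": body.encode_json(),
        // Falls back to the configured initial_start_time until a cursor has been saved.
        "since": state.?cursor.since.orValue(state.initial_start_time)
      }],
      "cursor": {"since": now}
    })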
state.cursor
The cursor is an object available as state.cursor where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used, the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })
regexp
A set of named regular expressions that may be used during a CEL program's execution using the regexp extension library. The syntax used for the regular expressions is RE2.
filebeat.inputs:
- type: streaming
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'
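The named patterns can then be referenced from the CEL program. The sketch below assumes the mito regexp extension's re_match form, <string>.re_match("<pattern name>"); consult the mito library documentation for the full set of regular expression functions.

filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
  program: |
    string(state.response).as(msg, {
      "events": [{
        "message": msg,
        // "products" refers to the named pattern defined under regexp above.
        "mentions_product": msg.re_match("products")
      }]
    })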
redact
During debug level logging, the state object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged state. The redact configuration allows users to configure this field redaction behaviour. For safety reasons, if the redact configuration is missing, a warning is logged.

In the case where no redaction is required, an empty redact.fields configuration should be used to silence the logged warning.
- type: streaming
  redact:
    fields: ~
As an example, if a user-constructed Basic Authentication request is used in a CEL program, the password can be redacted like so:
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/_stream
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true
Note that fields under the auth configuration hierarchy are not exposed to the state and so do not need to be redacted. For this reason, it is preferable to use these for authentication over the request construction shown above where possible.
redact.fields
This specifies fields in the state to be redacted prior to debug logging. Fields listed in this array will be either replaced with a * or deleted entirely from messages sent to debug logs.
redact.delete
This specifies whether fields should be replaced with a * or deleted entirely from messages sent to debug logs. If delete is true, fields will be deleted rather than replaced.
retry
The retry configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is nil, which means no retries will be attempted. It has wait_min and wait_max configurations which specify the minimum and maximum time to wait between retries.
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/_stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message": inner_body.encode_json(),
      }
    })
  retry:
    max_attempts: 5
    wait_min: 1s
    wait_max: 10s
retry.max_attempts
The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is nil, which means no retries will be attempted.
retry.wait_min
The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, wait_min might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying.
retry.wait_max
The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, wait_max might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.
Metrics
This input exposes metrics under the HTTP monitoring endpoint. These metrics are exposed under the /inputs path. They can be used to observe the activity of the input.
Metric | Description
---|---
url | URL of the input resource.
cel_eval_errors | Number of errors encountered during cel program evaluation.
errors_total | Number of errors encountered over the life cycle of the input.
batches_received_total | Number of event arrays received.
batches_published_total | Number of event arrays published.
received_bytes_total | Number of bytes received over the life cycle of the input.
events_received_total | Number of events received.
events_published_total | Number of events published.
cel_processing_time | Histogram of the elapsed successful CEL program processing times in nanoseconds.
batch_processing_time | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
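For example, with the HTTP monitoring endpoint enabled in filebeat.yml (the host and port below are the usual defaults and are illustrative):

http.enabled: true
http.host: localhost
http.port: 5066

the metrics for this input can then be read while Filebeat is running with, for example, curl -XGET 'http://localhost:5066/inputs/?pretty'.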
Developer tools
A stand-alone CEL environment that implements the majority of the streaming input's Common Expression Language functionality is available in the Elastic Mito repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running go install github.com/elastic/mito/cmd/mito@latest and requires a Go toolchain.
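For example, a CEL program saved to program.cel can be evaluated against a sample state document saved to state.json with an invocation along the following lines; the -data flag and the file names are assumptions for illustration, so run mito -help for the exact options:

mito -data state.json program.cel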
Common options
The following configuration options are supported by all inputs.
enabled
Use the enabled option to enable and disable inputs. By default, enabled is set to true.
tags
A list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.
Example:
filebeat.inputs:
- type: streaming
  . . .
  tags: ["json"]
fields
Optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here.
filebeat.inputs:
- type: streaming
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
If this option is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, then the custom fields overwrite the other fields.
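For example, to store the app_id custom field shown above at the top level of the output document:

filebeat.inputs:
- type: streaming
  . . .
  fields:
    app_id: query_engine_12
  fields_under_root: true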
processors
A list of processors to apply to the input data. See Processors for information about specifying processors in your config.
pipeline
The ingest pipeline ID to set for the events generated by this input. The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.
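For example, to send events from this input to a specific ingest pipeline (the pipeline ID here is illustrative):

filebeat.inputs:
- type: streaming
  . . .
  pipeline: my_streaming_pipeline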
keep_null
If this option is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.
index
If present, this formatted string overrides the index for events from this input (for elasticsearch outputs), or sets the raw_index field of the event's metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.

Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
publisher_pipeline.disable_host
By default, all events contain host.name. This option can be set to true to disable the addition of this field to all events. The default value is false.
The streaming input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the GitHub repository.