Websocket Input
This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.
The websocket input reads messages from a websocket server or API endpoint. This input uses the CEL engine and the mito library internally to parse and process the messages. CEL support lets you parse and process the messages more flexibly. CEL programs are written much as they are for the cel input, but the way messages are read and processed differs. The websocket input is a streaming input and can only be used to read messages from a websocket server or API endpoint.
This input supports:
- Auth
  - Basic
  - Bearer
  - Custom
The websocket input does not currently support XML messages. Auto-reconnect is also not supported at the moment, so reconnection occurs only when the input restarts.
Execution
The execution environment provided for the input includes the functions, macros, and global variables provided by the mito library.
A single JSON object is provided as an input accessible through a state variable. state contains a response map field and may contain arbitrary other fields configured via the input’s state configuration. If the CEL program saves cursor states between executions of the program, the configured state.cursor value will be replaced by the saved cursor prior to execution.
On start the state will be something like this:
{
    "response": { ... },
    "cursor": { ... },
    ...
}
The websocket input creates a response field in the state map and attaches the websocket message to this field. All CEL programs written should act on this response field. Additional fields may be present at the root of the object and, if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array (see below).
If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program’s logic.
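The cursor handling described above can be modelled outside the input. The following Python sketch is illustrative only and is not how the input is implemented: the program function is a hypothetical stand-in for a CEL program, the id and last_id field names are assumptions made for the example, and it assumes that the fields a program returns (other than events) are merged back into state.

```python
import json


def program(state):
    """Hypothetical stand-in for a CEL program acting on state["response"]."""
    body = json.loads(state["response"])
    # A missing cursor means nothing has been seen yet, so process everything.
    last_id = state.get("cursor", {}).get("last_id", 0)
    if body["id"] <= last_id:
        # Already processed: emit no events and leave the cursor unchanged.
        return {"events": [], "cursor": {"last_id": last_id}}
    return {
        "events": [{"message": json.dumps(body)}],
        "cursor": {"last_id": body["id"]},
    }


def run(messages, state):
    """Feed each message to the program, carrying state between iterations."""
    published = []
    for raw in messages:
        state["response"] = raw
        result = program(state)
        # Events are handed off for publishing and are not kept in state.
        published.extend(result.pop("events"))
        # Assumption: remaining returned fields (here, the cursor) update state.
        state.update(result)
    return published, state
```

Calling run with a state that already holds a cursor, as would be the case after a restart, skips any message whose id is at or below the saved last_id.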
After completion of a program’s execution it should return a single object. In the example configurations in this document, that object contains an events field and, when cursor state is saved, a cursor field.
Example configuration:
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message": inner_body.encode_json(),
      }
    })
Debug state logging
The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation. This will include any sensitive or secret information kept in the state object, and so DEBUG level logging should not be used in production when sensitive information is retained in the state object. See redact configuration parameters for settings to exclude sensitive fields from DEBUG logs.
Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication, and a custom auth config. Unlike REST inputs, Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token, and custom auth contains any combination of custom header and value. These token/key values are added to the request headers and are not exposed to the state object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the Authorization header and prepend the token with Basic or Bearer respectively.
Example configurations with authentication:
filebeat.inputs:
- type: websocket
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream
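As a rough illustration of the header rules described in this section, the Python sketch below builds the request headers each auth option would produce. It is a model under stated assumptions, not the input's implementation: the auth_headers and basic_token helper names are hypothetical, and the order in which the options are checked is an assumption. The token used in the examples happens to be the Base64 encoding of user:password, which is the conventional form of an HTTP Basic credential.

```python
import base64


def auth_headers(auth):
    """Build request headers from a dict shaped like the auth config (sketch)."""
    if "basic_token" in auth:
        return {"Authorization": "Basic " + auth["basic_token"]}
    if "bearer_token" in auth:
        return {"Authorization": "Bearer " + auth["bearer_token"]}
    if "custom" in auth:
        custom = auth["custom"]
        # Custom auth sends the configured header and value unchanged.
        return {custom["header"]: custom["value"]}
    return {}


def basic_token(user, password):
    """Base64-encode "user:password", the conventional HTTP Basic credential."""
    return base64.b64encode(f"{user}:{password}".encode()).decode()
```

With these helpers, auth_headers({"basic_token": basic_token("user", "password")}) yields an Authorization header whose value starts with Basic, while the custom form leaves both the header name and the value exactly as configured.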
Input state
The websocket input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a response map and may contain any object the user wishes to store in it. All objects are stored at runtime, except cursor, which has values that are persisted between restarts.
Configuration options
The websocket input supports the following configuration options plus the Common options described later.
program
The CEL program that is executed on each message received. This field should ideally be present; if it is not, the default program given below is used.
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message": inner_body.encode_json(),
    }
  })
url_program
If present, this CEL program is executed before the websocket connection is established using the state object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.
url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message": inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })
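To make the fallback in the url_program example concrete, the Python sketch below mirrors its logic: use the saved cursor's since value when one exists, otherwise fall back to the configured initial_start_time. The build_url name is hypothetical, and the sketch only models the expression; it is not a substitute for the CEL program.

```python
def build_url(state):
    """Mirror the url_program example: append ?since= using the cursor if set."""
    # On first start there is no saved cursor, so fall back to the
    # configured initial_start_time.
    cursor = state.get("cursor") or {}
    since = cursor.get("since", state["initial_start_time"])
    return state["url"] + "?since=" + since
```

On a first run this produces ws://testapi:443/v1/streamresults?since=2022-01-01T00:00:00Z for the configuration above; after a restart with a saved cursor, the saved since value is used instead.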
state
state is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the state variable. Except for the state.cursor field, state does not persist over restarts.
state.cursor
The cursor is an object available as state.cursor where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })
regexp
A set of named regular expressions that may be used during a CEL program’s execution using the regexp extension library. The syntax used for the regular expressions is RE2.
filebeat.inputs:
- type: websocket
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'
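The two patterns above can be tried out before they are placed in a configuration. The Python sketch below uses Python's re module, which is not RE2, but these particular patterns use only syntax the two engines share. The matching_names helper is hypothetical and exists only for this illustration.

```python
import re

# The same named patterns as in the regexp configuration above.
PATTERNS = {
    "products": re.compile(r"(?i)(Elasticsearch|Beats|Logstash|Kibana)"),
    "solutions": re.compile(r"(?i)(Search|Observability|Security)"),
}


def matching_names(text):
    """Return the names of the configured patterns that match text."""
    return [name for name, pattern in PATTERNS.items() if pattern.search(text)]
```

Because the patterns are case-insensitive and unanchored, the text Elasticsearch matches both products and solutions (the latter through the embedded "search"). Anchors or word boundaries would be needed if that is not the intent.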
redact
During debug level logging, the state object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged state. The redact configuration allows users to configure this field redaction behaviour. For safety reasons, if the redact configuration is missing, a warning is logged.
If no redaction is required, an empty redact.fields configuration should be used to silence the logged warning.
- type: websocket
  redact:
    fields: ~
As an example, if a user-constructed Basic Authentication request is used in a CEL program the password can be redacted like so:
filebeat.inputs:
- type: websocket
  url: ws://localhost:443/_stream
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true
Note that fields under the auth configuration hierarchy are not exposed to the state and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible.
redact.fields
This specifies fields in the state to be redacted prior to debug logging. Fields listed in this array will be either replaced with a * or deleted entirely from messages sent to debug logs.
redact.delete
This specifies whether fields should be replaced with a * or deleted entirely from messages sent to debug logs. If delete is true, fields will be deleted rather than replaced.
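The effect of redact.fields and redact.delete on a logged state can be pictured with a small model. The Python sketch below is an assumption-laden illustration, not the input's code: the redact function is hypothetical, it handles only top-level field names, and it treats an empty or null fields list as "redact nothing".

```python
import copy


def redact(state, fields, delete=False):
    """Return a copy of state that is safe to log; the input state is untouched."""
    logged = copy.deepcopy(state)
    for name in fields or []:
        if name not in logged:
            continue
        if delete:
            # delete: true removes the field from the logged copy entirely.
            del logged[name]
        else:
            # Otherwise the value is replaced with a single asterisk.
            logged[name] = "*"
    return logged
```

For the state in the example above, redact(state, ["password"]) keeps the password key with the value *, while passing delete=True drops the key from the logged copy. In both cases the state seen by the program is unchanged.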
Metrics
This input exposes metrics under the HTTP monitoring endpoint. These metrics are exposed under the /inputs path. They can be used to observe the activity of the input.
Metric | Description |
---|---|
| URL of the input resource. |
| Number of errors encountered during CEL program evaluation. |
| Number of errors encountered over the life cycle of the input. |
| Number of event arrays received. |
| Number of event arrays published. |
| Number of bytes received over the life cycle of the input. |
| Number of events received. |
| Number of events published. |
| Histogram of the elapsed successful CEL program processing times in nanoseconds. |
| Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). |
Developer tools
A stand-alone CEL environment that implements the majority of the websocket input’s Common Expression Language functionality is available in the Elastic Mito repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running go install github.com/elastic/mito/cmd/mito@latest and requires a Go toolchain.
Common options
The following configuration options are supported by all inputs.
enabled
Use the enabled option to enable and disable inputs. By default, enabled is set to true.
tags
A list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.
Example:
filebeat.inputs:
- type: websocket
  . . .
  tags: ["json"]
fields
Optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here.
filebeat.inputs:
- type: websocket
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
If this option is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, then the custom fields overwrite the other fields.
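The difference between the default placement of custom fields and fields_under_root can be shown with a small model of the output document. The Python sketch below is illustrative only: apply_fields is a hypothetical helper and the event contents are made up for the example.

```python
def apply_fields(event, fields, fields_under_root=False):
    """Return a copy of event with the custom fields added."""
    out = dict(event)
    if fields_under_root:
        # Custom fields become top-level keys and win on name conflicts.
        out.update(fields)
    else:
        # Default: custom fields are grouped under a "fields" sub-dictionary.
        out["fields"] = dict(fields)
    return out
```

With the app_id example above, the default placement yields a fields.app_id entry and leaves any existing top-level app_id alone, whereas fields_under_root replaces the top-level app_id with query_engine_12.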
processors
A list of processors to apply to the input data.
See Processors for information about specifying processors in your config.
pipeline
The ingest pipeline ID to set for the events generated by this input.
The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.
keep_null
If this option is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.
index
If present, this formatted string overrides the index for events from this input (for elasticsearch outputs), or sets the raw_index field of the event’s metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.
Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
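To see how the example value expands, the Python sketch below hard-codes that one format string and fills it in from an agent name and an event timestamp. It is not a general format-string parser: expand_index is a hypothetical helper, and it assumes the yyyy.MM.dd date pattern corresponds to strftime's %Y.%m.%d.

```python
from datetime import datetime, timezone


def expand_index(agent_name, timestamp):
    """Expand "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" for one event (sketch)."""
    # %{+yyyy.MM.dd} is a date pattern; strftime's %Y.%m.%d is the equivalent.
    return f"{agent_name}-myindex-{timestamp.strftime('%Y.%m.%d')}"
```

For an agent named filebeat and an event dated 1 November 2019, this gives filebeat-myindex-2019.11.01, matching the expansion quoted above.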
publisher_pipeline.disable_host
By default, all events contain host.name. This option can be set to true to disable the addition of this field to all events. The default value is false.
The websocket input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the GitHub repository.