Websocket Input

This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.

The websocket input reads messages from a websocket server or API endpoint. It uses the CEL engine and the mito library internally to parse and process the messages, which gives you flexible control over how each message is handled. CEL programs are written in much the same way as for the cel input, but the way messages are read and processed differs. The websocket input is a streaming input and can only be used to read messages from a websocket server or API endpoint.

This input supports:

  • Auth

    • Basic
    • Bearer
    • Custom

The websocket input does not currently support XML messages. Automatic reconnection is also not supported at the moment, so reconnection occurs only when the input is restarted.

Execution

The execution environment provided for the input includes the functions, macros, and global variables provided by the mito library. A single JSON object is provided as input and is accessible through the state variable. state contains a response map field and may contain arbitrary other fields configured via the input’s state configuration. If the CEL program saves cursor states between executions of the program, the configured state.cursor value will be replaced by the saved cursor prior to execution.

On start the state will be something like this:

{
    "response": { ... },
    "cursor": { ... },
    ...
}

The websocket input creates a response field in the state map and attaches the websocket message to this field. All CEL programs written should act on this response field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.

If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program’s logic.

After completion of a program’s execution it should return a single object with a structure looking like this:

{
    "events": [ 
        {...},
        ...
    ],
    "cursor": [ 
        {...},
        ...
    ]
}

The events field must be present, but may be empty or null. It may be an array or a single object, which is treated as an array with a single element; which form is appropriate depends on the websocket server or API endpoint. If it is not empty, every element must be a JSON object. The events field holds the events to be published to the output.

If cursor is present it must be either a single object or an array with the same length as events; each element i of the cursor will be the details for obtaining the events at and beyond event i in the events array. If the cursor is a single object, it will be the details for obtaining events after the last event in the events array and will only be retained on successful publication of all the events in the events array.
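
The shape rules for the returned object can be sketched as a small check. This is only an illustration of the rules described above, written in Python rather than CEL; the function name check_result is hypothetical and this is not how the input itself is implemented.

```python
def check_result(result: dict) -> list:
    """Validate a program result and return its events as a list.

    Hypothetical helper that mirrors the documented rules for the
    "events" and "cursor" fields; not the input's actual code.
    """
    if "events" not in result:
        raise ValueError("events field must be present")

    events = result["events"]
    if events is None:
        events = []            # null is allowed and means "no events"
    elif isinstance(events, dict):
        events = [events]      # a single object is treated as a one-element array
    elif not isinstance(events, list):
        raise ValueError("events must be an object, an array, or null")

    if not all(isinstance(e, dict) for e in events):
        raise ValueError("every event must be a JSON object")

    cursor = result.get("cursor")
    if cursor is not None and not isinstance(cursor, (dict, list)):
        raise ValueError("cursor must be an object or an array of objects")
    if isinstance(cursor, list) and len(cursor) != len(events):
        raise ValueError("a cursor array must have the same length as events")

    return events
```

For example, check_result({"events": {"message": "hello"}}) returns a one-element list, while a cursor array whose length differs from the number of events is rejected.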

Example configuration:

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })

Debug state logging

The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation. This will include any sensitive or secret information kept in the state object, and so DEBUG level logging should not be used in production when sensitive information is retained in the state object. See redact configuration parameters for settings to exclude sensitive fields from DEBUG logs.

Authentication

The Websocket input supports Basic token authentication, Bearer token authentication, and authentication via a custom auth config. Unlike REST inputs, Basic authentication takes a basic auth token, Bearer authentication takes a bearer token, and custom auth takes any combination of custom header and value. These token/key values are added to the request headers and are not exposed to the state object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations always use the Authorization header and prepend the token with Basic or Bearer respectively.

Example configurations with authentication:

filebeat.inputs:
- type: websocket
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: websocket
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream
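
To make the header handling concrete, the following Python sketch shows how the three auth options could map to request headers. It assumes the basic token is the usual Base64 encoding of "user:password" (which is what the example token above decodes to); the helper names basic_token and auth_header are hypothetical and only illustrate the described behaviour.

```python
import base64


def basic_token(user: str, password: str) -> str:
    """Build a basic auth token, assuming the standard Base64 of "user:password"."""
    return base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")


def auth_header(kind: str, value: str, header: str = "Authorization") -> dict:
    """Return the request header implied by an auth setting (illustrative only)."""
    if kind == "basic":
        # Basic tokens always go in Authorization, prefixed with "Basic".
        return {"Authorization": f"Basic {value}"}
    if kind == "bearer":
        # Bearer tokens always go in Authorization, prefixed with "Bearer".
        return {"Authorization": f"Bearer {value}"}
    if kind == "custom":
        # Custom auth sends the configured header and value unchanged.
        return {header: value}
    raise ValueError(f"unknown auth kind: {kind}")


token = basic_token("user", "password")   # "dXNlcjpwYXNzd29yZA=="
print(auth_header("basic", token))
print(auth_header("custom", "Bearer " + token, header="Auth"))
```

Note that with custom auth nothing is prepended, which is why the last configuration example above includes the Bearer prefix in the value itself.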

Input state

The websocket input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects. The state must contain a response map and may contain any object the user wishes to store in it. All objects are stored at runtime, except cursor, which has values that are persisted between restarts.

Configuration options

The websocket input supports the following configuration options plus the Common options described later.

program

The CEL program that is executed on each message received. This option should normally be set; if it is not, the default program shown below is used.

program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    }
  })

url_program

If present, this CEL program is executed before the websocket connection is established using the state object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.

url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })
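
The effect of the url_program above can be mimicked in Python to show which URL is produced on a first run and after a cursor has been stored. This is a sketch of the logic only, not of the CEL evaluation itself; build_url is a hypothetical name.

```python
def build_url(state: dict) -> str:
    """Mirror the url_program above: use cursor.since when it exists,
    otherwise fall back to the configured initial_start_time."""
    cursor = state.get("cursor") or {}
    since = cursor.get("since", state["initial_start_time"])
    return state["url"] + "?since=" + since


first_run = {
    "url": "ws://testapi:443/v1/streamresults",
    "initial_start_time": "2022-01-01T00:00:00Z",
}
print(build_url(first_run))

# After a restart the persisted cursor takes precedence.
resumed = dict(first_run, cursor={"since": "2023-05-01T12:00:00Z"})
print(build_url(resumed))
```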

state

state is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the state variable. Except for the state.cursor field, state does not persist over restarts.

state.cursor

The cursor is an object available as state.cursor where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })

regexp

A set of named regular expressions that may be used during a CEL program’s execution using the regexp extension library. The syntax used for the regular expressions is RE2.

filebeat.inputs:
- type: websocket
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

redact

During debug level logging, the state object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged state. The redact configuration allows users to configure this field redaction behaviour. For safety reasons if the redact configuration is missing a warning is logged.

If no redaction is required, an empty redact.fields configuration should be used to silence the logged warning.

- type: websocket
  redact:
    fields: ~

As an example, if a user-constructed Basic Authentication request is used in a CEL program, the password can be redacted like so:

filebeat.inputs:
- type: websocket
  url: ws://localhost:443/_stream
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

Note that fields under the auth configuration hierarchy are not exposed to the state and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible.

redact.fields

This specifies fields in the state to be redacted prior to debug logging. Fields listed in this array will be either replaced with a * or deleted entirely from messages sent to debug logs.

redact.delete

This specifies whether fields should be replaced with a * or deleted entirely from messages sent to debug logs. If delete is true, fields will be deleted rather than replaced.
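
The difference between the two modes can be illustrated with a short Python sketch. It assumes that redact.fields names top-level keys of the state, which is a simplification; redact_state is a hypothetical helper and not the input's implementation.

```python
import copy


def redact_state(state: dict, fields, delete: bool = False) -> dict:
    """Return the copy of state that would be written to debug logs.

    Assumption: each entry in fields names a top-level key of state.
    """
    logged = copy.deepcopy(state)      # never modify the live state
    for name in fields or []:          # an empty or null list redacts nothing
        if name not in logged:
            continue
        if delete:
            del logged[name]           # delete: true removes the field
        else:
            logged[name] = "*"         # default: replace the value with *
    return logged


state = {"user": "user@domain.tld", "password": "secret"}
print(redact_state(state, ["password"]))
print(redact_state(state, ["password"], delete=True))
```

In both cases the state used by the CEL program is unchanged; only the logged copy is altered.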

Metrics

This input exposes metrics under the HTTP monitoring endpoint. These metrics are exposed under the /inputs path. They can be used to observe the activity of the input.

Metric                   Description
url                      URL of the input resource.
cel_eval_errors          Number of errors encountered during CEL program evaluation.
errors_total             Number of errors encountered over the life cycle of the input.
batches_received_total   Number of event arrays received.
batches_published_total  Number of event arrays published.
received_bytes_total     Number of bytes received over the life cycle of the input.
events_received_total    Number of events received.
events_published_total   Number of events published.
cel_processing_time      Histogram of the elapsed successful CEL program processing times in nanoseconds.
batch_processing_time    Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).

Developer tools

A stand-alone CEL environment that implements the majority of the websocket input’s Common Expression Language functionality is available in the Elastic Mito repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running go install github.com/elastic/mito/cmd/mito@latest and requires a Go toolchain.

Common options

The following configuration options are supported by all inputs.

enabled

Use the enabled option to enable and disable inputs. By default, enabled is set to true.

tags

A list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.

Example:

filebeat.inputs:
- type: websocket
  . . .
  tags: ["json"]

fields

Optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here.

filebeat.inputs:
- type: websocket
  . . .
  fields:
    app_id: query_engine_12

fields_under_root

If this option is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, then the custom fields overwrite the other fields.
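
The two placements can be compared with a small Python sketch. It only illustrates the documented outcome for a single event; apply_custom_fields and the sample event are hypothetical.

```python
def apply_custom_fields(event: dict, fields: dict, under_root: bool = False) -> dict:
    """Show where custom fields end up in the output document (illustrative)."""
    out = dict(event)
    if under_root:
        # Top-level placement: custom fields overwrite conflicting names.
        out.update(fields)
    else:
        # Default placement: grouped under a "fields" sub-dictionary.
        out["fields"] = dict(fields)
    return out


event = {"message": "hello", "app_id": "set_elsewhere"}
custom = {"app_id": "query_engine_12"}

print(apply_custom_fields(event, custom))
print(apply_custom_fields(event, custom, under_root=True))
```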

processors

A list of processors to apply to the input data.

See Processors for information about specifying processors in your config.

pipeline

The ingest pipeline ID to set for the events generated by this input.

The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.

keep_null

If this option is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.

index

If present, this formatted string overrides the index for events from this input (for elasticsearch outputs), or sets the raw_index field of the event’s metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.

Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
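
As a rough illustration of how such a format string expands, the Python sketch below handles only the two token kinds used in the example value: an agent field reference and a date pattern built from yyyy, MM, and dd. It is not Filebeat's formatter, and expand_index is a hypothetical name.

```python
import re
from datetime import datetime


def expand_index(fmt: str, agent: dict, timestamp: datetime) -> str:
    """Expand %{[agent.<key>]} and %{+<date pattern>} tokens (simplified)."""
    def replace(match):
        token = match.group(1)
        if token.startswith("+"):
            # Translate the date pattern into strftime directives.
            pattern = token[1:].replace("yyyy", "%Y").replace("MM", "%m").replace("dd", "%d")
            return timestamp.strftime(pattern)
        key = token.strip("[]")                  # "[agent.name]" -> "agent.name"
        return agent[key.split(".", 1)[1]]       # look up "name" in the agent data

    return re.sub(r"%\{([^}]+)\}", replace, fmt)


print(expand_index(
    "%{[agent.name]}-myindex-%{+yyyy.MM.dd}",
    {"name": "filebeat"},
    datetime(2019, 11, 1),
))
```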

publisher_pipeline.disable_host

By default, all events contain host.name. This option can be set to true to disable the addition of this field to all events. The default value is false.

The websocket input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the GitHub repository.