Streaming Input

This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.

The streaming input reads messages from a streaming data source, for example a websocket server. Internally, this input uses the CEL engine and the mito library to parse and process the messages, which allows messages to be handled more flexibly. CEL programs are written much as they are for the cel input, but this input differs in the way messages are read and processed. Currently, websocket servers or API endpoints and the Crowdstrike Falcon streaming API are supported.

The websocket streaming input supports:

  • Auth

    • Basic
    • Bearer
    • Custom

The streaming input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment, so reconnection will occur on input restart.

The Crowdstrike streaming input requires OAuth2.0 as described in the Crowdstrike documentation for the API. When using the Crowdstrike streaming type, the crowdstrike_app_id configuration field must be set. This field specifies the appId parameter sent to the Crowdstrike API. See the Crowdstrike documentation for details.

The stream_type configuration field specifies which type of streaming input to use, "websocket" or "crowdstrike". If it is not set, the input defaults to websocket streaming.

Execution

The execution environment provided for the input includes the functions, macros, and global variables provided by the mito library. A single JSON object is provided as an input accessible through a state variable. state contains a response map field and may contain arbitrary other fields configured via the input’s state configuration. If the CEL program saves cursor states between executions of the program, the configured state.cursor value will be replaced by the saved cursor prior to execution.

On start the state will be something like this:

{
    "response": { ... },
    "cursor": { ... },
    ...
}

The streaming input websocket handler creates a response field in the state map and attaches the websocket message to this field. CEL programs should act on this response field. Additional fields may be present at the root of the object and, if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array (see below).

If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program’s logic.

After completion of a program’s execution it should return a single object with a structure looking like this:

{
    "events": [ 
        {...},
        ...
    ],
    "cursor": [ 
        {...},
        ...
    ]
}

The events field must be present, but may be empty or null. If it is not empty, it must contain only objects. Depending entirely on the streaming data source, the field may be an array or a single object; a single object is treated as an array with one element. The events field holds the events to be published to the output, and each event must be a JSON object.

If cursor is present, it must be either a single object or an array with the same length as events; each element i of the cursor holds the details for obtaining the events at and beyond event i in the events array. If the cursor is a single object, it holds the details for obtaining events after the last event in the events array and is only retained on successful publication of all the events in the events array.
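The array form of the cursor can be sketched as follows. This is a hypothetical program, not taken from any real data source: it assumes each message is a JSON object with an items array whose elements are objects carrying an id field (both names are illustrative). It returns one event per item and a cursor array of the same length, so element i of the cursor records the position of event i.

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(body, {
      // Hypothetical message shape: {"items": [{"id": ...}, ...]}
      "events": body.items,
      // One cursor element per event, in the same order as events.
      "cursor": body.items.map(item, {"id": item.id}),
    })
```

With a per-event cursor, the input can record progress event by event rather than only after the whole batch is published.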

Example configurations:

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })

filebeat.inputs:
# Read and process events from the Crowdstrike Falcon Hose API
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
  crowdstrike_app_id: my_app_id
  program: |
    state.response.decode_json().as(body,{
      "events": [body],
      ?"cursor": has(body.?metadata.offset) ?
        optional.of({"offset": body.metadata.offset})
      :
        optional.none(),
    })

Debug state logging

The streaming input logs the complete state at the DEBUG level before and after CEL evaluation. This includes any sensitive or secret information kept in the state object, so DEBUG level logging should not be used in production when sensitive information is retained in the state object. See the redact configuration parameters for settings to exclude sensitive fields from DEBUG logs.

Authentication

The websocket streaming input supports Basic token authentication, Bearer token authentication, and authentication via a custom auth config. Unlike the REST inputs, where Basic authentication takes a username and password, here Basic authentication takes a basic auth token, Bearer authentication takes a bearer token, and custom auth takes any combination of custom header and value. These token and key values are added to the request headers and are not exposed to the state object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations always use the Authorization header and prepend the token with Basic or Bearer, respectively.

Example configurations with authentication:

filebeat.inputs:
- type: streaming
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: streaming
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: streaming
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

filebeat.inputs:
- type: streaming
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://localhost:443/_stream

The crowdstrike streaming input requires OAuth2.0 authentication using a client ID, client secret and a token URL. These values are not exposed to the state object. OAuth2.0 scopes and endpoint parameters are available via the auth.scopes and auth.endpoint_params config parameters.

filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
  crowdstrike_app_id: my_app_id
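A sketch of the same Crowdstrike configuration with OAuth2.0 scopes requested through auth.scopes. It assumes auth.scopes takes a list of strings, and the scope name shown is a hypothetical placeholder; check the Crowdstrike API documentation for the scopes your API client actually needs.

```yaml
filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  crowdstrike_app_id: my_app_id
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
    # Assumption: scopes is a list of strings; "hypothetical-scope" is a placeholder.
    scopes:
      - hypothetical-scope
```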

Input state

The streaming input keeps a runtime state between messages. This state can be accessed by the CEL program and may contain arbitrary objects. The state must contain a response map and may contain any other object the user wishes to store in it. All objects are held only at runtime, except cursor, whose values are persisted between restarts.

Configuration options

The streaming input supports the following configuration options plus the Common options described later.

stream_type

The flavor of streaming to use. This may be either "websocket", "crowdstrike", or unset. If the field is unset, websocket streaming is used.
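For example, a minimal sketch that selects websocket streaming explicitly instead of relying on the default; the URL is the placeholder used in the authentication examples.

```yaml
filebeat.inputs:
- type: streaming
  # Equivalent to leaving stream_type unset.
  stream_type: websocket
  url: wss://localhost:443/_stream
```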

program

The CEL program that is executed on each message received. This field should ideally be present, but if it is not, the default program given below is used.

program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    }
  })

url_program

If present, this CEL program is executed before the streaming connection is established. It is evaluated against the state object, including any stored cursor value, and must evaluate to a valid URL. The returned URL is used to make the streaming connection for processing. The program may use cursor values or other values defined in the state to customize the URL at runtime.

url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })

state

state is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the state variable. Except for the state.cursor field, state does not persist over restarts.
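A sketch of a state configuration that carries an arbitrary runtime field alongside an initial cursor. The field name source_label and the message's timestamp field are hypothetical. As described under Execution, the configured state.cursor is replaced by the saved cursor once one has been persisted, while source_label is kept only at runtime.

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  state:
    # Hypothetical runtime-only field; not persisted across restarts.
    source_label: "my_stream"
    # Initial cursor, used until a saved cursor exists.
    cursor:
      since: "2022-01-01T00:00:00Z"
  program: |
    bytes(state.response).decode_json().as(inner_body, {
      "events": {
        "message": inner_body.encode_json(),
        "label": state.source_label,
      },
      "cursor": {"since": inner_body.timestamp},
    })
```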

state.cursor

The cursor is an object available as state.cursor where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://localhost:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })

regexp

A set of named regular expressions that may be used during a CEL program’s execution using the regexp extension library. The syntax used for the regular expressions is RE2.

filebeat.inputs:
- type: streaming
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

redact

During debug level logging, the state object and the resulting evaluation result are included in logs. This may result in leaking of secrets. To prevent this, fields may be redacted or deleted from the logged state. The redact configuration allows users to configure this field redaction behaviour. For safety reasons, if the redact configuration is missing, a warning is logged.

If no redaction is required, an empty redact.fields configuration should be used to silence the logged warning.

- type: streaming
  redact:
    fields: ~

As an example, if a user-constructed Basic Authentication request is used in a CEL program, the password can be redacted like so:

filebeat.inputs:
- type: streaming
  url: ws://localhost:443/_stream
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

Note that fields under the auth configuration hierarchy are not exposed to the state and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible.

redact.fields

This specifies fields in the state to be redacted prior to debug logging. Fields listed in this array will be either replaced with a * or deleted entirely from messages sent to debug logs.

redact.delete

This specifies whether fields should be replaced with a * or deleted entirely from messages sent to debug logs. If delete is true, fields will be deleted rather than replaced.
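A minimal sketch that masks a state field with * in debug logs instead of deleting it; api_key is a hypothetical state field used only for illustration.

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/_stream
  state:
    # Hypothetical secret kept in state for use by the CEL program.
    api_key: "dXNlcjpwYXNzd29yZA=="
  redact:
    fields:
      - api_key
    # Replace the value with * rather than removing the field.
    delete: false
```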

retry

The retry configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is nil, which means no retries will be attempted. The wait_min and wait_max options specify the minimum and maximum time to wait between retries.

filebeat.inputs:
- type: streaming
  url: ws://localhost:443/_stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })
  retry:
    max_attempts: 5
    wait_min: 1s
    wait_max: 10s

retry.max_attempts

The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is nil, which means no retries will be attempted.

retry.wait_min

The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, wait_min might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying.

retry.wait_max

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, wait_max might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

Metrics

This input exposes metrics under the HTTP monitoring endpoint. These metrics are exposed under the /inputs path. They can be used to observe the activity of the input.

Metric                     Description
url                        URL of the input resource.
cel_eval_errors            Number of errors encountered during CEL program evaluation.
errors_total               Number of errors encountered over the life cycle of the input.
batches_received_total     Number of event arrays received.
batches_published_total    Number of event arrays published.
received_bytes_total       Number of bytes received over the life cycle of the input.
events_received_total      Number of events received.
events_published_total     Number of events published.
cel_processing_time        Histogram of the elapsed successful CEL program processing times in nanoseconds.
batch_processing_time      Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).

Developer tools

A stand-alone CEL environment that implements the majority of the streaming input’s Common Expression Language functionality is available in the Elastic Mito repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running go install github.com/elastic/mito/cmd/mito@latest and requires a Go toolchain.

Common options

The following configuration options are supported by all inputs.

enabled

Use the enabled option to enable and disable inputs. By default, enabled is set to true.
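For example, a minimal sketch that keeps a streaming input in the configuration without running it:

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  # The input stays configured but is not started.
  enabled: false
```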

tags

A list of tags that Filebeat includes in the tags field of each published event. Tags make it easy to select specific events in Kibana or apply conditional filtering in Logstash. These tags will be appended to the list of tags specified in the general configuration.

Example:

filebeat.inputs:
- type: streaming
  . . .
  tags: ["json"]

fields

Optional fields that you can specify to add additional information to the output. For example, you might add fields that you can use for filtering log data. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. By default, the fields that you specify here will be grouped under a fields sub-dictionary in the output document. To store the custom fields as top-level fields, set the fields_under_root option to true. If a duplicate field is declared in the general configuration, then its value will be overwritten by the value declared here.

filebeat.inputs:
- type: streaming
  . . .
  fields:
    app_id: query_engine_12

fields_under_root

If this option is set to true, the custom fields are stored as top-level fields in the output document instead of being grouped under a fields sub-dictionary. If the custom field names conflict with other field names added by Filebeat, then the custom fields overwrite the other fields.
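A sketch combining the fields example above with fields_under_root, so that app_id is written at the top level of each event rather than under fields:

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  fields:
    app_id: query_engine_12
  # Store app_id as a top-level field instead of fields.app_id.
  fields_under_root: true
```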

processors

A list of processors to apply to the input data.

See Processors for information about specifying processors in your config.
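As an illustration, a sketch that attaches Filebeat's drop_fields processor to this input; the field name debug_info is hypothetical.

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  processors:
    # Remove a hypothetical field from every event produced by this input.
    - drop_fields:
        fields: ["debug_info"]
        ignore_missing: true
```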

pipeline

The ingest pipeline ID to set for the events generated by this input.

The pipeline ID can also be configured in the Elasticsearch output, but this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used.
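A minimal sketch that routes this input's events through an ingest pipeline; my-streaming-pipeline is a hypothetical pipeline ID that would need to exist in Elasticsearch.

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  # Hypothetical ingest pipeline ID; takes precedence over any pipeline set in the output.
  pipeline: my-streaming-pipeline
```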

keep_null

If this option is set to true, fields with null values will be published in the output document. By default, keep_null is set to false.
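For example, to publish fields whose values are null instead of dropping them:

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  # Publish null-valued fields (default is false).
  keep_null: true
```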

index

If present, this formatted string overrides the index for events from this input (for elasticsearch outputs), or sets the raw_index field of the event’s metadata (for other outputs). This string can only refer to the agent name and version and the event timestamp; for access to dynamic fields, use output.elasticsearch.index or a processor.

Example value: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}" might expand to "filebeat-myindex-2019.11.01".
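The example value above can be set on a streaming input as follows (the URL is a placeholder):

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  # Refers only to the agent name and the event timestamp.
  index: "%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
```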

publisher_pipeline.disable_host

By default, all events contain host.name. This option can be set to true to disable the addition of this field to all events. The default value is false.
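For example, to stop host.name from being added to events from this input:

```yaml
filebeat.inputs:
- type: streaming
  url: ws://localhost:443/v1/stream
  # Do not add host.name to events (default is false).
  publisher_pipeline.disable_host: true
```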

The streaming input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the GitHub repository.