Mirko BezTaha Derouiche

Convert Logstash pipelines to OpenTelemetry Collector Pipelines

This guide helps Logstash users transition to OpenTelemetry by demonstrating how to convert common Logstash pipelines into equivalent OpenTelemetry Collector configurations. We will focus on the log signal.

Convert Logstash pipelines to OpenTelemetry Collector Pipelines

Convert Logstash pipelines to OpenTelemetry Collector Pipelines

Introduction

Elastic observability strategy is increasingly aligned with OpenTelemetry. With the recent launch of Elastic Distributions of OpenTelemetry we’re expanding our offering to make it easier to use OpenTelemetry, the Elastic Agent now offers an "otel" mode, enabling it to run a custom distribution of the OpenTelemetry Collector, seamlessly enhancing your observability onboarding and experience with Elastic.

This post is designed to assist users familiar with Logstash transitioning to OpenTelemetry by demonstrating how to convert some standard Logstash pipelines into corresponding OpenTelemetry Collector configurations.

What is OpenTelemetry Collector and why should I care?

OpenTelemetry is an open-source framework that ensures vendor-agnostic data collection, providing a standardized approach for the collection, processing, and ingestion of observability data. Elastic is fully committed to this principle, aiming to make observability truly vendor-agnostic and eliminating the need for users to re-instrument their observability when switching platforms.

By embracing OpenTelemetry, you have access to these benefits:

  • Unified Observability: By using the OpenTelemetry Collector, you can collect and manage logs, metrics, and traces from a single tool, providing holistic observability into your system's performance and behavior. This simplifies monitoring and debugging in complex, distributed environments like microservices.
  • Flexibility and Scalability: Whether you're running a small service or a large distributed system, the OpenTelemetry Collector can be scaled to handle the amount of data generated, offering the flexibility to deploy as an agent (running alongside applications) or as a gateway (a centralized hub).
  • Open Standards: Since OpenTelemetry is an open-source project under the Cloud Native Computing Foundation (CNCF), it ensures that you're working with widely accepted standards, contributing to the long-term sustainability and compatibility of your observability stack.
  • Simplified Telemetry Pipelines: The ability to build pipelines using receivers, processors, and exporters simplifies telemetry management by centralizing data flows and minimizing the need for multiple agents.

In the next sections, we will explain how OTEL Collector and Logstash pipelines are structured, and we will clarify how the steps for each option are used.

OTEL Collector Configuration

An OpenTelemetry Collector Configuration has different sections:

  • Receivers: Collect data from different sources.
  • Processors: Transform the data collected by receivers
  • Exporters: Send data to different collectors
  • Connectors: Link two pipelines together
  • Service: defines which components are active
    • Pipelines: Combine the defined receivers, processors, exporters, and connectors to process the data
    • Extensions are optional components that expand the capabilities of the Collector to accomplish tasks not directly involved with processing telemetry data (e.g., health monitoring)
    • Telemetry where you can set observability for the collector itself (e.g., logging and monitoring)

We can visualize it schematically as follows:

We refer to the official documentation Configuration | OpenTelemetry for an in-depth introduction in the components.

Logstash pipeline definition

A Logstash pipeline is composed of three main components:

  • Input Plugins: Allow us to read data from different sources
  • Filters Plugins: Allow us to transform and filter the data
  • Output Plugins: Allow us to send the data

Logstash also has a special input and a special output that allow the pipeline-to-pipeline communication, we can consider this as a similar concept to an OpenTelemetry connector.

Logstash pipeline compared to Otel Collector components

We can schematize how Logstash Pipeline and OTEL Collector pipeline components can relate to each other as follows:

Enough theory! Let us dive into some examples.

Convert a Logstash Pipeline into OpenTelemetry Collector Pipeline

Example 1: Parse and transform log line

Let's consider the below line:

2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404

We will apply the following steps:

  1. Read the line from the file
    /tmp/demo-line.log
    .
  2. Define the output to be an Elasticsearch datastream
    logs-access-default
    .
  3. Extract the
    @timestamp
    ,
    user.name
    ,
    client.ip
    ,
    client.port
    ,
    url.path
    and
    http.status.code
    .
  4. Drop log messages related to the
    SYSTEM
    user.
  5. Parse the date timestamp with the relevant date format and store it in
    @timestamp
    .
  6. Add a code
    http.status.code_description
    based on known codes' descriptions.
  7. Send data to Elasticsearch.

Logstash pipeline

input {
    file {
        path => "/tmp/demo-line.log" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "access_log"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
    grok { #[3]
        match => {
            "message" => "%{TIMESTAMP_ISO8601:[date]}: user %{WORD:[user][name]} accessed from %{IP:[client][ip]}:%{NUMBER:[client][port]:int} path %{URIPATH:[url][path]} with error %{NUMBER:[http][status][code]}"
        }
    }
    if "_grokparsefailure" not in [tags] {
        if [user][name] == "SYSTEM" { #[4]
            drop {}
        }
        date { #[5]
            match => ["[date]", "ISO8601"]
            target => "[@timestamp]"
            timezone => "UTC"
            remove_field => [ "date" ]
        }
        translate { #[6]
            source => "[http][status][code]"
            target => "[http][status][code_description]"
            dictionary => {
                "200" => "OK"
                "403" => "Permission denied"
                "404" => "Not Found"
                "500" => "Server Error"
            }
            fallback => "Unknown error"
        }
    }
}

output {
    elasticsearch { #[7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector configuration

receivers:
  filelog: #[1]
    start_at: beginning
    include:
      - /tmp/demo-line.log
    include_file_name: false
    include_file_path: true
    storage: file_storage 
    operators:
    # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "access_log_otel" 
    - type: add #[2]
      field: attributes["data_stream.namespace"]
      value: "default"

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/grok: #[3]
    log_statements:
      - context: log
        statements:
        - 'merge_maps(attributes, ExtractGrokPatterns(attributes["event.original"], "%{TIMESTAMP_ISO8601:date}: user %{WORD:user.name} accessed from %{IP:client.ip}:%{NUMBER:client.port:int} path %{URIPATH:url.path} with error %{NUMBER:http.status.code}", true), "insert")'

  filter/exclude_system_user:  #[4]
    error_mode: ignore
    logs:
      log_record:
        - attributes["user.name"] == "SYSTEM"

  transform/parse_date: #[5]
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["date"], "%Y-%m-%dT%H:%M:%S"))
          - delete_key(attributes, "date")
        conditions:
          - attributes["date"] != nil

  transform/translate_status_code:  #[6]
    log_statements:
      - context: log
        conditions:
        - attributes["http.status.code"] != nil
        statements:
        - set(attributes["http.status.code_description"], "OK")                where attributes["http.status.code"] == "200"
        - set(attributes["http.status.code_description"], "Permission Denied") where attributes["http.status.code"] == "403"
        - set(attributes["http.status.code_description"], "Not Found")         where attributes["http.status.code"] == "404"
        - set(attributes["http.status.code_description"], "Server Error")      where attributes["http.status.code"] == "500"
        - set(attributes["http.status.code_description"], "Unknown Error")     where attributes["http.status.code_description"] == nil

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - resourcedetection/system
        - transform/grok
        - filter/exclude_system_user
        - transform/parse_date
        - transform/translate_status_code
      exporters:
        - elasticsearch

These will generate the following document in Elasticsearch

{
    "@timestamp": "2024-09-20T08:33:27.000Z",
    "client": {
        "ip": "89.66.167.22",
        "port": 10592
    },
    "data_stream": {
        "dataset": "access_log",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "http": {
        "status": {
            "code": "404",
            "code_description": "Not Found"
        }
    },
    "log": {
        "file": {
            "path": "/tmp/demo-line.log"
        }
    },
    "message": "2024-09-20T08:33:27: user frank accessed from 89.66.167.22:10592 path /blog with error 404",
    "url": {
        "path": "/blog"
    },
    "user": {
        "name": "frank"
    }
}

Example 2: Parse and transform a NDJSON-formatted log file

Let's consider the below json line:

{"log_level":"INFO","message":"User login successful","service":"auth-service","timestamp":"2024-10-11 12:34:56.123 +0100","user":{"id":"A1230","name":"john_doe"}}

We will apply the following steps:

  1. Read a line from the file
    /tmp/demo.ndjson
    .
  2. Define the output to be an Elasticsearch datastream
    logs-json-default
  3. Parse the JSON and assign relevant keys and values.
  4. Parse the date.
  5. Override the message field.
  6. Rename fields to follow ECS conventions.
  7. Send data to Elasticsearch.

Logstash pipeline

input {
    file {
        path => "/tmp/demo.ndjson" #[1]
        start_position => "beginning"
        add_field => { #[2]
            "[data_stream][type]" => "logs"
            "[data_stream][dataset]" => "json"
            "[data_stream][namespace]" => "default"
        }
    }
}

filter {
  if [message] =~ /^\{.*/ {
    json { #[3] & #[5]
        source => "message"
    }
  }
  date { #[4]
    match => ["[timestamp]", "yyyy-MM-dd HH:mm:ss.SSS Z"]
    remove_field => "[timestamp]"
  }
  mutate {
    rename => { #[6]
      "service" => "[service][name]"
      "log_level" => "[log][level]"
    }
  }
}


output {
    elasticsearch { # [7]
        hosts => "elasticsearch-enpoint:443"
        api_key => "${ES_API_KEY}"
    }
}

OpenTelemtry Collector configuration

receivers:
  filelog/json: # [1]
    include: 
      - /tmp/demo.ndjson
    retry_on_failure:
      enabled: true
    start_at: beginning
    storage: file_storage 
    operators:
     # Copy the raw message into event.original (this is done OOTB by Logstash in ECS mode)
    - type: copy
      from: body
      to: attributes['event.original']
    - type: add #[2]
      field: attributes["data_stream.type"]
      value: "logs"      
    - type: add #[2]
      field: attributes["data_stream.dataset"]
      value: "otel" #[2]
    - type: add
      field: attributes["data_stream.namespace"]
      value: "default"     


extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage

processors:
  # Adding  host.name (this is done OOTB by Logstash)
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
      resource_attributes:
        os.type:
          enabled: false

  transform/json_parse:  #[3]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, ParseJSON(body), "upsert")
        conditions: 
          - IsMatch(body, "^\\{")
      

  transform/parse_date:  #[4]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(time, Time(attributes["timestamp"], "%Y-%m-%d %H:%M:%S.%L %z"))
          - delete_key(attributes, "timestamp")
        conditions: 
          - attributes["timestamp"] != nil

  transform/override_message_field: [5]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(body, attributes["message"])
          - delete_key(attributes, "message")

  transform/set_log_severity: # [6]
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(severity_text, attributes["log_level"])          

  attributes/rename_attributes: #[6]
    actions:
      - key: service.name
        from_attribute: service
        action: insert
      - key: service
        action: delete
      - key: log_level
        action: delete

exporters:
  elasticsearch: #[7]
    endpoints: ["elasticsearch-enpoint:443"]
    api_key: ${env:ES_API_KEY}
    tls:
    logs_dynamic_index:
      enabled: true
    mapping:
      mode: ecs

service:
  extensions: [file_storage]
  pipelines:
    logs/json:
      receivers: 
        - filelog/json
      processors:
        - resourcedetection/system    
        - transform/json_parse
        - transform/parse_date        
        - transform/override_message_field
        - transform/set_log_severity
        - attributes/rename_attributes
      exporters: 
        - elasticsearch

These will generate the following document in Elasticsearch

{
    "@timestamp": "2024-10-11T12:34:56.123000000Z",
    "data_stream": {
        "dataset": "otel",
        "namespace": "default",
        "type": "logs"
    },
    "event": {
        "original": "{\"log_level\":\"WARNING\",\"message\":\"User login successful\",\"service\":\"auth-service\",\"timestamp\":\"2024-10-11 12:34:56.123 +0100\",\"user\":{\"id\":\"A1230\",\"name\":\"john_doe\"}}"
    },
    "host": {
        "hostname": "my-laptop",
        "name": "my-laptop",
     },
    "log": {
        "file": {
            "name": "json.log"
        },
        "level": "WARNING"
    },
    "message": "User login successful",
    "service": {
        "name": "auth-service"
    },
    "user": {
        "id": "A1230",
        "name": "john_doe"
    }
}

Conclusion

In this post, we showed examples of how to convert a typical Logstash pipeline into an OpenTelemetry Collector pipeline for logs. While OpenTelemetry provides powerful tools for collecting and exporting logs, if your pipeline relies on complex transformations or scripting, Logstash remains a superior choice. This is because Logstash offers a broader range of built-in features and a more flexible approach to handling advanced data manipulation tasks.

What's Next?

Now that you've seen basic (but realistic) examples of converting a Logstash pipeline to OpenTelemetry, it's your turn to dive deeper. Depending on your needs, you can explore further and find more detailed resources in the following repositories:

If you encounter specific challenges or need to handle more advanced use cases, these repositories will be an excellent resource for discovering additional components or integrations that can enhance your pipeline. All these repositories have a similar structure with folders named

receiver
,
processor
,
exporter
,
connector
, which should be familiar after reading this blog. Whether you are migrating a simple Logstash pipeline or tackling more complex data transformations, these tools and communities will provide the support you need for a successful OpenTelemetry implementation.

Share this article