Aufteilen von Daten und Senden der Daten an unterschiedliche Zielorte mit Logstash

Logstash ist eine Open-Source-Software zur serverseitigen Verarbeitung von Datenströmen, die Daten ingestiert und transformiert, bevor sie sie an einen oder mehrere Zielorte (Outputs) sendet. In diesem Blog zeige ich Ihnen anhand eines Beispiels, wie man mit Logstash Daten für mehrere Aktienindizes ingestieren und sie dann je nach Aktienindex jeweils an einen anderen Zielort senden lassen kann. Dafür sind die folgenden Schritte erforderlich:

  1. Von jedem Dokument im eingehenden Aktienindex-Datenstream werden Kopien erstellt.
  2. Die Kopien werden so gefiltert, dass sie nur die für den jeweiligen Aktienindex gültigen Felder enthalten.
  3. Den Kopien werden Metadaten hinzugefügt, aus denen hervorgeht, zu welchem Aktienindex die Daten gehören.
  4. Es wird eine Auswertung der Metadaten in jedem Dokument eingerichtet, damit das Dokument an den richtigen Zielort gesendet wird.

Beachten Sie, dass ich in diesem Blogpost nicht die Pipeline-zu-Pipeline-Kommunikation (in 6.5 in der Betaphase) nutze, mit der sich einige der hier beschriebenen Funktionen ebenfalls umsetzen lassen dürften.

Beispiel für eine eingehende Datei

Als eingehende Datei verwenden wir eine CSV-Datei, die Aktienindexwerte enthält. Die Werte in dieser CSV-Datei sehen wie folgt aus (als Dezimaltrennzeichen wird hier der Punkt verwendet):

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

Die Werte stehen jeweils für einen Zeitpunkt und den Wert der folgenden Aktienindizes: DAX, SMI, CAC und FTSE. Wir kopieren die Zeilen oben in eine CSV-Datei namens „stocks.csv“, die wir dann an die Logstash-Pipeline in unserem Beispiel senden können.

Beispiel für eine Logstash-Pipeline

Die unten dargestellte Logstash-Pipeline führt Folgendes aus:

  1. Sie liest Aktienindexwerte aus einer CSV-Datei (Datei mit kommagetrennten Werten).
  2. Sie ordnet jede Zeile in der CSV-Datei einem JSON-Dokument zu, wobei die Spalten in der CSV-Datei für die folgenden JSON-Felder stehen: „time“ (Zeitpunkt), „DAX“, „SMI“, „CAC“ und „FTSE“.
  3. Sie wandelt das Feld „time“ ins Unix-Format um.
  4. Sie erstellt mithilfe des Filter-Plugins „clone“ von jedem Dokument zwei Kopien (diese Kopien werden zusätzlich zum Originaldokument angelegt). Der Filter „clone“ fügt jeder neuen Dokumentkopie automatisch ein neues Feld „type“ hinzu, wobei „type“ mit den Namen korrespondiert, die im Klone-Array festgelegt wurden. Als Typennamen haben wir „clone_for_SMI“ oder „clone_for_FTSE“ definiert und jeder Klon enthält am Ende nur Daten entweder für den SMI oder aber für den FTSE.
  5. Für jeden Klon gilt Folgendes:
    1. Mit dem Filter-Plugin „prune“ werden die Felder entfernt, die nicht für den konkreten Aktienindex gültig sind.
    2. Jedes Dokument wird entsprechend dem Wert des von der Funktion „clone“ hinzugefügten Feldes „type“ mit Metadaten versehen. Dies ist notwendig, da wir die Funktion „prune“ verwenden werden, mit der die von „clone“ hinzugefügte Angabe zum Typ entfernt wird. Wir benötigen aber den Typ, um beim Senden der Daten an den Zielort entscheiden zu können, wohin das Dokument gehen soll.
  6. Mit dem Elasticsearch-Output-Plugin für Logstash werden die Dokumente gemäß ihrer Aktienindexzugehörigkeit jeweils an einen anderen Elasticsearch-Zielort gesendet. Welcher das ist, richtet sich nach dem Wert im Metadatenfeld, den wir in Schritt 5 hinzugefügt haben. Zur Vereinfachung des Codes unten wird jede Elasticsearch-Ausgabe in einen eigenen Index in einem lokalen Elasticsearch-Cluster geschrieben. Wenn mehrere Cluster als Zielorte verwendet werden sollen, lassen sich in den Elasticsearch-Output-Deklarationen ganz einfach eindeutige Elasticsearch-Hosts angeben.

Das Beispiel unten zeigt eine Logstash-Pipeline, die die oben genannten Schritte ausführt (mit als Kommentar hinzugefügten entsprechenden Schrittnummern). Diese Pipeline kopieren wir zur Ausführung in eine Datei namens „clones.conf“:

## SCHRITT 1
input {
  file {
    # Im Pfad auf die Datei stocks.csv verweisen
    path => "${HOME}/stocks.csv"
    # Festlegen, dass bei jeder Logstash-Ausführung der 
    # gesamte Input gelesen wird (fürs Debugging hilfreich):
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## SCHRITT 2
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## SCHRITT 3  
date {
    match => ['time', 'UNIX']
  }
## SCHRITT 4
  # Mit der folgenden Zeile werden von jedem 
  # Dokument 2 Kopien erstellt, sodass wir am Ende 
  # insgesamt 3 Exemplare (Original + 2 Kopien) haben. 
  # Jedem Exemplar wird automatisch ein Feld „type“ 
  # entsprechend dem Namen im Array hinzugefügt.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## SCHRITT 5
  if [type] == 'clone_for_SMI' {
    # Alles außer „SMI“ entfernen
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## SCHRITT 6
output {
  # Die folgende Ausgabe für „stdout“ dient nur zum Debugging 
  # und kann entfernt werden
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Testen der Logstash-Pipeline

Zum Testen dieser Pipeline mit den Beispiel-CSV-Daten kann der folgende Befehl verwendet werden, wobei sicherzustellen ist, dass die korrekten Pfade fürs jeweilige System verwendet werden. Die Festlegung von „config.reload.automatic“ ist optional, erlaubt es uns aber, „clones.conf“ neu zu laden, ohne Logstash neu starten zu müssen:

./logstash -f ./clones.conf --config.reload.automatic

Nachdem Logstash die Datei „stocks.csv“ gelesen und die Verarbeitung abgeschlossen hat, können wir die drei Ergebnisindizes („smi_data“, „ftse_data“ und „stocks_original“) prüfen.

Prüfen des Index für den SMI

GET /smi_data/_search

Damit werden Dokumente mit der folgenden Struktur angezeigt. Wie Sie sehen, erscheinen im Index „smi_data“ nur Daten für den SMI.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

Prüfen des Index für den FTSE

GET /ftse_data/_search

Damit werden Dokumente mit der folgenden Struktur angezeigt. Wie Sie sehen, erscheint in Dokumenten im Index „ftse_data“ nur das Feld „FTSE“.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Prüfen des Index der Originaldokumente

GET /stocks_originals/_search

Damit werden Dokumente mit der folgenden Struktur angezeigt. Wie Sie sehen, erscheint im Index „stocks_original“ die originale ungefilterte Version der Dokumente.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Fazit

In diesem Blogpost habe ich einen kleinen Teil der Funktionen demonstriert, die Logstash zu bieten hat. Anhand eines Beispiels habe ich gezeigt, wie man mit Logstash Daten für mehrere Aktienindizes ingestieren, sie verarbeiten und die verarbeiteten Daten dann an verschiedene Zielorte senden kann. Wenn Sie gerade dabei sind, Logstash und den Elastic Stack auszuprobieren, und Fragen haben, sind unsere öffentlichen Diskussionsforen die beste Anlaufstelle.