Utiliser Logstash pour fractionner les données et les envoyer vers différentes sorties

Logstash est un pipeline open source côté serveur, destiné au traitement des données. Sa mission ? Ingérer des données, les transformer, puis les envoyer vers une ou plusieurs sorties. Dans cet article de blog, je prendrai l'exemple du marché boursier pour vous montrer comment utiliser Logstash pour ingérer les données provenant de différentes bourses, puis envoyer les données correspondant à chacune d'entre elles vers une sortie distincte. Pour ce faire, nous suivrons les étapes suivantes :

  1. Créer des copies de chaque document depuis le flux d'entrée d'une place boursière.
  2. Filtrer chacune de ces copies, afin qu'elle ne contienne que les champs valides pour une place boursière donnée.
  3. Ajouter des métadonnées à chaque copie, afin d'indiquer à quelle place boursière correspondent les données qu'elle contient.
  4. Évaluer les métadonnées de chaque document, afin que ces derniers soient dirigés vers la bonne sortie.

Remarque : dans cet article de blog, je n'utiliserai pas la communication pipeline-to-pipeline (disponible en bêta depuis la version 6.5), qui pourrait aussi permettre d'atteindre certains des résultats décrits ici.

Exemple de fichier d'entrée

En entrée vers Logstash, nous allons utiliser un fichier CSV contenant des valeurs boursières repères. Voici quelques exemples d'entrées CSV :

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

Les valeurs séparées par une virgule correspondent aux valeurs temporelles du champ "time" et aux valeurs repères des places boursières suivantes : "DAX", "SMI", "CAC" et "FTSE" . Copiez et collez les lignes ci-dessus dans un fichier CSV que vous nommerez "stocks.csv", qui servira d'entrée dans l'exemple de pipeline Logstash.

Exemple de pipeline Logstash

Ci-dessous, nous allons voir un pipeline Logstash qui exécute ce qui suit :

  1. Lire les valeurs boursières en entrée au format CSV depuis un fichier CSV.
  2. Mapper chaque ligne de l'entrée CSV vers un document JSON, dans lequel les colonnes CSV correspondent aux champs JSON suivants :  "time", "DAX", "SMI", "CAC" et "FTSE".
  3. Convertir le champ "time" au format Unix.
  4. Utilisez le plug-in "clone filter" pour créer deux copies de chaque document (outre le document d'origine). Le plug-in "clone filter" ajoute automatiquement un nouveau champ "type" à chaque nouvelle copie du document, où "type" correspond aux noms donnés dans le tableau de clones. Nous définissons les types pour qu'ils correspondent à "clone_for_SMI" ou "clone_for_FTSE", et chaque clone ne contiendra au final que les données correspondant à la bourse "SMI" ou à la bourse "FTSE".
  5. Pour chaque clone :
    1. Utilisez le plug-in "prune filter" pour supprimer tous les champs, sauf ceux qui correspondent à la place boursière concernée.
    2. Ajoutez les métadonnées à chaque document correspondant au "type" ajouté par la fonction "clone". Cette opération est nécessaire, car nous utilisons la fonction "prune", qui supprime le "type" inséré par la fonction "clone", or, cette information est requise en sortie, afin de diriger le document vers la sortie appropriée.
  6. Utilisez le plug-in de sortie Elasticsearch pour Logstash, afin d'écrire les documents correspondant à chaque place de marché vers une sortie Elasticsearch distincte – la sortie étant déterminée par la valeur que nous avons définie dans le champ "metadata" (métadonnées) ajouté à l'étape 5. Pour simplifier le code ci-dessous, chaque sortie Elasticsearch écrit vers un seul index compris dans un cluster Elasticsearch local. Si vous devez utiliser plusieurs clusters comme sorties, vous pouvez facilement modifier chaque déclaration de sortie Elasticsearch pour spécifier des hôtes Elasticsearch uniques.

Voici un pipeline Logstash qui exécute les étapes que nous venons de voir ci-dessus (j'y ai ajouté les numéros des étapes, ainsi que des commentaires). Copiez ce pipeline dans un fichier que vous nommerez "clones.conf" pour exécuter ce qui suit :

## ÉTAPE 1
input {
  file {
    # n'oubliez pas de modifier le chemin, pour utiliser votre fichier stocks.csv
    path => "${HOME}/stocks.csv"
    # Les lignes ci-dessous assurent la relecture de toute l'entrée 
    # chaque fois que Logstash s'exécute (utile pour le débogage).
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## ÉTAPE 2
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## ÉTAPE 3  
date {
    match => ['time', 'UNIX']
  }
## ÉTAPE 4
  # La ligne ci-dessous va créer 2 copies supplémentaire 
  # de chaque document (en plus de l'original, 
  # donc 3 documents au total). 
  # Un champ "type" est automatiquement ajouté à chaque copie 
  # Il correspond au nom donné dans le tableau.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## ÉTAPE 5
  if [type] == 'clone_for_SMI' {
    # Tout supprimer sauf "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## ÉTAPE 6
output {
  # La sortie stdout ci-dessous sert uniquement au débogage 
  # Elle peut être supprimée
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Tester le pipeline Logstash

Pour tester ce pipeline avec les exemples de données CSV, vous pouvez exécuter la commande suivante en la modifiant, afin d'utiliser les chemins de votre système. Remarque : vous n'êtes pas obligé de spécifier "config.reload.automatic", mais cela vous permet de recharger automatiquement "clones.conf" sans devoir redémarrer Logstash :

./logstash -f ./clones.conf --config.reload.automatic

Une fois que Logstash a lu le fichier "stocks.csv" et terminé le traitement, nous pouvons afficher les trois index ainsi créés, qui sont nommés "smi_data", "ftse_data" et "stocks_original".

Vérifier l'index SMI

GET /smi_data/_search

Cela permet d'afficher les documents présentant la structure suivante. Vous remarquerez que seules les données "SMI" apparaissent dans l'index "smi_data".

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

Vérifier l'index FTSE

GET /ftse_data/_search

Cela permet d'afficher les documents présentant la structure suivante. Vous remarquerez que seules les données "FTSE" apparaissent dans l'index "ftse_data".

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Vérifier l'index des documents d'origine

GET /stocks_originals/_search

Cela permet d'afficher les documents présentant la structure suivante. Vous remarquerez que la version d'origine non filtrée des documents apparaît dans l'index "stocks_original".

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Pour conclure

Dans cet article de blog, nous avons étudié une infime partie des capacités de Logstash. Nous avons pris l'exemple du marché boursier, pour montrer comment utiliser Logstash pour ingérer les données provenant de différentes places boursières, les traiter, puis les envoyer vers des sorties distinctes. Si vous utilisez la version d'essai de Logstash et de la Suite Elastic et que vous avez des questions, n'hésitez pas à demander de l'aide sur nos forums de discussions publiques.