Baue deinen eigenen Beat

Beats ist die Plattform zur Entwicklung von leichtgewichtigen Open-Source Daten-Shippern, die alle möglichen Daten zur späteren Analyse an Elasticsearch senden. Wir haben Packetbeat zur Überwachung des Netzwerk-Verkehrs, Filebeat zum Abrufen der Logs von euren Servern und das neue Metricbeat, das regelmäßig statistische Daten von externen Systemen anfordert. Wenn ihr andere, eigene Daten sammeln wollt, könnt ihr auf Basis des libbeat-Frameworks ganz einfach eigene Beats schreiben. Es gibt bereits mehr als 25 Community Beats, die von der Community entwickelt wurden.

Wir stellen das Beat Generator-Paket bereit, das euch beim Schreiben eigener Beats hilft. In diesem Blog-Beitrag erklären wir die Erstellung eines eigenen Beats mit dem Beat Generator. Der Beat, den wir heute zu Übungszwecken schreiben, heißt Lsbeat. Lsbeat indiziert Informationen zu Dateien und Verzeichnissen – etwa so wie der Unix-Befehl ls. Dieser Artikel basiert auf Unix. Wenn ihr also Windows oder ein anderes Betriebssystem nutzt, befolgt die Anweisungen für euer OS.

Schritt 1 – Eure Golang-Umgebung einrichten


Beats werden in Golang geschrieben. Beat-Entwickler müssen also Golang auf ihren Rechnern installieren. Folgt bei Bedarf den Anweisungen zur Golang-Installation. Aktuell benötigen Beats mindestens Golang 1.6. Achtet darauf, eure Umgebungsvariable $GOPATH richtig zu setzen.

Sehen wir uns an, welchen Code wir für Lsbeat brauchen. Dies ist ein simples Golang-Programm, das ein Verzeichnis als Befehlszeilen-Argument entgegennimmt und alle Dateien und Unterverzeichnisse darin auflistet.

package main
import (
    "fmt"
    "io/ioutil"
    "os"
)
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}

Wir verwenden dazu den Code der Funktion listDir.

Schritt 2 – Generieren


Zum Generieren unseres eigenen Beats nehmen wir den Beat Generator. Zuerst müsst ihr aber cookiecutter installieren. Hier findet ihr die Installationsanleitung. Nach der erfolgreich Installation müssen wir uns einen Namen für den Beat überlegen. Der Name muss ein zusammenhängender Ausdruck in Kleinbuchstaben sein. Für dieses Beispiel verwenden wir lsbeat.

Für das Beat-Grundgerüst solltet ihr euch das Beats Generator-Paket holen, das im beats-Repository verfügbar ist. Nach der Installation von Golang könnt ihr das Beats Generator-Paket mit dem Befehl go get herunterladen. Wenn der Befehl ausgeführt wird, landen alle Quelldateien im Pfad $GOPATH/src.

$ go get github.com/elastic/beats

Damit die Entwicklung auf einem stabilen Branch stattfindet, wird der 5.1 Branch verwendet.

$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

Jetzt erstellt ihr euer eigenes Verzeichnis unter GOPATH und führt cookiecutter mit dem Beat Generator-Pfad aus.

$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat

Cookiecutter wird euch mehrere Fragen stellen. Für project_name gebt Lsbeat und für gifthub_name eure GitHub-ID ein. Die nächsten beiden Fragen zu beat und beat_path sollten bereits automatisch richtig eingestellt sein. Bei der letzten Frage könnt ihr euren Vornamen und Nachnamen eintragen.

project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}

Jetzt sollte ein Verzeichnis namens lsbeat mit mehreren Dateien in unserem Ordner erzeugt worden sein. Wechseln wir doch einmal zu diesem Verzeichnis und sehen uns die automatisch erstellten Dateien an.

$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py

Jetzt haben wir eine grobe Vorlage für den Beat, müssen aber noch die Abhängigkeiten festlegen und das Git-Repository einrichten.

Zuerst werden die Abhängigkeiten festgelegt: In unserem Fall ist das nur libbeat. Dann erstellen wir die grundlegenden Konfigurations- und Vorlagendateien. Später werden wir uns die Vorlagen- und Konfigurationsdateien noch genauer ansehen.

$ make setup

Jetzt habt ihr euren eigenen Beat und könnt ihn in ein GitHub-Repository hochladen, um ihn mit der Community zu teilen.

Screen Shot 2016-07-13 at 10.53.58 AM.png

Führt folgende Befehle aus, um Lsbeat in das GitHub-Repository zu übertragen:

$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master

Jetzt haben wir einen vollständigen Beat und die erste Version davon auf GitHub veröffentlicht. Nun können wir unseren Beat schreiben, ausführen und uns detaillierter mit dem Code beschäftigen.

Schritt 3 – Konfigurieren


Sobald ihr die obigen Befehle ausführt, werden die Dateien lsbeat.yml und lsbeat.template.json automatisch erstellt. Alle grundlegenden Konfigurationseinstellungen werden bereits in diese Dateien geschrieben.

lsbeat.yml:

lsbeat:
  # Defines how often an event is sent to the output
  period: 1s

period ist ein Parameter, der vom Generator in alle Beats eingefügt wird. Er steht für die sekündliche Prozessiteration von Lsbeat. Ändern wir period doch einfach von 1 auf 10 Sekunden und fügen den neuen Parameter path hinzu, um den Pfad des Oberverzeichnisses festzulegen, den das Programm scannt. Diese Parameter können wir in beat.yml unter dem Verzeichnis etc/ hinzufügen. 

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."

Nachdem wir neue Parameter hinzugefügt haben, führen wir den Befehl make update aus, um die Änderungen für die Konfigurationsdatei lsbeat.yml zu übernehmen. Jetzt sehen wir, dass die unter etc/beat.yml hinzugefügten Parameter in lsbeat.yml verfügbar sind.

$ make update
$ cat lsbeat.yml
################### Lsbeat Configuration Example #########################
############################# Lsbeat ######################################
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################

Nach dem Aktualisieren der Konfigurationsdateien solltet ihr config/config.go bearbeiten, um den Parameter path hinzuzufügen.

package config
import "time"
type Config struct {
    Period time.Duration `config:"period"`
    Path   string        `config:"path"`
}
var DefaultConfig = Config{
    Period: 10 * time.Second,
    Path:   ".",
}

In diesem Fall verwenden wir als Standard-Konfigurationsoptionen 10 Sekunden für period und das aktuelle Verzeichnis (.) für das Standard-Directory.

Step 4 - Add code


Jeder Beat muss durch die Definition der Funktionen Run() und Stop() die Beater-Schnittstelle implementieren. Genauere Informationen zu dieser Schnittstelle findet ihr hier.

Dazu müsst ihr einfach nur eine Struktur namens Lsbeat definieren, die das Lsbeat-Objekt definiert, das die Beater-Schnittstelle implementieren soll. Hier können wir auch gleich lastIndexTime hinzufügen. Damit werden die Daten des letzten Zeitstempels gespeichert.

type Lsbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
    lastIndexTime time.Time
}

Außerdem muss jeder Beat die Funktion New() implementieren, die die Beat-Konfiguration empfängt und das Beat-Objekt vom Typ Lsbeat ausgibt.

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }
    ls := &Lsbeat{
        done:   make(chan struct{}),
        config: config,
    }
    return ls, nil
}

Bei Lsbeat möchten wir die Funktionalität der Standardfunktion Run() auf den Export von Informationen zu den Dateien und Unterverzeichnissen erweitern, die in einem Verzeichnis verfügbar sind.

Bevor wir die Funktion Run() verändern, fügen wir aber zuerst unten in der Datei lsbeat.go die Funktion listDir() hinzu, um Informationen zu Dateien und und Verzeichnissen zu sammeln. Sie erzeugt Events wie:

  • "@timestamp": common.Time(time.Now())
  • "type": beatname
  • "modtime": common.Time(t)
  • "filename": f.Name()
  • "path": dirFile + "/" + f.Name()
  • "directory": f.IsDir()
  • "filesize": f.Size()

listDir() indiziert alle Dateien und Verzeichnissen einmal, prüft aber nach dem ersten Durchlauf, ob Dateien oder Verzeichnisse erstellt oder bearbeitet wurden. So werden ab dann nur noch neue Dateien und Verzeichnisse indiziert. Der Zeitstempel des letzten Durchlaufs wird in der Variable lasIndexTime gespeichert.

func (bt *Lsbeat) listDir(dirFile string, beatname string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        path := filepath.Join(dirFile, f.Name())
        if t.After(bt.lastIndexTime) {
            event := common.MapStr{
                "@timestamp": common.Time(time.Now()),
                "type":       beatname,
                "modtime":    common.Time(t),
                "filename":   f.Name(),
                "path":       path,
                "directory":  f.IsDir(),
                "filesize":   f.Size(),
            }
            bt.client.PublishEvent(event)
        }
        if f.IsDir() {
            bt.listDir(path, beatname)
        }
    }
}

Und vergesst beim Bibliotheken-Import nicht das Paket io/ioutil.

import (
    "fmt"
    "io/ioutil"
    "time"
)

Sehen wir uns nun einmal die Funktion Run() an, die die Funktion listDir() aufruft und den Zeitstempel in der Variable lasIndexTime speichert.

func (bt *Lsbeat) Run(b *beat.Beat) error {
    logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    for {
        now := time.Now()
        bt.listDir(bt.config.Path, b.Name) // call listDir
        bt.lastIndexTime = now             // mark Timestamp
        logp.Info("Event sent")
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }
    }
}

Die Funktion Stop() soll die Ausführungsschleife unterbrechen und kann unverändert vom generierten Code übernommen werden:

func (bt *Lsbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

Mit dem Programmieren sind wir fast fertig. Wir müssen beim Mapping neue Felder hinzufügen – und natürlich die Feldinformationen in der Datei etc/fields.yml.

- key: lsbeat
  title: LS Beat
  description: 
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

Und schließlich die Änderungen übernehmen.

$ make update

filename wird mit dem nGram Tokenizer analysiert. Jetzt fügen wir einen eigenen Analyser in der Datei lsbeat.template.json unter "settings" hinzu.

{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

Step 5 - Build and run


Jetzt können wir den Beat fertigstellen und ausführen. Führt einfach den Befehl „make“ aus, damit der Code kompiliert und in die ausführbare Binärdatei lsbeat (lsbeat.exe bei Windows) umgewandelt wird.

$ make

Bearbeitet die Datei lsbeat.yml und legt das Stammverzeichnis für die Dateiliste fest. In unserem Fall nehmen wir den $GOPATH, der /Users/ec2-user/go. Achtet darauf, dass der vollständige Verzeichnispfad angegeben wird.

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"

Achtet auch darauf, dass eure Elasticsearch- und Kibana-Instanzen laufen. Jetzt führen wir Lsbeat aus und schauen, was passiert.

$ ./lsbeat

Ihr könnt mit der _cat-API prüfen, ob der Index erstellt und die Daten ordnungsgemäß indiziert werden.

cat.png 


Wir sehen den Index lsbeat-2016.06.03 und die Dokumentenanzahl. Fragen wir nun Lsbeat mit dem Feld filename ab, das mit dem nGram Tokenizer analysiert wird. Die Abfrage erfolgt mit dem Suchbegriff lsbe.

query.png

Es funktioniert! Glückwunsch, ihr habt gerade einen eigenen Beat geschrieben!