Kafka für den Elastic Stack, Teil 1

Der Elastic Stack und Apache Kafka sind im Bereich der Log-/Event-Verarbeitung eng miteinander verknüpft. Viele Unternehmen nutzen Kafka als Broker zur Speicherung und Verarbeitung von großen Datenvolumen. Wir haben festgestellt, dass Kafka bei vielen Systemlandschaften eine wichtige Rolle als temporärer Zwischenspeicher spielt, bevor diese für die schnelle Suche und Analyse an Elasticsearch weitergereicht werden. In einer Reihe von Blog-Beiträgen möchten wir beleuchten, wie man Kafka einrichtet und verwaltet, wenn es mit dem Elastic Stack integriert werden soll. Außerdem berichten wir von unseren Erfahrungen im Einsatz von Kafka und Logstash bei hohem Datenvolumen.

Hinweis: In diesen Beiträgen beziehen wir uns auf Kafkas Version 0.8.xEinige Funktionalitäten haben sich in der neuesten Version von Kafka (0.9.x) geändert, aber 0.8.x ist weiterhin beliebt und weit verbreitet.


Die Grundlagen

Beschäftigen wir uns zuerst mit ein paar einfachen Grundlagen. In der Dokumentation von Kafka heißt es:

Apache Kafka ist ein Publish-Subscribe-Messaging-System, das als verteiltes Commit-Log implementiert wurde.

Kafka wurde bei LinkedIn zur Bewältigung hoher Event-Datenvolumen entwickelt. So wie viele andere Messaging-Broker implementiert es Publisher-Consumer- und Queue-Semantiken durch die Gruppierung von Daten in sogenannte Topics. Als Applikation kann man in ein Topic schreiben und Daten aus einem Topic für die Verarbeitung abrufen. Der größte Unterschied bzw. der neue Design-Ansatz bei Kafka ist, dass die Komplexität vom Erzeuger auf die Empfänger übertragen wird und es in großem Maße auf den Cache des Dateisystems zurückgreift. Diese Designentscheidungen gepaart mit der von Anfang an eingeplanten Verteilung führen dazu, dass Kafka in vielen Streaming-Anwendungsfällen mit hohen Datenvolumen so erfolgreich ist.

Logstash ist nativ über die Java APIs mit Kafka integriert. Es bietet sowohl Input- als auch Output-Plugins, damit Anwendungen direkt über Logstash in Kafka lesen und schreiben können. Die ersten Schritte bei der Konfiguration sind relativ einfach:

kafka {
   zk_connect => "hostname:port"
   topic_id => "apache_logs"
   ...
}

Kafka benötigt Apache ZooKeeper. Wenn du Kafka nutzt, wird ein ZooKeeper-Cluster benötigt. Wir kommen später noch einmal darauf zurück.

Wann sollte man Kafka mit dem Elastic Stack verwenden?

Szenario 1: Event-Spitzen

Log-Daten oder Event-basierte Daten verfügen nur selten über ein konsistentes oder vorhersehbares Volumen. Stellen wir uns folgendes Szenario vor: Wir haben an einem Freitagabend ein Upgrade an einer Applikation vorgenommen (weshalb man freitags keine Upgrades vornehmen sollte, ist ein Thema für einen anderen Blog-Artikel). Die Anwendung hat einen schweren Bug, bei dem Daten exzessiv geloggt wurden, so dass die Logging-Infrastruktur überflutet wird. Solche Datenspitzen oder -explosionen kommen in anderen Multi-Tenant-Anwendungsfällen ebenfalls regelmäßig vor, zum Beispiel in der Gaming- oder E-Commerce-Branche. Ein Message-Broker wie Kafka wird in solch einem Szenario eingesetzt, um Logstash und Elasticsearch vor dieser Datenflut zu schützen. 

In dieser Architektur erfolgt die Verarbeitung in zwei separaten Stufen. Zuerst wird die Shipper-Stufe durchlaufen und dann die Indexer-Stufe. Die Logstash-Instanz, die Daten aus unterschiedlichen Datenquellen liest, wird Shipper genannt, da diese Instanz keine Verarbeitung vornimmt. Die Aufgabe dieser Instanz ist es, Daten sofort in ein Kafka-Topic zu schreiben. Aus diesem Grund handelt es sich dabei um einen Producer. Auf der anderen Seite konsumiert eine stärkere Logstash-Instanz Daten in ihrer eigenen gedrosselten Geschwindigkeit, während gleichzeitig aufwändige Transformationen wie Grok, DNS-Suche und die Indexierung in Logstash vorgenommen werden. Diese Instanz heißt Indexer.

Obwohl Logstash traditionell als Shipper verwendet wurde, empfehlen wir, die verfügbaren Elastic Beats-Produkte als spezialisierte Shipper einzusetzen. Filebeat ist beispielsweise ein unkomplizierter, Ressourcen freundlicher Shipper, der Dateien permanent auslesen und über Logstash an Kafka schicken kann.

Kafka

Hinweis: Derzeit kann Filebeat nicht direkt in Kafka schreiben, aber ab Version 5.0.0 (die sich derzeit im Pre-Release-Status befindet) kannst du Kafka als eine der Output-Optionen konfigurieren. Diese Verbesserung optimiert die oben aufgeführte Architektur in Anwendungsfällen, in denen Daten mit Beats eingelesen werden. Bitte teste diese und weitere neue Funktionen in unseren Alpha-Versionen und teile uns mit, was du davon hältst! Unter allen fleißigen Testern, die uns helfen, wird eine kostenlose Eintrittskarte zur Elastic{ON}17 verlost!


Szenario 2: Elasticsearch ist nicht erreichbar

Stellen wir uns ein weiteres Szenario vor: Wir planen das Upgrade eines Multi-Node-Clusters von Elasticsearch von Version 1.7 auf Version 2.3, wodurch ein kompletter Neustart des Clusters notwendig wird. Oder eine Situation, in der Elasticsearch für einen längeren Zeitraum als erwartet nicht erreichbar ist. Wenn du Elasticsearch aus vielen Datenquellen befüllst und es nicht möglich ist, diese Indizierung zu stoppen, kann hier ein Message-Broker wie Kafka helfen. Wenn du die Shipper- und Indexer-Architektur von Logstash mit Kafka nutzt, kannst du weiterhin Daten von Edge-Nodes streamen und sie vorübergehend in Kafka vorhalten. Sobald Elasticsearch wieder verfügbar ist, macht Logstash an der letzten Stelle weiter und hilft dir dabei, den Rückstand an Daten aufzuholen. Das passt auch gut zur Elastic-Philosophie unserer Software. Du kannst die Verarbeitungs- und Indexierungsleistung temporär verbessern, indem du zusätzliche Logstash-Instanzen hinzufügst, die sich um die Verarbeitung des selben Kafka-Topics kümmern. Du kannst auch in Elasticsearch zusätzliche Nodes hinzufügen. Die einfache horizontale Skalierung von Elasticsearch ermöglicht dies ohne viel Aufwand. Wenn eventuelle Rückstände beim Indizieren aufgeholt sind, kannst du wieder auf die ursprüngliche Anzahl an Instanzen herunterskalieren.


Anti-Pattern: Wann sollte man Kafka nicht mit dem Elastic Stack verwenden?

Genauso, wie es gut ist, zu wissen, wann man Kafka nutzen sollte, ist es auch gut zu wissen, wann man es nicht nutzen sollte. Es gibt nichts umsonst. Kafka ist auch nur eine weitere Software, die du in deiner Produktionsumgebung betreiben musst. Dazu gehören das Monitoring, die Reaktion auf Warnmeldungen, Upgrades und alles, was sonst noch so mit dem erfolgreichen Betrieb von Software in Produktion einhergeht. Du überwachst doch alle deine Software-Konfigurationen in Produktion, oder? Wenn es zur zentralisierten Log-Verwaltung kommt, gibt es oft die pauschale Aussage, dass Logs so schnell wie möglich von deinen Edge-Nodes aus verschickt werden müssen. Obwohl das natürlich auf einige Anwendungsfälle zutrifft, solltest du dich immer fragen, ob bei dir die Notwendigkeit besteht! Wenn du eine höhere Suchlatenz tolerieren kannst, musst du Kafka nicht verwenden. Filebeat, das Dateien von Edge-Nodes ausliest und weiterleitet, toleriert das Rotieren von Logfiles. Das bedeutet, wenn deine Applikation mehr Logs generiert, als Logstash/Elasticsearch in Echtzeit aufnehmen kann, können die Logs für alle Dateien rotiert werden (dafür nutzt man beispielsweise Log4j oderlogrotate) und die Indexierung wird fortgeführt. Natürlich ist dann auch ausreichend Speicherplatz für die Speicherung dieser Logs auf deinen Servern erforderlich. In anderen Worten: In diesem Szenario wird das lokales Dateisystem zum temporären Puffer.


Design-Überlegungen für Kafka und Logstash

Unten beschreiben wir ein paar Design-Aspekte bei der Verwendung von Kafka mit Logstash. Der Logstash-Input nutzt die High-Level Kafka-Consumer-API und der Logstash-Output nutzt die neue Producer-API.

Topics

Topics sind logische Gruppierungen von Nachrichten. Damit lassen sich Daten von anderen Consumern bei Bedarf isolieren. Bitte beachte, dass Kafka 0.8 über keinerlei integrierte Sicherheitsfunktion verfügt, wodurch jeder Consumer auf jedes verfügbare Topic auf dem Broker zugreifen kann. Wie viele Topics du brauchst und wie du deine Daten modellierst, hängt von den Daten ab. Hier sind ein paar Strategien: 

Nutzerbasierter Datenfluss: In diesem Fall erstellst du ein Topic pro Nutzer. Bitte denke daran, dass Kafka alle Partitionen in ZooKeeper registriert. Daher müssen in diesem Fall Hunderte oder Tausende Topics registriert werden. Wenn du nur eine geringe Anzahl an Anwendern hast, z. B. Abteilungen, funktioniert diese Strategie der Partitionierung gut.

HOV laneAttributbasierter Datenfluss: Für Log und Event-basierte Daten kannst du auch mehrere Nutzer anhand von Attributen wie dem Datenvolumen und der erwarteten Suchlatenz in einem Topic gruppieren. Denke daran, dass die Latenz bei der Suche nach Events höher ist, je mehr Zeit diese Events vor der Indexierung in Elasticsearch in der Warteschlange verbringen. Eine Lösung wäre die Erstellung von Topics anhand erwarteter SLAs: Topics mit „hoher“, „mittlerer“ und „niedriger“ Priorität. Ähnlich könnte es beim Datenvolumen ablaufen. Hast du einen Kunden/Nutzer, der bekannt dafür ist, ohne Ankündigung viele Daten zu produzieren? Erstelle für diesen Nutzer/Kunden ein neues Topic. In einer Multi-Tenant-Implementierung hat es sich bewährt, ein Topic für derartige Datenströme zu haben. Wenn also ein Benutzer sein Datenvolumen überschreitet oder zu viele Datenmengen in den letzten X Minuten/Stunden produziert wurden, kannst du ihn im laufenden Betrieb in dieses Topic verschieben. So hältst du andere Topics frei von derartigem Traffic und das Auslesen dieses Topics wird nicht für alle verlangsamt. Eine schöne Analogie ist in diesem Fall die Autobahn: Auf der schnellsten Spur soll der Verkehr möglichst flüssig laufen. Daher wird erwartet, dass langsamere Fahrzeuge auf den anderen Spuren fahren. Stell dir sich die Topics bei Kafka als Spuren und Events als Fahrzeuge vor!

Im Allgemeinen sollte man pro externer Datenquelle auch eigene Topics bilden. Bei Kafka lässt sich die Anzahl der Partitionen pro Topic konfigurieren, um auf Topic-Ebene zu skalieren. Das bedeutet auch, dass Logstash-Instanzen pro Topic skalieren können. 

Wenn du erwartest, dass bestimmte Quellen in Zukunft ein höheres Volumen aufweisen, kannst du mehr Partitionen für ein Topic konfigurieren, so dass in Zukunft auch mehr Logstash Instanzen verwendet werden können. Topics werden dynamisch erstellt, wenn die Daten das erste Mal für ein nicht vorhandenes Topic geschrieben werden, oder man erstellt diese vorher manuell..

kafka-topics.sh --zookeeper zk_host:port --create --topic user1
    --partitions 8 --replication-factor 1<span></span>

Partitions

Jetzt ist ein guter Zeitpunkt, um über Partitionen zu sprechen. Aus Kafkas Dokumentation:

„Die Partition kontrolliert, auf wie viele (Commit-) Logs das Topic aufgeteilt wird. Die Anzahl an Partitionen wird von verschiedenen Dingen beeinflusst. Zuerst muss jede Partition komplett auf einen einzigen Server passen. Wenn du also 20 Partitionen hast, kümmern sich maximal 20 Server um den kompletten Datensatz (und die Lese- und Schreiblast). Kopien einzelner Partitionen (Replicas) spielen hierbei keine Rolle. Außerdem beeinflusst die Anzahl an Partitionen die maximale Anzahl an erlaubten parallelen Consumern."

Grundsätzlich lässt sich sagen, dass mit zunehmender Anzahl an Partitionen auch der Datendurchsatz zunimmt, wenn Daten konsumiert werden. Kafka bietet eine Option zur Kontrolle darüber, welche Daten in welcher Partition landen. Standardmäßig werden die Daten bei der Verwendung von Logstash einer Partition via Round-Robin zugewiesen. Durch die genaue Angabe des message_key in der Logstash-Konfiguration kannst du steuern, wie deine Daten einer Partition zugewiesen werden. In einigen Fällen kann es effizient sein, weniger Topics/Partitionen zu haben, um die ZooKeeper-Beschränkung zu umgehen, aber dennoch kann man mehrere Benutzer unter festen Partitionen gruppieren, indem du die user_id als message_key verwendest. Wenn ein Key angegeben ist, wird die Partition durch den Hashwert des Keys ermittelt.

Eine weitere wichtige Eigenschaft, derer man sich in Kafka bewusst sein sollte, ist die Reihenfolge der Nachrichten. Kafka garantiert die Nachrichtenreihenfolge nur innerhalb derselben Partition. Haben Nachrichten aus Datenquellen also keinen message_key, werden diese auf verschiedene Partitionen verteilt. Kafka garantiert in diesem Fall nicht die richtige Reihenfolge im Consumer. Wenn Daten unveränderlich sind – das kommt vor allem bei Logging-Anwendungsfällen vor – kann das eine akzeptable Eigenschaft sein. Wenn du eine strenge Reihenfolge brauchst, solltest du sicherstellen, dass die Daten einer einzigen Partition zugewiesen werden.

Consumer-Gruppen: Skalierbarkeit und Fehlertoleranz

Mehrere Kafka-Consumer, die Daten aus ähnlichen Topics verarbeiten, bilden eine Consumer-Gruppe, die innerhalb des Clusters einen eigenen Namen erhält. Nachrichten, die nach Kafka geschrieben werden, werden über die Instanzen in der Gruppe verteilt, aber jede Nachricht wird nur von einem Consumer in der Gruppe verarbeitet, d. h. es gibt keine Überschneidungen. Logstash-Instanzen, die Daten aus Kafka auslesen, bilden eine Consumer-Gruppe mit einer standardmäßigen Gruppen-ID namens logstash.  Du kannst jederzeit neue Logstash-Instanzen starten, um mehr Daten für das abonnierte Topic zu lesen. Standardmäßig tritt die neu gestartete Logstash-Instanz der logstash Consumer-Gruppe bei. 

Dieser Prozess – also, wenn ein neuer Consumer einer Consumer-Gruppe beitritt – aktiviert in Kafka eine Umverteilung. Wir haben zuvor bereits erwähnt, dass Logstash einen High-Level Kafka-Consumer verwendet, daher wird die Logik zur Umverteilung an die Kafka-Bibliothek delegiert. Durch diesen Prozess werden die Partitionen automatisch den aktuellen Consumern anhand der verfügbaren Metadaten in ZooKeeper erneut zugewiesen. Ein weiterer Grund für den Einsatz mehrerer Logstash-Instanzen ist die höhere Fehlertoleranz. Wenn eine Instanz ausfällt, nimmt Kafka die Umverteilung vor und verteilt die Zuweisungen auf die verbleibenden Logstash-Instanzen.

Das alles hängt eng mit der consumer_threads - Einstellung im Logstash-Input zusammen. Diese Einstellung steuert die Anzahl an Threads, die die Kafka-Partitionen verarbeiten. Idealerweise solltest du für eine perfekte Verteilung so viele Threads wie Partitionen haben. Wenn du mehr Threads als Partitionen hast, sind einige Threads nicht ausgelastet. Bei weniger Threads als Partitionen müssen einige Threads mehrere Partitionen verarbeiten.

Stell dir ein Szenario vor, bei dem das Topic apache_logs 16 Partitionen hat. Wir könnten mit dieser Konfiguration eine Logstash-Instanz auf einer 8-Kern-Maschine beschleunigen:

input {
   kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}

Oder wir könnten zwei Logstash-Instanzen auf zwei Maschinen starten, bei dem wir die consumer_threads jeweils auf 8 setzen. Die letztgenannte Konfiguration ist die bessere Wahl, da hier die komplette CPU der Maschine genutzt, aber auch eine Fehlertoleranz für den Fall von Ausfällen hinzugefügt wird.

Du solltest sicherstellen, dass die Anzahl der Partitionen immer ein Vielfaches der Anzahl an Logstash-Threads/-Instanzen ist. So wird garantiert, dass die Instanzen die richtige Balance haben. Die Partitionierung gewährleistet – ähnlich wie das Sharding in Elasticsearch – dass wir später mehr Kapazität (in diesem Fall Logstash-Instanzen) hinzufügen können.


Serialisierungsformate

Kafka verwendet einfache Byte-Arrays, um Nachrichten persistent zu speichern. So können wir eigentlich jedes Format in Kafka nutzen. Aber es ist generell empfehlenswert, ein Serialisierungsformat zu wählen, das kompakt und schnell ist. Kafka kann mit serialisierten Nachrichtenformaten umgehen, indem der value_serializer in den Outputs und die decoder_class in den Inputs spezifiziert werden. Wenn du Erfahrung im Umgang mit Logstash hast, denkst du nun wahrscheinlich an Codecs. Du kannst sowohl Logstash-Codecs als auch Kafka-Serialisierer verwenden, um die Nachrichtendarstellung in und außerhalb von Kafka-Topics zu verwalten.

Andere Logstash-Codecs, die für das Kafka-Ökosystem relevant sind: plain, avro, und avro_schema_registry.

Wenn du deinen eigenen Serialisierer/Deserialisierer schreiben möchtest, kannst du das in deiner favorisierten JVM-Sprache machen. Da sich die Klassen nicht im Klassenpfad von Logstash befinden, müssen die entsprechende Bibliothek explizit zum Java-Klassenpfad hinzufügt werden.

export CLASSPATH=$CLASSPATH:/path/to/kafkaserializers.jar; bin/logstash -f ..

Bitte beachte, dass die Dekodierung von Nachrichten im Logstash-Input mit einem einzelnen Thread erfolgt. Aus diesem Grund führt ein teures Serialisierungsformat wie json zu einer Verschlechterung der Gesamt-Performance in der Pipeline.

Fazit

In diesem Beitrag haben wir uns mit den grundlegenden Konzepten in Kafka beschäftigt und gezeigt, wie man Kafka zusammen mit dem Elastic Stack nutzen kann. Im nächsten Beitrag geht es direkt mit den operativen Aspekten weiter und wir zeigen Tipps für den Einsatz von Kafka mit Logstash. Falls du in der Zwischenzeit Fragen hast, kannst du uns gerne in unserem Forum oder bei Twitter kontaktieren.

Update: Willst du mehr hierzu wissen? Lies dann Teil 2 dieser Serie für operative und Monitoring-Tipps.