Un soupçon de Kafka pour la Suite Elastic : Partie 1

La Suite Elastic et Apache Kafka fonctionnent en osmose lorsqu’il s'agit de traiter des fichiers journaux et des événements. Bien des entreprises utilisent Kafka en tant que couche transport pour le stockage et le traitement de grandes quantités de données. Ainsi, Kafka joue un rôle capital au sein de nombreux déploiements dans le field. Il gère les données en transit avant qu'elles ne rejoignent Elasticsearch pour fournir une recherche rapide et des capacités d'analyse. Par le biais d’articles de blog, nous aimerions faire la lumière sur la configuration et la gestion de Kafka lors de son intégration à la suite Elastic. Nous évoquerons en particulier l'exploitation de Kafka et de Logstash dans le cadre du traitement d'importants volumes de données.

Note: Remarque : cet article fait référence à la version 0.8.x e Kafka. Certaines des fonctionnalités ont changé dans la dernière version (0.9.x). Toutefois la version 0.8.x demeure populaire et largement utilisée.

Les Bases

Penchons-nous tout d'abord sur quelques concepts simples tirés de la documentation de Kafka :

Apacha Kafka est une messagerie de type publication/abonnement repensée comme un "commit log" distribué.

Kafka a été créé par LinkedIn pour gérer des quantités importantes de données d'événements. Comme tout autre fournisseur de services de messagerie, il gère la sémantique des files d'attente de messages et de la relation publication/consommation en groupant les données par topics. Une application peut écrire dans un topic ou consommer à partir d'un topic. Cependant, Kafka a été conçu différemment, la complexité y est transférée du producteur au consommateur et le cache système y est beaucoup utilisé. Ces choix de conception ainsi que sa facilité d'implémentation font de Kafka le candidat idéal pour le traitement de grands volumes en streaming.

Logstash intégre Kafka de façon native en utilisant les API Java. Il fournit à la fois des plugins input et output afin de vous permettre de lire et de coder dans Kafka directement à partir de Logstash. La configuration de départ est simple :

kafka {
   zk_connect => "hostname:port"
   topic_id => "apache_logs"
   ...
}

Kafka est dépendant d'Apache ZooKeeper, ainsi si vous exécutez Kafka vous aurez besoin d'un accès à un cluster ZooKeeper. Nous nous y pencherons plus tard.

Quand utiliser Kafka avec la suite Elastic ?

Cas de figure n°1 : pic d'événements

Les données de logs ou les données d'événements ont rarement un débit et un volume constant et prévisible. Imaginez ce scénario : vous avez mis à niveau une application le vendredi soir (un de nos prochains articles vous expliquera pourquoi il ne faut pas mettre à niveau le vendredi :) ). L'application déployée souffre d'un bug à l'origine d'un logging excessif de l'information, saturant ainsi votre infrastructure de log. L'occurrence d'un pic de données n'est pas rare dans les cas d'utilisation multi-utilisateurs, ainsi que dans le secteur du jeu vidéo et de l'e-commerce. Dans ce cas de figure, un fournisseur de services de messagerie tel que Kafka va protéger Logstash et Elasticsearch de cette vague soudaine de données.

Dans cette architecture, le traitement est généralement divisé en 2 étapes distinctes : l'étape d'expédition et l'étape d'indexation. D'une part il y a l'instance Logstash qui reçoit les données à partir de sources différentes. Elle est appelée "Shipper" (Expédition) vu qu'elle ne traite quasiment pas de données. Sa responsabilité est d'attribuer les données reçues à un topic Kafka, c'est donc une instance productive. D'autre part, il y une autre instance Logstash, considérablement plus lourde, qui va consommer des données à son propre rythme, tout en exécutant des transformations potentiellement lourdes telles que Grok, DNS lookup et l'indexation dans Elasticsearch. Cette instance se nomme "Indexer" (Indexation).

Bien que Logstash soit généralement utilisé pour l'Expédition, nous recommandons chaudement la suite de produits Elastic Beats. Ces produits sont des shippers spécialisés et légers. Citons par exemple Filebeat, un agent léger axé sur les ressources qui peut streamer le contenu de fichiers et expédier vers Kafka via un récepteur Logstash.

Kafka

Remarque: Filebeat ne peut pas encore directement coder dans Kafka, mais avec la version 5.0.0 (sortie prochaine, vous serez en mesure de configurer Kafka comme output. Cette amélioration simplifie l'architecture que nous venons de décrire, en particulier pour les cas d'utilisation qui ingèrent des données avec Beats. N'hésitez pas à tester ces nouvelles fonctionnalités incroyables dans nos versions alpha et à partagez votre expérience ! La rumeur veut que vous pourriez même gagner un pass gratuit pour Elastic{ON}17 en nous aidant à les tester !

Cas de figure n°2 : Elasticsearch est inaccessible

Imaginez ces scénarios : vous planchez sur une mise à niveau de votre cluster Elasticsearch multi-nœuds (de 1.7 à 2.3) qui requiert un redémarrage intégral, ou bien Elasticsearch est inactif plus longtemps que vous ne l'escomptiez. Si vous avez des sources de données qui envoient un flux vers Elasticsearch et que vous ne pouvez pas vous permettre d'interrompre ces flux, un fournisseur de services de messagerie comme Kafka peut vous être d'une grande aide ! Si vous utilisez l'architecture d'expédition et d'indexation Logstash avec Kafka, vous pouvez continuer à streamer vos données depuis leur source et les maintenir provisoirement dans Kafka. Quand Elasticsearch sera à nouveau actif, Logstash reprendra là où il s'était arrêté et vous traitera les données ainsi accumulées. Ces fonctionnalités se marient à merveille à la souplesse d'Elastic : vous pouvez temporairement accroître votre puissance de traitement et d'indexation en ajoutant des instances Logstash pour qu'elles puissent consommer à partir du même topic Kafka. En outre, vous pourriez aussi ajouter des nœuds elasticsearch. L'adaptabilité horizontale à moindre effort est l'une des fonctionnalités principales d'Elasticsearch. Une fois votre retard rattrapé, vous pourrez revoir votre nombre d'instances à la baisse.

Contre-indications : quand Kafka ne doit pas être utilisé avec la suite Elastic

S'il est important de savoir quand utiliser Kafka, il est tout aussi primordial de savoir les situations auxquelles ce système n'est pas adapté. Tout a un coût. Kafka est une composante de logiciel supplémentaire qu'il vous faudra entretenir dans votre environnement de production. Cela implique de la supervision, de la réactivité aux alertes, des mises à niveau et tout ce qui va de pair avec la mise en œuvre réussie d'un logiciel en production. Vous supervisez bien tous vos logiciels de production, non ?

En matière de gestion centralisée des logs, l'idée la plus répandue est la suivante : les logs doivent être envoyés le plus rapidement possible depuis leur source. Bien que ce soit le cas dans certaines circonstances, il faut vous demander si cela s'applique à votre situation ! Si vous pouvez vous permettre une certaine latence au niveau de la recherche, oubliez Kafka. Filebeat, qui suit et expédie vos logs, gère parfaitement la rotation des fichiers. Ainsi, si votre application émet davantage de logs que Logstash/Elasticsearch peut ingérer en temps réel, ceux-ci peuvent faire l'objet d'une rotation, au moyen de Log4j ou encore logrotate. Cette rotation sera effective sur tous les fichiers, mais ils devront toujours être indexés. Ceci requiert naturellement davantage d'espace disque pour héberger ces fichiers sur vos serveurs. Autrement dit, dans ce cas de figure, votre système de fichiers local fera office de mémoire tampon provisoire.

Kafka et Logstash : quels types d'utilisation ?

Dans cette partie, nous nous penchons sur les utilisations possibles de Kafka avec Logstash. L'input Logstash utilise la nouvelle API consommateur Kafka de haut niveau tandis que l'output Logstash se sert de la nouvelle API producteur.

Sujets

Les topics sont des regroupements logiques de messages. Si nécessaire, ils permettent également de bloquer l'accès des consommateurs à certaines données. Attention, la version 0.8 de Kafka ne possède pas de sécurité intégrée. Tout consommateur peut accéder aux topics présents sur le serveur de messagerie. Combien de topics devez-vous utiliser ? Comment modeler vos données ? Ce sont vos données elles-mêmes qui détiennent la clé. Voici quelques stratégies :

Flux de données axé utilisateur. Dans ce scénario, il vous faut un topic par utilisateur. Gardez bien à l'esprit que Kafka enregistre toutes les partitions dans ZooKeeper. La création de topics par milliers entraîne un coût. Si vous n'avez que peu d'utilisateurs (vous êtes une division d'une entreprise), partitionner au niveau de l'utilisateur est une bonne solution.

HOV laneFlux de données axé attributs. Pour l'enregistrement de logs et d'événements, vous pourriez grouper plusieurs utilisateurs au sein d'un même topic en fonction d'attributs tels que le volume de données ou le temps de latence de recherche attendu. Souvenez-vous que plus les événements passent du temps dans la file d'attente sans être indexés dans Elasticsearch, plus ils sont sujets à un temps de latence de recherche élevé. La création de topics en fonction des attentes en matière de SLA peut être une solution. Par exemple : « haute » « moyenne » « basse ». Vous pouvez faire de même en vous basant sur les volumes de données. Avez-vous un consommateur/utilisateur qui génère des données par salves ? Attribuez lui un nouveau topic. Dans un déploiement multi-utilisateur, il est de bon aloi d'avoir un topic « salves ». Ainsi, lorsque d'un utilisateur a dépassé son volume de données autorisé ou a généré trop de données par salves au fil des X heures/minutes précédentes, vous pouvez les déplacer vers ce topic en live. De cette façon, vous pourrez purger les autres topics de tout trafic inutile, ce qui garantit à chacun une expérience fluide. Pensez à une autoroute. La voie rapide doit être fluide, on s'attend donc à ce que les véhicules plus lents empruntent les autres voies. Les topics Kafka sont comme des voies et les événements comme des voitures !

Afin d'isoler les sources, il est d'usage de séparer les topics en fonction des sources de données. Dans Kafka, vous pouvez configurer le nombre de partitions par sujet. Cela signifie que vous pouvez ajuster les instances Logstash à chaque topic. Si vous pensez que certaines sources vont connaître une croissance plus soutenue, vous pouvez toujours sur-partitionner le topic afin d'éviter tout couac.

Vous avez le choix. Les topics peuvent êtrecréés à la volée, lorsque les données publiées n'entrent dans aucun des topics existants. Ou ils peuvent être créés manuellement en amont.

kafka-topics.sh --zookeeper zk_host:port --create --topic user1
    --partitions 8 --replication-factor 1<span></span>

Partitions

Il est maintenant l'heure d'aborder les partitions ! Les informations suivantes sont tirées de la documentation Kafka :

« Le compteur de partitions contrôle le nombre de fichiers journaux générés à partir d'un topic. Ce compteur a plusieurs effets. D'une part, chaque partition doit pouvoir être hébergée sur un seul serveur. Si vous disposez de 20 partitions, l'ensemble des données (ainsi que les opérations de lecture et d'écriture) sera traité par 20 serveurs maximum (sans compter les copies - replicas). D'autre part, le compteur de partitions à une incidence sur le degré de parallélisme maximum des clients. »

Au final, plus vous avez de partitions, plus votre cadence de consommation des données augmente. Du point de vue du producteur, Kafka vous accorde la possibilité de contrôler les données qui atterrissent dans vos partitions. Avec Logstash, les données sont assignées par défaut à une partition selon un algorithme inspiré de round-robin. En spécifiant la message_key dans les paramètres Logstash, vous pouvez contrôler la façon dont les données sont réparties. Dans certains cas, il peut se révéler plus efficace d'avoir moins de topics/partitions afin de contourner la limitation ZooKeeper. Il faut toutefois grouper les utilisateurs multiples dans des partitions fixes en utilisant le user_id en tant que message_key. Si la clé est indiquée, une partition sera choisie au moyen d'un extrait de la clé.

Autre propriété importante de Kafka : l'ordre des messages. Kafka garantit l'ordre des messages seulement au sein d'une même partition. Si les messages issus de la source de données n'ont pas de clé, ils sont disséminés à travers les partitions et Kafka ne pourra pas garantir l'ordre lors de la consommation. Lorsque les données sont inaltérables, en particulier pour les cas d'utilisation de logging, cette caractéristique est acceptable. Si vous souhaitez que vos messages soient ordonnés, vous devez faire en sorte qu'ils soient attribués à une seule et même partition.

Groupes de consommateurs : flexibilité et résistance aux pannes

Les consommateurs Kafka dont les données sont traitées à partir de sujets similaires forment un groupe de consommateurs identifié par un nom unique au sein du cluster. Les messages publiés dans Kafka sont distribués dans toutes les instances du groupe, mais chaque message n'est géré que par un seul consommateur de ce même groupe. Il n'y a donc pas de chevauchement. Les instances Logstash qui lisent les données dans Kafka forment un groupe de consommateurs dont l'identifiant par défaut est logstash. Vous pouvez créer de nouvelles instances Logstash à tout moment pour adapter votre cadence de lecture en fonction du topic concerné. Par défaut, la nouvelle instance Logstash s'adjoindra au groupe de consommateurs logstash. Ce processus de fusion de groupes déclenche un rééquilibrage de Kafka. Nous avons déjà évoqué l'utilisation que fait Logstash des consommateurs Kafka de haut niveau. Ainsi, la responsabilité du processus de rééquilibrage est transférée à la bibliothèque Kafka. Ce rééquilibrage attribue de façon automatique les partitions aux consommateurs actuels en fonction des métadonnées disponibles dans ZooKeeper. Enfin, faire usage de plusieurs instances Logstash augmente la tolérance aux pannes. Si une instance défaille, Kafka rééquilibre et redistribue les attributions aux instances Logstash existantes.

Ce processus est similaire à celui du paramètre consumer_threads dans l'input Logstash. Ces paramètres contrôlent le nombre de threads qui consomment à partir des partitions Kafka. Idéalement, vous devriez avoir le même nombre de threads et de partitions afin de ménager un équilibre parfait. Si vous avez plus de threads, certains d'entre eux seront inactifs. Si vous en avez moins, certains consommeront à partir de plusieurs partitions.

Imaginez un scénario où un topic apache_logs possède 16 partitions. Je pourrais créer une instance Logstash sur une machine à 8 cœurs avec la configuration suivante :

input {
   kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}

Ou je pourrais créer 2 instances Logstash sur 2 machines avec des consumer_threads consommant à partir de 8 partitions chacun. La deuxième solution est un meilleur déploiement car elle utilise pleinement le potentiel de la machine et augmente la résistance aux pannes en cas de gros pépin.

Un conseil, faites en sorte que le nombre de partitions soit un multiple du nombre de threads/instances Logstash. Ceci garantit l'équilibre des instances. Le partitionnement, tout comme le sharding Elasticsearch, permet par la suite d'augmenter la capacité de traitement (instances Logstash).

Formats de sérialisation

Kafka conserve les messages dans sa file d'attente en utilisant un tableau d'octets. Ainsi, vous pouvez assaillir Kafka de format divers, même s'il vaut mieux utiliser un format de sérialisation compact et rapide. Kafka est armé pour gérer les formats de message sérialisés. Il spécifie le value_serializer dans les outputs, et le decoder_class dans les inputs. Si vous êtes un utilisateur averti de Logstash, vous devez sûrement avoir les codecs à l'esprit. Il est maintenant possible de tirer profit des codecs Logstash ainsi que des sérialiseurs Kafka pour gérer la représentation des messages au niveau de l'input et de l'output des topics Kafka.

Les autres codecs Logstash utilisables dans l'écosystème Kafka sont plain, avro, et avro_schema_registry.

Si vous souhaitez coder vous même votre sérialiseur/désérialiseur, vous pouvez le faire dans votre langage JVM préféré. Vu que ces classes ne figurent pas dans le chemin des classes Logstash, vous devez absolument ajouter la bibliothèque adéquate dans votre chemin de classes Java.

export CLASSPATH=$CLASSPATH:/path/to/kafkaserializers.jar; bin/logstash -f ..

Attention : décoder les messages dans l'input de Logstash est une action mono-threadée. Un format de sérialisation coûteux comme json fera décroître la performance générale du pipeline.

Conclusion

Dans cet article, nous avons abordé quelques concepts basiques de Kafka. Nous avons également évoqué comment Kafka pouvait être utilisé en conjonction avec la suite Elastic. Dans un prochain article, nous plongerons au cœur des opérations et nous vous donnerons des astuces pour l'exécution de Kafka avec Logstash. N'hésitez pas à nous contacter si vous avez la moindre question, vous pouvez le faire sur notre forum ou sur Twitter !

Nouveau: Vous en voulez plus? Lisez la deuxième partie de cette série consacrée à l'intégration d'Apache Kafka à la Suite Elastic