Rechercher et supprimer des documents en double dans Elasticsearch
La plupart des systèmes qui intègrent des données dans Elasticsearch tirent parti des valeurs d’ID auto-générées d’Elasticsearch lors de l’insertion de nouveaux documents. Le problème, c’est que si la source de données envoie le même document par inadvertance plusieurs fois à Elasticsearch et si une valeur auto-générée _id
est utilisée pour chaque document qu’Elasticsearch insère, alors ce document sera stocké plusieurs fois dans Elasticsearch avec des valeurs _id
différentes. Si une telle situation se produit, il peut être nécessaire de rechercher et de supprimer les documents en double. Dans cet article de blog, nous allons donc voir comment identifier les documents en double et les supprimer à partir d’Elasticsearch (1) avec Logstash, ou (2) avec un code personnalisé écrit en Python.
Exemple de structure de documents
Dans le cadre de cet article de blog, nous allons partir du principe que les documents se trouvant dans le cluster Elasticsearch ont la structure suivante. Il s’agit ici d’un ensemble de données qui contient des documents représentant des transactions boursières.
{ "_index": "stocks", "_type": "doc", "_id": "6fo3tmMB_ieLOlkwYclP", "_version": 1, "found": true, "_source": { "CAC": 1854.6, "host": "Alexanders-MBP", "SMI": 2061.7, "@timestamp": "2017-01-09T02:30:00.000Z", "FTSE": 2827.5, "DAX": 1527.06, "time": "1483929000", "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r", "@version": "1" } }
À partir de cette structure, nous supposerons que si plusieurs documents ont les mêmes valeurs pour les champs ["CAC", "FTSE", "SMI"]
, c’est qu’ils sont des doublons les uns des autres.
Utilisation de Logstash pour la déduplication des documents Elasticsearch
Vous pouvez utiliser Logstash pour identifier les doublons dans un index Elasticsearch, puis les supprimer. Pour savoir comment procéder, consultez cet article de blog sur la gestion des doublons avec Logstash. Nous allons voir dans cette section un exemple concret de la mise en application de cette approche.
Dans l’exemple ci-dessous, j’ai programmé une configuration Logstash simple qui lit les documents à partir d’un index d’un cluster Elasticsearch, puis qui utilise le filtre d’empreinte pour calculer une valeur _id
unique pour chaque document selon le hachage des champs ["CAC", "FTSE", "SMI"]
, et pour finir, qui réécrit chaque document dans un nouvel index sur ce même cluster Elasticsearch de sorte que tous les documents en double soient enregistrés avec le même _id
, et de là, éliminés.
De plus, avec quelques modifications mineures, le même filtre Logstash pourrait être également appliqué aux futurs documents écrits dans le nouvel index, pour faire en sorte que les doublons soient supprimés en temps quasi-réel. Il suffirait de changer la section d’entrée de l’exemple ci-dessous pour accepter les documents de votre source d’entrée en temps réel plutôt que de sortir les documents d’un index existant.
Sachez que l’utilisation de valeurs _id
personnalisées (c.-à-d. un _id
qui n’est pas généré par Elasticsearch) aura une incidence sur les performances d’écriture de vos opérations d’index.
Il est aussi important de signaler que, selon l’algorithme de hachage utilisé, cette approche peut théoriquement aboutir à un certain nombre de collisions de hachages pour la valeur _id
, ce qui entraînerait le mappage de deux documents non identiques au même _id
, et de là, causerait la perte de l’un de ces documents. Pour la plupart des cas pratiques, la probabilité d’une collision de hachages reste très faible. Dans cet article de blog, nous ne verrons pas en détail les différentes fonctions de hachage, car ce n’est pas le but. Néanmoins, vous devez choisir avec soin la fonction de hachage utilisée dans le filtre d’empreinte, car elle aura un impact sur les performances d’ingestion ainsi que sur le nombre de collisions de hachage.
Vous trouverez ci-dessous une configuration Logstash simple pour dédupliquer un index existant à l’aide du filtre d’empreinte.
input { # Lecture de tous les documents à partir d’Elasticsearch elasticsearch { hosts => "localhost" index => "stocks" query => '{ "sort": [ "_doc" ] }' } } # Ce filtre a été mis à jour le 18 février 2019 filter { fingerprint { key => "1234ABCD" method => "SHA256" source => ["CAC", "FTSE", "SMI"] target => "[@metadata][generated_id]" concatenate_sources => true # <-- Nouvelle ligne ajoutée depuis la date de publication d’origine } } output { stdout { codec => dots } elasticsearch { index => "stocks_after_fingerprint" document_id => "%{[@metadata][generated_id]}" } }
Script Python personnalisé pour la déduplication de documents Elasticsearch
Une approche efficace en termes de mémoire
Si vous n’utilisez pas Logstash pour la déduplication, vous pouvez alors recourir à un script Python personnalisé. Dans cette approche, nous calculons le hachage des champs ["CAC", "FTSE", "SMI"]
que nous avons défini pour identifier un document de façon unique. Nous utilisons ensuite ce hachage comme une clé dans un dictionnaire Python, dans lequel la valeur associée de chaque entrée de dictionnaire représentera un éventail des valeurs _id
des documents mappés au même hachage.
Si plusieurs documents ont le même hachage, les documents en double mappés au même hachage peuvent être supprimés. Cependant, si vous êtes préoccupé par le fait qu’il puisse y avoir des collisions de hachage, alors le contenu des documents mappés au même hachage peut être examiné pour vérifier si ces documents sont réellement identiques, et si tel est le cas, les doublons peuvent être alors supprimés.
Analyse de l’algorithme de détection
Prenons un index de 50 Go. Supposons qu’il contienne des documents d’une taille moyenne de 0,4 Ko. Il y aurait alors 125 millions de documents dans l’index. Le volume de mémoire requis pour y stocker les structures de données de déduplication lors de l’utilisation d’un hachage md5 de 128 bits serait de l’ordre de 128 bits x 125 millions = 2 Go de mémoire, plus les valeurs _id
de 160 bits, qui nécessiteraient 160 bits x 125 millions = 2,5 Go de mémoire. Cet algorithme aurait donc besoin d’une RAM de l’ordre de 4,5 Go pour conserver toutes les structures de données pertinentes en mémoire. Il est possible de réduire considérablement cette empreinte mémoire en appliquant l’approche que nous allons voir dans la section suivante.
Amélioration de l’algorithme
Dans cette section, nous allons voir comment améliorer notre algorithme pour réduire l’utilisation de mémoire ainsi que pour supprimer les documents en double de façon continue.
Si vous stockez des données temporelles et que vous savez que les documents en double sont générés uniquement sur une période limitée les uns par rapport aux autres, vous pouvez alors améliorer l’empreinte mémoire de cet algorithme en exécutant de façon répétée l’algorithme sur un sous-ensemble de documents de l’index, chaque sous-ensemble correspondant à une fenêtre temporelle différente. Par exemple, si vous avez recueilli des données sur plusieurs années, vous pourriez utiliser des demandes de recherche de plage dans votre champ d’horodatage (à l’intérieur d’un contexte de filtre pour de meilleures performances) pour parcourir votre ensemble de données semaine par semaine. Pour cela, il faudrait que l’algorithme soit exécuté 52 fois (une fois par semaine). Et dans ce cas, cette approche permettrait de réduire la "pire" empreinte mémoire d’un facteur de 52.
Dans l’exemple ci-dessus, vous pouvez être préoccupé par le fait de ne pas identifier les documents en double qui s’étendent sur plusieurs semaines. Partons du principe que les documents en double ne peuvent pas être séparés de plus de deux heures. Ce qu’il vous faudrait faire dans ce cas serait d’inclure dans l’exécution de l’algorithme les documents qui chevauchent d’au maximum deux heures le dernier ensemble de documents analysés par l’exécution précédente de l’algorithme. Reprenons le cas d’une analyse hebdomadaire. Il vous faudrait alors faire une recherche de documents sur 170 heures (1 semaine + 2 h) pour vous assurer qu’aucun doublon n’a été oublié.
Si vous souhaitez éliminer périodiquement les doublons de vos index de façon continue, vous pouvez exécuter cet algorithme sur les documents récemment reçus. La même logique que ci-dessus s’applique : assurez-vous que les documents récemment reçus sont inclus dans l’analyse avec un temps de chevauchement suffisant par rapport aux documents légèrement plus anciens pour vérifier qu’aucun doublon n’est oublié par inadvertance.
Utilisation du code Python pour identifier les doublons
Le code ci-dessous montre comment les documents peuvent être évalués avec efficacité pour voir s’ils sont identiques, et supprimés si nécessaire. Notez toutefois que pour éviter toute suppression accidentelle de documents, dans cet exemple, nous n’exécutons pas réellement une opération de suppression. Une fonctionnalité de ce type est néanmoins simple à mettre en place.
Le code permettant de dédupliquer les documents à partir d’Elasticsearch est aussi proposé sur github.
#!/usr/local/bin/python3 import hashlib from elasticsearch import Elasticsearch es = Elasticsearch(["localhost:9200"]) dict_of_duplicate_docs = {} # La ligne suivante définit les champs qui seront # utilisés pour déterminer si un document est un doublon ou non keys_to_include_in_hash = ["CAC", "FTSE", "SMI"] # Traitement des documents renvoyés par la recherche actuelle ou le défilement def populate_dict_of_duplicate_docs(hits): for item in hits: combined_key = "" for mykey in keys_to_include_in_hash: combined_key += str(item['_source'][mykey]) _id = item["_id"] hashval = hashlib.md5(combined_key.encode('utf-8')).digest() # Si la valeur du hachage (hashval) est nouvelle, nous créerons une nouvelle clé # dans dict_of_duplicate_docs, qui se verra attribuer # une valeur d’un tableau vide. # Nous intégrerons ensuite immédiatement la valeur _id dans le tableau. # Si la valeur hashval existe déjà, alors # nous indiquerons tout simplement la nouvelle valeur _id dans le tableau existant. dict_of_duplicate_docs.setdefault(hashval, []).append(_id) # Créez une boucle avec tous les documents de l’index, puis renseignez la # structure de données dict_of_duplicate_docs. def scroll_over_all_docs(): data = es.search(index="stocks", scroll='1m', body={"query": {"match_all": {}}}) # Obtenez l’ID de défilement sid = data['_scroll_id'] scroll_size = len(data['hits']['hits']) # Avant de procéder au défilement, traitez le lot de résultats en cours populate_dict_of_duplicate_docs(data['hits']['hits']) while scroll_size > 0: data = es.scroll(scroll_id=sid, scroll='2m') # Traitez le lot de résultats en cours populate_dict_of_duplicate_docs(data['hits']['hits']) # Mettez l’ID de défilement à jour sid = data['_scroll_id'] # Obtenez le nombre de résultats renvoyés lors du dernier défilement scroll_size = len(data['hits']['hits']) def loop_over_hashes_and_remove_duplicates(): # Parcourez les valeurs de hachage pour déterminer s’il existe # des hachages en double for hashval, array_of_ids in dict_of_duplicate_docs.items(): if len(array_of_ids) > 1: print("********** Duplicate docs hash=%s **********" % hashval) # Obtenez les documents mappés à la valeur hashval actuelle matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids}) for doc in matching_docs['docs']: # Dans cet exemple, nous avons simplement imprimé les documents en double. # Il est tout à fait possible (et simple !) de modifier ce code pour supprimer les doublons # ici au lieu de les imprimer. print("doc=%s\n" % doc) def main(): scroll_over_all_docs() loop_over_hashes_and_remove_duplicates() main()
Conclusion
Dans cet article de blog, nous avons vu deux méthodes pour la déduplication de documents dans Elasticsearch. La première méthode s’appuie sur Logstash pour supprimer les documents en double. La deuxième consiste à utiliser un script Python personnalisé pour identifier et supprimer les doublons.
Pour toute question concernant la déduplication des documents Elasticsearch, ou pour tout autre sujet concernant Elasticsearch, jetez un œil à nos forums de discussion pour obtenir des informations et des perspectives intéressantes.