如何在 Elasticsearch 中查找并移除重复文档

将数据导入 Elasticsearch 的很多系统都将利用 Elasticsearch 为新插入的文档自动生成 ID 值。但是,如果数据源将同一文档多次意外发送到 Elasticsearch,并且对于 Elasticsearch 插入的每个文档都使用了这种自动生成的 _id 值,那么这个文档就会使用不同的 _id 值在 Elasticsearch 中存储多次。如果发生此情况,可能就需要查找并移除此类重复文档。因此,在本篇博文中,我们将介绍如何通过以下两种方法从 Elasticsearch 中检测并移除重复文档:(1) 使用 Logstash;(2) 使用以 Python 语言编写的定制代码。

示例文档结构

就本篇博文而言,我们假设 Elasticsearch 集群中的文档具有以下结构。此结构对应的数据集包含多个表示股市交易的文档。

   {
      "_index": "stocks",
      "_type": "doc",
      "_id":"6fo3tmMB_ieLOlkwYclP",
      "_version":1,
      "found": true,
      "_source": {
        "CAC":1854.6,
        "host":"Alexanders-MBP",
        "SMI":2061.7,
        "@timestamp":"2017-01-09T02:30:00.000Z",
        "FTSE":2827.5,
        "DAX":1527.06,
        "time":"1483929000",
        "message":"1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version":"1"
      }
    }

鉴于这个示例文档结构,在本篇博文中,我们主观假设:如果多个文档的 ["CAC", "FTSE", "SMI"] 字段具有相同的值,则它们相互重复。

使用 Logstash 删除重复的 Elasticsearch 文档

Logstash 可用于从 Elasticsearch 索引中检测并移除重复文档。有关此项技术的描述,请参阅介绍如何使用 Logstash 处理重复文档的博文,这里我们将仅演示一个应用此方法的具体示例。

在下面的示例中,我编写了一个简单的 Logstash 配置,从 Elasticsearch 集群上的索引中读取文档,然后使用指纹筛选器根据 ["CAC", "FTSE", "SMI"] 字段的哈希为每个文档计算一个唯一的 _id 值,最后将每个文档写回到同一 Elasticsearch 集群上的新索引,这样重复的文档将被写入同一 _id,并随之被删除。

此外,只需稍加修改,同样的 Logstash 筛选器还可应用于写入新建索引的未来文档,以确保几乎实时地移除重复文档。为实现这一目的,可以更改以下示例中的输入部分,以接受来自实时输入源的文档,而不是从现有索引中拉取文档。

请注意,使用定制 _id 值(即不是由 Elasticsearch 生成的 _id)将会对索引操作的写入性能造成一定影响

另外值得注意的是,在理论上,根据所使用的哈希算法,这个方法可能会导致 _id 值的哈希冲突数不为零,这可能会使两个不同的文档被映射到同一 _id,从而导致其中一个文档丢失。对于大部分实践用例,发生哈希冲突的可能性极低。对不同哈希函数的详细分析不在本博文讨论范围之内,但应仔细考虑指纹筛选器中使用的哈希函数,因其对采集性能和哈希冲突数都将有影响。

下面介绍了一个使用指纹筛选器对现有索引进行重复文档删除的简单 Logstash 配置。

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# This filter has been updated on February 18, 2019
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- New line added since original post date
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

用于删除 Elasticsearch 重复文档的定制 Python 脚本

一种节省内存的方法

如果不使用 Logstash,则可以使用定制 Python 脚本有效地完成重复文档删除。对于这种方法,我们将计算定义为唯一标识文档的 ["CAC", "FTSE", "SMI"] 字段的哈希。然后,我们将此哈希用作 Python 字典中的一个键,其中每个字典条目的关联值将是映射到同一哈希的文档 _ids 的数组。

如果多个文档具有相同哈希,则可以删除映射到同一哈希的重复文档。或者,如果担心可能会发生哈希冲突,则可以检查映射到同一哈希的文档内容,以查看文档是否确实相同,确认后再删除重复文档。

检测算法分析

对于一个大小为 50GB 的索引,如果假设索引包含的文档平均大小为 0.4 KB,则该索引中将有 1.25 亿个文档。在这种情况下,如果使用 128 位 md5 哈希将删除重复的数据结构存储在内存中,则所需内存量约为 128 位 x 1.25 亿 = 2GB 内存,再加上 160 位 _ids 还需要 160 位 x 1.25 亿 = 2.5 GB 内存。因此,这一算法将需要约 4.5GB RAM 才能将所有相关数据结构保留在内存中。此时,如果可以应用下节所讨论的方法,则可以大大减少内存占用。

算法增强功能

在这部分中,我们将介绍一个算法增强功能,用于减少内存使用量并持续移除新的重复文档。

如果您存储的是时序数据,并且知道重复文档仅在短时间内彼此重复,则可通过对索引中的文档子集重复执行此算法(其中每个子集对应一个不同的时间范围)来改进其内存占用情况。例如,如果您有一年的数据,则可以使用日期时间字段上的范围查询(在筛选器上下文中,用于获得最佳性能),一次一周地遍历您的数据集。这就要求该算法执行 52 次(针对每周数据集执行一次),在这种情况下,这种方法可以将极端情况下的内存占用减少 52 倍。

在上例中,您可能会担心检测不到跨周的重复文档。让我们假设您知道重复文档的间隔不能超过 2 小时,那么,您需要确保每次执行的算法都包含与前一次执行算法分析的最后一组文档重叠 2 小时的文档。因此,在上述的每周示例中,您需要查询 170 小时(1 周 + 2 小时)的时序文档,才可确保不漏掉任何重复文档。

如果要持续定期从索引中清除重复文档,则可以对最近收到的文档执行这个算法。与上述逻辑相同,确保分析中包含最近接收的文档,并且与稍旧文档足够重叠,以确保不会无意中漏掉重复文档。

用于检测重复文档的 Python 代码

以下代码演示了如何高效评估文档,以查看它们是否相同,然后根据需要删除。但是,为了防止意外删除文档,在这个示例中,我们不会实际执行删除操作。包含此功能很容易,

也可以在 github 中找到用于从 Elasticsearch 中删除重复文档的代码。

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In this example, we just print the duplicate docs.
            # This code could be easily modified to delete duplicates
            # here instead of printing them
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

结论

在本篇博文中,我们演示了两种在 Elasticsearch 中删除重复文档的方法。第一种方法使用 Logstash 移除重复文档,第二种方法使用定制 Python 脚本查找并移除重复文档。

如果对 Elasticsearch 重复文档删除或任何其他 Elasticsearch 相关主题有疑问,请在讨论论坛中查看各种宝贵见解和信息。