Tutorial: Disaster recovery based on bi-directional cross-cluster replication

Learn how to set up disaster recovery between two clusters based on bi-directional cross-cluster replication. The following tutorial is designed for data streams which support update by query and delete by query. You can only perform these actions on the leader index.

This tutorial works with Logstash as the source of ingestion. It takes advantage of a Logstash feature that allows the Elasticsearch output to be load balanced across an array of specified hosts. Beats and Elastic Agent currently do not support multiple outputs. It should also be possible to set up a proxy (load balancer) to redirect traffic instead of using Logstash in this tutorial.

You'll learn how to:

  • Set up a remote cluster on clusterA and clusterB.
  • Set up bi-directional cross-cluster replication with exclusion patterns.
  • Set up Logstash with multiple hosts to allow automatic load balancing and switching during disasters.
Figure: Bi-directional cross-cluster replication failover and failback

Initial setup

  1. Set up a remote cluster on both clusters.

    # On cluster A
    resp = client.cluster.put_settings(
        persistent={
            "cluster": {
                "remote": {
                    "clusterB": {
                        "mode": "proxy",
                        "skip_unavailable": True,
                        "server_name": "clusterb.es.region-b.gcp.elastic-cloud.com",
                        "proxy_socket_connections": 18,
                        "proxy_address": "clusterb.es.region-b.gcp.elastic-cloud.com:9400"
                    }
                }
            }
        },
    )
    print(resp)
    
    # On cluster B
    resp1 = client.cluster.put_settings(
        persistent={
            "cluster": {
                "remote": {
                    "clusterA": {
                        "mode": "proxy",
                        "skip_unavailable": True,
                        "server_name": "clustera.es.region-a.gcp.elastic-cloud.com",
                        "proxy_socket_connections": 18,
                        "proxy_address": "clustera.es.region-a.gcp.elastic-cloud.com:9400"
                    }
                }
            }
        },
    )
    print(resp1)
    response = client.cluster.put_settings(
      body: {
        persistent: {
          cluster: {
            remote: {
              "clusterB": {
                mode: 'proxy',
                skip_unavailable: true,
                server_name: 'clusterb.es.region-b.gcp.elastic-cloud.com',
                proxy_socket_connections: 18,
                proxy_address: 'clusterb.es.region-b.gcp.elastic-cloud.com:9400'
              }
            }
          }
        }
      }
    )
    puts response
    
    response = client.cluster.put_settings(
      body: {
        persistent: {
          cluster: {
            remote: {
              "clusterA": {
                mode: 'proxy',
                skip_unavailable: true,
                server_name: 'clustera.es.region-a.gcp.elastic-cloud.com',
                proxy_socket_connections: 18,
                proxy_address: 'clustera.es.region-a.gcp.elastic-cloud.com:9400'
              }
            }
          }
        }
      }
    )
    puts response
    const response = await client.cluster.putSettings({
      persistent: {
        cluster: {
          remote: {
            clusterB: {
              mode: "proxy",
              skip_unavailable: true,
              server_name: "clusterb.es.region-b.gcp.elastic-cloud.com",
              proxy_socket_connections: 18,
              proxy_address: "clusterb.es.region-b.gcp.elastic-cloud.com:9400",
            },
          },
        },
      },
    });
    console.log(response);
    
    const response1 = await client.cluster.putSettings({
      persistent: {
        cluster: {
          remote: {
            clusterA: {
              mode: "proxy",
              skip_unavailable: true,
              server_name: "clustera.es.region-a.gcp.elastic-cloud.com",
              proxy_socket_connections: 18,
              proxy_address: "clustera.es.region-a.gcp.elastic-cloud.com:9400",
            },
          },
        },
      },
    });
    console.log(response1);
    ### On cluster A ###
    PUT _cluster/settings
    {
      "persistent": {
        "cluster": {
          "remote": {
            "clusterB": {
              "mode": "proxy",
              "skip_unavailable": true,
              "server_name": "clusterb.es.region-b.gcp.elastic-cloud.com",
              "proxy_socket_connections": 18,
              "proxy_address": "clusterb.es.region-b.gcp.elastic-cloud.com:9400"
            }
          }
        }
      }
    }
    ### On cluster B ###
    PUT _cluster/settings
    {
      "persistent": {
        "cluster": {
          "remote": {
            "clusterA": {
              "mode": "proxy",
              "skip_unavailable": true,
              "server_name": "clustera.es.region-a.gcp.elastic-cloud.com",
              "proxy_socket_connections": 18,
              "proxy_address": "clustera.es.region-a.gcp.elastic-cloud.com:9400"
            }
          }
        }
      }
    }
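
    As a quick check before moving on, you can confirm that each cluster sees the other as a remote. This is a minimal sketch with the Python client (not part of the original steps), assuming client is connected to the cluster you want to check:

    resp = client.cluster.remote_info()
    # Each configured remote is listed with its mode and a "connected" flag
    print(resp)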
  2. Set up bi-directional cross-cluster replication.

    # On cluster A
    resp = client.ccr.put_auto_follow_pattern(
        name="logs-generic-default",
        remote_cluster="clusterB",
        leader_index_patterns=[
            ".ds-logs-generic-default-20*"
        ],
        leader_index_exclusion_patterns="*-replicated_from_clustera",
        follow_index_pattern="{{leader_index}}-replicated_from_clusterb",
    )
    print(resp)
    
    # On cluster B
    resp1 = client.ccr.put_auto_follow_pattern(
        name="logs-generic-default",
        remote_cluster="clusterA",
        leader_index_patterns=[
            ".ds-logs-generic-default-20*"
        ],
        leader_index_exclusion_patterns="*-replicated_from_clusterb",
        follow_index_pattern="{{leader_index}}-replicated_from_clustera",
    )
    print(resp1)
    const response = await client.ccr.putAutoFollowPattern({
      name: "logs-generic-default",
      remote_cluster: "clusterB",
      leader_index_patterns: [".ds-logs-generic-default-20*"],
      leader_index_exclusion_patterns: "*-replicated_from_clustera",
      follow_index_pattern: "{{leader_index}}-replicated_from_clusterb",
    });
    console.log(response);
    
    const response1 = await client.ccr.putAutoFollowPattern({
      name: "logs-generic-default",
      remote_cluster: "clusterA",
      leader_index_patterns: [".ds-logs-generic-default-20*"],
      leader_index_exclusion_patterns: "*-replicated_from_clusterb",
      follow_index_pattern: "{{leader_index}}-replicated_from_clustera",
    });
    console.log(response1);
    ### On cluster A ###
    PUT /_ccr/auto_follow/logs-generic-default
    {
      "remote_cluster": "clusterB",
      "leader_index_patterns": [
        ".ds-logs-generic-default-20*"
      ],
      "leader_index_exclusion_patterns":"*-replicated_from_clustera",
      "follow_index_pattern": "{{leader_index}}-replicated_from_clusterb"
    }
    
    ### On cluster B ###
    PUT /_ccr/auto_follow/logs-generic-default
    {
      "remote_cluster": "clusterA",
      "leader_index_patterns": [
        ".ds-logs-generic-default-20*"
      ],
      "leader_index_exclusion_patterns":"*-replicated_from_clusterb",
      "follow_index_pattern": "{{leader_index}}-replicated_from_clustera"
    }

    Existing data on the cluster will not be replicated by _ccr/auto_follow even though the patterns may match. This function will only replicate newly created backing indices (as part of the data stream).

    Use leader_index_exclusion_patterns to avoid recursion.

    follow_index_pattern allows lowercase characters only.

    This step cannot be executed via the Kibana UI because the UI does not expose an exclusion pattern field. Use the API for this step.
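
    To confirm that the auto-follow pattern exists on each cluster, you can retrieve it by name. A minimal sketch with the Python client, assuming client is connected to the cluster you just configured:

    resp = client.ccr.get_auto_follow_pattern(name="logs-generic-default")
    # The response should show the remote_cluster, leader index patterns, and exclusion patterns
    print(resp)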

  3. Set up the Logstash configuration file.

    This example uses the input generator to demonstrate the document count in the clusters. Reconfigure this section to suit your own use case.

    ### On Logstash server ###
    ### This is a logstash config file ###
    input {
      generator{
        message => 'Hello World'
        count => 100
      }
    }
    output {
      elasticsearch {
        hosts => ["https://clustera.es.region-a.gcp.elastic-cloud.com:9243","https://clusterb.es.region-b.gcp.elastic-cloud.com:9243"]
        user => "logstash-user"
        password => "same_password_for_both_clusters"
      }
    }

    The key point is that when cluster A is down, all traffic is automatically redirected to cluster B. Once cluster A comes back, traffic is automatically redirected back to cluster A. This is achieved by the hosts option, where multiple Elasticsearch cluster endpoints are specified in the array [clusterA, clusterB].

    Set up the same password for the same user on both clusters to use this load-balancing feature.
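
    As an optional sanity check (a sketch, not part of the original tutorial), you can confirm that the same credentials are accepted by both endpoints before starting Logstash:

    from elasticsearch import Elasticsearch

    hosts = [
        "https://clustera.es.region-a.gcp.elastic-cloud.com:9243",
        "https://clusterb.es.region-b.gcp.elastic-cloud.com:9243",
    ]
    for host in hosts:
        # basic_auth is the credential option in recent versions of the Python client;
        # the user and password below are the placeholders from the Logstash config
        es = Elasticsearch(host, basic_auth=("logstash-user", "same_password_for_both_clusters"))
        print(host, es.info()["cluster_name"])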

  4. Start Logstash with the earlier configuration file.

    ### On Logstash server ###
    bin/logstash -f multiple_hosts.conf
  5. Observe document counts in data streams.

    The setup creates a data stream named logs-generic-default on each of the clusters. Logstash will write 50% of the documents to cluster A and 50% of the documents to cluster B when both clusters are up.

    Bi-directional cross-cluster replication will create one more data stream on each of the clusters with the -replicated_from_cluster{a|b} suffix. At the end of this step:

    • data streams on cluster A contain:

      • 50 documents in logs-generic-default-replicated_from_clusterb
      • 50 documents in logs-generic-default
    • data streams on cluster B contain:

      • 50 documents in logs-generic-default-replicated_from_clustera
      • 50 documents in logs-generic-default
  6. Queries should be set up to search across both data streams. A query on logs*, on either of the clusters, returns 100 hits in total.

    resp = client.search(
        index="logs*",
        size="0",
    )
    print(resp)
    response = client.search(
      index: 'logs*',
      size: 0
    )
    puts response
    const response = await client.search({
      index: "logs*",
      size: 0,
    });
    console.log(response);
    GET logs*/_search?size=0
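
    To check the count programmatically, a minimal sketch with the Python client (assuming client points at either cluster):

    resp = client.search(index="logs*", size=0)
    # With both clusters up and ingestion finished, the total should be 100
    print(resp["hits"]["total"]["value"])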

Failover when clusterA is down

  1. You can simulate this by shutting down either of the clusters. Let’s shut down cluster A in this tutorial.
  2. Start Logstash with the same configuration file. (This step is not required in real use cases where Logstash ingests continuously.)

    ### On Logstash server ###
    bin/logstash -f multiple_hosts.conf
  3. Observe that all Logstash traffic is automatically redirected to cluster B.

    You should also redirect all search traffic to cluster B during this time.
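
    One way to handle the search side (a sketch that assumes you query Elasticsearch with the Python client; the credentials are placeholders) is to configure the client with both endpoints so requests are retried against the cluster that is still reachable:

    from elasticsearch import Elasticsearch

    es = Elasticsearch(
        [
            "https://clustera.es.region-a.gcp.elastic-cloud.com:9243",
            "https://clusterb.es.region-b.gcp.elastic-cloud.com:9243",
        ],
        basic_auth=("search-user", "search-password"),  # placeholder credentials
    )
    # If cluster A is unreachable, the client marks that node as dead and
    # retries the request against cluster B.
    print(es.search(index="logs*", size=0)["hits"]["total"]["value"])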

  4. The two data streams on cluster B now contain a different number of documents.

    • data streams on cluster A (down)

      • 50 documents in logs-generic-default-replicated_from_clusterb
      • 50 documents in logs-generic-default
    • data streams on cluster B (up)

      • 50 documents in logs-generic-default-replicated_from_clustera
      • 150 documents in logs-generic-default
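
    You can verify these counts on cluster B while cluster A is down, for example with the count API (a sketch, assuming client is connected to cluster B):

    # Locally ingested data stream, now receiving all Logstash traffic
    print(client.count(index="logs-generic-default")["count"])  # 150
    # Follower data stream replicated from cluster A before the outage
    print(client.count(index="logs-generic-default-replicated_from_clustera")["count"])  # 50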

Failback when clusterA comes back

  1. You can simulate this by turning cluster A back on.
  2. Data ingested into cluster B during cluster A's downtime will be automatically replicated.

    • data streams on cluster A

      • 150 documents in logs-generic-default-replicated_from_clusterb
      • 50 documents in logs-generic-default
    • data streams on cluster B

      • 50 documents in logs-generic-default-replicated_from_clustera
      • 150 documents in logs-generic-default
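
    To confirm that cluster A has caught up, you can compare the document counts again or inspect the cross-cluster replication statistics on cluster A. A minimal sketch with the Python client:

    resp = client.ccr.stats()
    # follow_stats lists each follower index and its replication progress
    print(resp["follow_stats"])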
  3. If you have Logstash running at this time, you will also observe that traffic is sent to both clusters again.

Perform update or delete by query


It is possible to update or delete documents, but you can only perform these actions on the leader index.

  1. First identify which backing index contains the document you want to update.

    resp = client.search(
        index="logs-generic-default*",
        filter_path="hits.hits._index",
        query={
            "match": {
                "event.sequence": "97"
            }
        },
    )
    print(resp)
    response = client.search(
      index: 'logs-generic-default*',
      filter_path: 'hits.hits._index',
      body: {
        query: {
          match: {
            'event.sequence' => '97'
          }
        }
      }
    )
    puts response
    const response = await client.search({
      index: "logs-generic-default*",
      filter_path: "hits.hits._index",
      query: {
        match: {
          "event.sequence": "97",
        },
      },
    });
    console.log(response);
    ### On either of the clusters ###
    GET logs-generic-default*/_search?filter_path=hits.hits._index
    {
      "query": {
        "match": {
          "event.sequence": "97"
        }
      }
    }
    • If the search returns "_index": ".ds-logs-generic-default-replicated_from_clustera-<yyyy.MM.dd>-*", proceed to the next step on cluster A.
    • If the search returns "_index": ".ds-logs-generic-default-replicated_from_clusterb-<yyyy.MM.dd>-*", proceed to the next step on cluster B.
    • If the search returns "_index": ".ds-logs-generic-default-<yyyy.MM.dd>-*", proceed to the next step on the same cluster where you ran the search query.
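
    As an illustration only (the helper below is hypothetical and not part of the tutorial), the routing rule above can be expressed in a few lines of Python:

    def cluster_for_backing_index(index_name, searched_on):
        """Return the cluster where update by query or delete by query must run."""
        if "-replicated_from_clustera" in index_name:
            return "clusterA"
        if "-replicated_from_clusterb" in index_name:
            return "clusterB"
        # No replication suffix: the leader index lives on the cluster you searched
        return searched_on

    resp = client.search(
        index="logs-generic-default*",
        filter_path="hits.hits._index",
        query={"match": {"event.sequence": "97"}},
    )
    for hit in resp["hits"]["hits"]:
        # "clusterA" here assumes the search above was run on cluster A
        print(hit["_index"], "->", cluster_for_backing_index(hit["_index"], "clusterA"))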
  2. Perform the update (or delete) by query:

    resp = client.update_by_query(
        index="logs-generic-default",
        query={
            "match": {
                "event.sequence": "97"
            }
        },
        script={
            "source": "ctx._source.event.original = params.new_event",
            "lang": "painless",
            "params": {
                "new_event": "FOOBAR"
            }
        },
    )
    print(resp)
    response = client.update_by_query(
      index: 'logs-generic-default',
      body: {
        query: {
          match: {
            'event.sequence' => '97'
          }
        },
        script: {
          source: 'ctx._source.event.original = params.new_event',
          lang: 'painless',
          params: {
            new_event: 'FOOBAR'
          }
        }
      }
    )
    puts response
    const response = await client.updateByQuery({
      index: "logs-generic-default",
      query: {
        match: {
          "event.sequence": "97",
        },
      },
      script: {
        source: "ctx._source.event.original = params.new_event",
        lang: "painless",
        params: {
          new_event: "FOOBAR",
        },
      },
    });
    console.log(response);
    ### On the cluster identified from the previous step ###
    POST logs-generic-default/_update_by_query
    {
      "query": {
        "match": {
          "event.sequence": "97"
        }
      },
      "script": {
        "source": "ctx._source.event.original = params.new_event",
        "lang": "painless",
        "params": {
          "new_event": "FOOBAR"
        }
      }
    }

    If a soft delete is merged away before it can be replicated to a follower, this process will fail due to incomplete history on the leader. See index.soft_deletes.retention_lease.period for more details.
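
    If followers may lag for long periods (for example during an extended outage), one possible mitigation is to increase the history retention on the leader. The sketch below uses the Python client to raise the retention lease period on the leader data stream; the 24h value is only an example, not a recommendation:

    # index.soft_deletes.retention_lease.period is a dynamic index setting
    resp = client.indices.put_settings(
        index="logs-generic-default",
        settings={"index.soft_deletes.retention_lease.period": "24h"},
    )
    print(resp)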