How to use the Connector API to ingest data into Elasticsearch Serverless

Elasticsearch supports a range of ingestion methods. One of these is Elastic Connectors, which sync external data sources like SQL databases or SharePoint Online with your Elasticsearch index. Connectors are especially useful for building powerful search experiences on top of your existing data. For instance, if you manage an e-commerce website and want to enhance the customer experience with semantic search across your product catalog, Elastic Connectors make it easy. If your product catalog is stored in a database included in Elastic's supported connector sources, you're just a few clicks away from ingesting this data into your index. If your source isn’t currently supported, the connector framework enables you to implement custom connectors and modify existing ones. For more details, you can read How to create customized connectors for Elasticsearch.

With the Connector API now in beta, you can fully manage Elastic connectors directly from your command-line interface. This is especially useful for specific workflows, as it can automate connector management, monitoring and testing, removing the need to go back and forth between a terminal and the Kibana UI.

In this blog post, we will look into syncing a product catalog from MongoDB and indexing it into Elasticsearch to allow for building a search experience. Let’s get started!

Note: We will primarily use terminal commands to execute all steps. However, you can also manage your connectors via the Kibana UI by navigating to the Search -> Connectors section, or by using the Kibana dev console to execute the requests there. Additionally, all API calls are compatible with both Elasticsearch Serverless and any standard ES deployment, whether hosted on Elastic Cloud or your own infrastructure.

Prerequisites

  • Docker installed on your machine
  • curl and jq available in your terminal
  • Elasticsearch Serverless or Elasticsearch version >=8.14.0

Elasticsearch Serverless

We will ingest the data into Elasticsearch Serverless, as it allows you to deploy and use Elastic for your use cases without managing the underlying cluster infrastructure such as nodes, data tiers, and scaling. Serverless instances are fully managed, autoscaled, and automatically upgraded by Elastic so you can focus more on gaining value and insight from your data.

You can create your serverless project by navigating to Elastic Cloud deployment overview and clicking Create project in the serverless section.

The next step is to select the right deployment type. Since we are interested in powering a search experience, let’s select Elasticsearch. Your new deployment should be ready within a few minutes.

To securely connect to the Elasticsearch cluster, ensure you have the ES_URL and API_KEY exported as environment variables in your shell console. You can locate their values and export them by following the steps outlined below.

Open your Elasticsearch deployment, and let’s select cURL as our client.

Select your client

Generate an API key for your deployment; you can call it, for example, connector-api-key.

Generate API key

Export the ES_URL and API_KEY to your shell console.

Export environment variables
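
The exported values should look roughly like the following; the endpoint and key below are placeholders, so copy the actual values shown for your deployment:

export ES_URL="https://my-project.es.us-east-1.aws.elastic.cloud:443"
export API_KEY="VGhpcyBpcyBub3QgYSByZWFsIGtleQ=="

You can quickly confirm the credentials work by calling the root endpoint, which should return basic deployment information:

curl "${ES_URL}" -H "Authorization: ApiKey ${API_KEY}"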

Great! That’s about it for the UI. Feel free to close the browser tab now, and let’s ingest some data.

Our product catalog

For this example, we assume the product catalog is stored in MongoDB. However, the product catalog could be hosted in any data source supported by Elastic connectors. For any data sources that are not yet supported, the framework allows for the definition of custom connectors.

If you need to set up a local MongoDB instance with example data, you can find a quick guide in the Appendix: Spin up MongoDB instance with Docker, or you can use any other existing MongoDB instance. Keep in mind that using a different instance might require adjustments to the connector configuration described below.

In the following steps, we assume our MongoDB instance includes a product_catalog database with a products collection that contains the following items:

{ name: "Gadget", description: "A useful gadget", price: 19.99, stock_count: 100 }
{ name: "Widget", description: "An essential widget", price: 29.99, stock_count: 0 }
{ name: "Doodad", description: "A fancy doodad", price: 49.99, stock_count: 200 }

Create a MongoDB connector

Now that we have our Elasticsearch running and our example product catalog ready to be synced, we can focus on indexing the data into Elasticsearch.

Let’s start by creating the connector. Our connector will sync the data from MongoDB to the product-catalog ES index. This index will be created with the appropriate mappings during the first data sync; we will get back to this later. Additionally, you can modify the connector index at any time using the update index name API call.

export CONNECTOR_ID=product-catalog-connector
curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}" \
-H "Authorization: ApiKey "${API_KEY}"" \
-H "Content-Type: application/json" \
-d'
{
  "service_type": "mongodb",
  "name": "Product Catalog",
  "index_name": "product-catalog"
}
'
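
As mentioned above, the target index can be changed at any point after the connector exists. A call to the update index name endpoint would look something like this (the new index name here is hypothetical):

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_index_name" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "index_name": "product-catalog-v2"
}
'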

With the connector created, let’s define our working directory:

export PROJECT_ROOT=$(pwd)

Let’s configure and start the self-managed connector service as described in the connectors documentation: Running from a Docker container:

mkdir $PROJECT_ROOT/connectors-config
cat > $PROJECT_ROOT/connectors-config/config.yml << EOF
connectors:
  - connector_id: $CONNECTOR_ID
    service_type: mongodb
elasticsearch.host: $ES_URL
elasticsearch.api_key: $API_KEY
EOF

Start the local connector service. Check the available versions in the official Docker repository and select the most recently published version.

export CONNECTOR_VERSION=8.13.4.0
docker run \
-v "$PROJECT_ROOT/connectors-config:/config" \
--rm \
--tty -i \
--network host \
docker.elastic.co/enterprise-search/elastic-connectors:$CONNECTOR_VERSION \
/app/bin/elastic-ingest \
-c /config/config.yml

Upon starting the connector service, you should see a log line like this:

[Connector id: product-catalog-connector, index name: product-catalog] Connector is not configured yet....

Verify that your connector is connected by using the get connector endpoint and inspecting its status (it should be needs_configuration) and last_seen field (note that the time is reported in UTC). A recent last_seen value indicates that the connector service successfully connected to Elasticsearch.

Note: we are using jq, a lightweight command-line JSON processor, to handle the raw response.

curl -X GET "${ES_URL}/_connector/${CONNECTOR_ID}?pretty" \
-H "Authorization: ApiKey "${API_KEY}""  | jq '{id, index_name, last_seen, status}'

{
  "id": "product-catalog-connector",
  "index_name": "product-catalog",
  "last_seen": "2024-05-13T10:25:52.648635+00:00",
  "status": "error"
}

Now we have to configure the connector to authenticate against MongoDB and access our product catalog. For guidance on connector configuration, you can always consult the MongoDB connector reference docs. You can also inspect the registered schema in the connector’s configuration property returned as part of the get request:

curl -X GET "${ES_URL}/_connector/${CONNECTOR_ID}?pretty" \
-H "Authorization: ApiKey "${API_KEY}""  | jq '.configuration | with_entries(.value |= {label, required, value})'

{
  "tls_insecure": {
    "label": "Skip certificate verification",
    "required": true,
    "value": false
  },
  "password": {
    "label": "Password",
    "required": false,
    "value": ""
  },
  "database": {
    "label": "Database",
    "required": true,
    "value": ""
  },
  "direct_connection": {
    "label": "Direct connection",
    "required": true,
    "value": false
  },
  "ssl_ca": {
    "label": "Certificate Authority (.pem)",
    "required": false,
    "value": ""
  },
  "ssl_enabled": {
    "label": "SSL/TLS Connection",
    "required": true,
    "value": false
  },
  "host": {
    "label": "Server hostname",
    "required": true,
    "value": ""
  },
  "collection": {
    "label": "Collection",
    "required": true,
    "value": ""
  },
  "user": {
    "label": "Username",
    "required": false,
    "value": ""
  }
}

We can set the connector configuration values using the update configuration endpoint. Since connectors are stateless services that communicate with Elasticsearch via the Connector Protocol, you might need to wait a bit after the Connector service starts for the configuration schema to be registered. For our test setup, it should be sufficient to provide the required MongoDB host, database, and collection from which we want to sync the data. I’m intentionally skipping authentication with username and password as we are running MongoDB locally with security disabled to make our toy example simpler.

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_configuration" \
-H "Authorization: ApiKey "${API_KEY}"" \
-H "Content-Type: application/json" \
-d'
{
  "values": {
    "host": "mongodb://127.0.0.1:27017",
    "database": "product_catalog",
    "collection": "products"
  }
}
'

Triggering an on-demand sync

Now that the connector is created and configured, and the connector service is running locally, we can sync the data to see if everything works end to end.

curl -X POST "${ES_URL}/_connector/_sync_job" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "id": "'"$CONNECTOR_ID"'",
  "job_type": "full"
}
'

Upon the first sync the product-catalog index will be created.

It might take up to 30 seconds for the sync to start. You can check when the sync started by inspecting the connector service logs; you should see something like:

[Connector id: product-catalog-connector, index name: product-catalog, Sync job id: 37PQYo8BuUEwFes5cC9M] Executing full sync

Alternatively, you can check the status of your last sync job by listing the connector's sync jobs. Reviewing the status, error (if any), and indexed_document_count properties can help you understand the current job's state.

curl -X GET "${ES_URL}/_connector/_sync_job?connector_id=${CONNECTOR_ID}&size=1&pretty" \
-H "Authorization: ApiKey ${API_KEY}" | jq '.results[] | {status, error, indexed_document_count}'

{
  "status": "completed",
  "error": null,
  "indexed_document_count": 3
}

Once the sync job is created, its status is set to pending; when the connector service starts executing the sync, the status changes to in_progress.

Eventually the sync job finishes and its status is set to completed (as in the response above). We can also check additional sync stats such as indexed_document_count, which equals 3 and matches our dummy dataset count. Yay!
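
You can also look up a single sync job directly by its ID using the get sync job endpoint. The ID below is the one from the example log line above; yours will differ:

curl -X GET "${ES_URL}/_connector/_sync_job/37PQYo8BuUEwFes5cC9M?pretty" \
-H "Authorization: ApiKey ${API_KEY}" | jq '{status, indexed_document_count, error}'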

We can inspect the connector index to which the data was indexed, and we should see 3 items as well!

curl -X POST "${ES_URL}/product-catalog/_search?pretty" \
-H "Authorization: ApiKey "${API_KEY}"" |  jq '.hits.total.value'

3

Keeping your ES index in sync with the source

In a real-life scenario the catalog would keep changing: stock counts of existing items fluctuate, and you might introduce more products to your catalog. To handle this, we can configure our connector to sync the data periodically so it stays up to date with the source of truth in MongoDB.

Let’s enable sync scheduling and set it to run every 15 minutes. We can use the update scheduling endpoint for this:

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_scheduling" \
-H "Authorization: ApiKey "${API_KEY}"" \
-H "Content-Type: application/json" \
-d'
{
  "scheduling": {
    "full": {
      "enabled": true,
      "interval":  "0 0,15,30,45 * * * ?"
    }
  }
}
'

As long as your connector service keeps running in the background, it will take care of scheduling and starting sync jobs at the configured interval. The connector service is lightweight and doesn’t consume many resources when idle, so it’s fine to leave it running in the background.
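
If you’d rather not keep a terminal attached to it, one option is to run the same container detached and only check its logs when needed; a sketch, with a container name chosen here for convenience:

docker run -d \
--name product-catalog-connector-service \
-v "$PROJECT_ROOT/connectors-config:/config" \
--network host \
docker.elastic.co/enterprise-search/elastic-connectors:$CONNECTOR_VERSION \
/app/bin/elastic-ingest \
-c /config/config.yml

docker logs -f product-catalog-connector-service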

Of course, at any point you can open Kibana and navigate to the Connectors tab to, for example, check their status, job history, or change their scheduling in the UI.

Connectors Kibana UI

Sync Rules - Index only what you want

While your product catalog may feature thousands of items, perhaps only a handful are currently in stock (see stock_count in our example).

In the context of our product catalog, let’s say we aim to index only those products that are in stock. Consequently, the product “Widget”, which is out of stock, should be excluded from our search index.

The connector service supports two approaches to accomplish this:

  • Basic Sync Rules: These rules let you filter content at the connector level before it is indexed into Elasticsearch. All data is still retrieved from your source, but you control which documents are forwarded using basic filtering rules. This functionality is available for all connectors; a sketch of a basic rule appears after this list.
  • Advanced Sync Rules: This method filters data directly at the source. Consider an SQL statement like SELECT * FROM products WHERE stock_count > 0; it fetches only the data you need from the source. Advanced sync rules can significantly improve sync times, especially when indexing only a fraction of your complete dataset. Note that advanced filtering is available for a select group of connectors.
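
To make the first option concrete, here is a minimal sketch of what a basic sync rule draft for our catalog could look like, sent to the same update filtering endpoint used below. It excludes documents whose stock_count equals 0 and keeps the catch-all DEFAULT rule in place; in the next step we will use the advanced variant instead:

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_filtering" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "rules": [
    {
      "field": "stock_count",
      "id": "exclude-out-of-stock",
      "order": 0,
      "policy": "exclude",
      "rule": "equals",
      "value": "0"
    },
    {
      "field": "_",
      "id": "DEFAULT",
      "order": 1,
      "policy": "include",
      "rule": "regex",
      "value": ".*"
    }
  ]
}
'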

In our case, see the MongoDB connector documentation for the sync rules it supports. We can set an advanced filtering rule to index only in-stock products using the update filtering endpoint, which lets you draft sync rules:

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_filtering" \
-H "Authorization: ApiKey "${API_KEY}"" \
-H "Content-Type: application/json" \
-d'
{
  "advanced_snippet": {
    "value": {
      "find": {
        "filter": {
          "stock_count": {
            "$gt": 0
          }
        }
      }
    }
  }
}
'

Now the connector service will validate the sync rule draft. The initial draft validation state will be edited. If the provided draft is valid, its validation state will be marked as valid and the draft sync rules will be activated by the running connector service.

You can always check the validation status of your sync rule draft by inspecting the output of a GET _connector/product-catalog-connector request. If your draft has been validated, you should see:

  • its filtering draft validation status marked as valid
  • it listed as the active filtering (since your draft was activated)

curl -X GET "${ES_URL}/_connector/${CONNECTOR_ID}?pretty" \
-H "Authorization: ApiKey "${API_KEY}"" | jq '{filtering}'

{
  "filtering": [
    {
      "domain": "DEFAULT",
      "draft": {
        "advanced_snippet": {
          "updated_at": "2024-05-09T12:49:18.155532096Z",
          "created_at": "2024-05-09T12:49:18.155532096Z",
          "value": {
            "find": {
              "filter": {
                "stock_count": {
                  "$gt": 0
                }
              }
            }
          }
        },
        "rules": [
          ...
        ],
        "validation": {
          "state": "valid",
          "errors": []
        }
      },
      "active": {
        "advanced_snippet": {
          "updated_at": "2024-05-09T12:49:18.155532096Z",
          "created_at": "2024-05-09T12:49:18.155532096Z",
          "value": {
            "find": {
              "filter": {
                "stock_count": {
                  "$gt": 0
                }
              }
            }
          }
        },
        "rules": [
        ...
        ],
        "validation": {
          "state": "valid",
          "errors": []
        }
      }
    }
  ]
}

If the advanced filtering rule is not syntactically correct, e.g. the filter keyword in the MongoDB query has a typo such as filterrr, you will see an appropriate error in the draft’s validation section, for example:

{
  "filtering": [
    {
      "domain": "DEFAULT",
      "draft": {
        "advanced_snippet": {
          "updated_at": "2024-05-10T13:26:11.777102054Z",
          "created_at": "2024-05-10T13:26:11.777102054Z",
          "value": {
            "find": {
              "filterrr": {
                "stock_count": {
                  "$gt": 0
                }
              }
            }
          }
        },
        "rules": [...],
        "validation": {
          "state": "invalid",
          "errors": [
            {
              "ids": [
                "advanced_snippet"
              ],
              "messages": [
                "data.find must not contain {'filterrr'} properties"
              ]
            }
          ]
        }
      },
      "active": {...}
    }
  ]
}

The next sync should only index items that are in stock, so afterwards you should find 2 products in your search index with a stock_count greater than 0.

curl -X POST "${ES_URL}/product-catalog/_search?pretty" \
-H "Authorization: ApiKey "${API_KEY}"" |  jq '.hits.total.value'

2

Generate vector embeddings at ingest time

To enable semantic search, generate vector embeddings of your data during ingestion. You'll need to adjust your index mappings and create an ingest pipeline. For detailed instructions, check out the relevant section of Using Elasticsearch as a vector database, or refer to Tutorial: semantic search with ELSER.
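
As a rough sketch (assuming the built-in E5 small text embedding model is deployed in your project, and that we want to store embeddings of the description field in a new description_embedding field, both of which are assumptions of this example), the mapping update and pipeline creation could look like this:

curl -X PUT "${ES_URL}/product-catalog/_mapping" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "properties": {
    "description_embedding": {
      "type": "dense_vector",
      "dims": 384,
      "index": true,
      "similarity": "cosine"
    }
  }
}
'

curl -X PUT "${ES_URL}/_ingest/pipeline/e5-small-product-catalog" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "processors": [
    {
      "inference": {
        "model_id": ".multilingual-e5-small",
        "input_output": [
          {
            "input_field": "description",
            "output_field": "description_embedding"
          }
        ]
      }
    }
  ]
}
'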

Once your pipeline is created and configured, and assuming it's named e5-small-product-catalog, you can add the custom ingest pipeline to your connector using the following command:

curl -X PUT "${ES_URL}/_connector/${CONNECTOR_ID}/_pipeline?pretty" \
-H "Authorization: ApiKey "${API_KEY}"" \
-H "Content-Type: application/json" \
-d'
{
    "pipeline": {
        "extract_binary_content": true,
        "name": "e5-small-product-catalog",
        "reduce_whitespace": true,
        "run_ml_inference": true
    }
}
'

This automates vector embedding generation for your data every time it syncs from the source.
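
Continuing the sketch above, a semantic query could then embed the search text at query time with a query_vector_builder; the field and model names here are the same assumptions as before:

curl -X POST "${ES_URL}/product-catalog/_search?pretty" \
-H "Authorization: ApiKey ${API_KEY}" \
-H "Content-Type: application/json" \
-d'
{
  "knn": {
    "field": "description_embedding",
    "k": 2,
    "num_candidates": 10,
    "query_vector_builder": {
      "text_embedding": {
        "model_id": ".multilingual-e5-small",
        "model_text": "something fancy for my desk"
      }
    }
  },
  "_source": ["name", "description", "price"]
}
'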

Monitoring your connectors

Two endpoints are particularly useful for monitoring connectors: the list connectors endpoint (GET _connector) and the list sync jobs endpoint (GET _connector/_sync_job).

For example, you could set up a periodic connector health check that would:

  • Retrieve IDs of all connectors
  • For each connector ID, retrieve its associated sync jobs
    • Record a sync job type (full, incremental, access_control)
    • Track its status, e.g. pending, in_progress, error, canceled or completed
    • Track the average sync duration
    • Track any errors

The API calls would look as follows, starting with retrieving your connector ID:

curl "${ES_URL}/_connector?pretty" -H "Authorization: ApiKey "${API_KEY}"" | jq '[.results[] | {id}]'

[
  {
    "id": "product-catalog-connector"
  }
]

For each connector returned by the above call, let’s list its sync job history. Note that jobs are listed with the most recent first:

curl -X GET "${ES_URL}/_connector/_sync_job?connector_id=product-catalog-connector&size=100&job_type=full" \
-H "Authorization: ApiKey "${API_KEY}"" | jq '[.results[] | {id, job_type, status, indexed_document_count, total_document_count, started_at, completed_at, error}]'

[
  {
    "id": "fAJlZo8BPmy2hB-4_7jr",
    "job_type": "full",
    "status": "completed",
    "indexed_document_count": 2,
    "total_document_count": 2,
    "started_at": "2024-05-11T06:45:39.779977+00:00",
    "completed_at": "2024-05-11T06:45:43.764303+00:00",
    "error": null
  }
]

If your connector handles sync jobs of various types, you can set the job_type URL parameter to full, incremental, or access_control. If this parameter is not provided, the response will not be filtered by job type.

You could use the status field to monitor your system's health. Consider these scenarios:

  • If the percentage of failed jobs (status == error) is increasing, it might indicate issues with your data source or Elasticsearch cluster. In such cases, examine the populated error field to identify the specific problem.
  • If the number of pending jobs (status == pending) is rising, it could suggest that your connector service is unable to keep up with the desired schedule. For instance, syncing a large data source with a schedule set to run every minute might lead to backpressure in the framework: new pending jobs will continue to be scheduled, but the service may not start and complete them before the next scheduled jobs are due to begin (a quick check for this is sketched below).
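
As a sketch of that check, you can count the connector's pending jobs by filtering the list sync jobs endpoint on status:

curl -s "${ES_URL}/_connector/_sync_job?connector_id=${CONNECTOR_ID}&status=pending&size=100" \
-H "Authorization: ApiKey ${API_KEY}" | jq '.results | length'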

Additionally, you can use started_at and completed_at to track the average duration of a sync. A sudden change in average sync duration might be a good condition to raise an automated alert.
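
For example, a rough way to compute the average duration (in seconds) of recent completed full syncs with jq could look like this; the fractional seconds are stripped from the timestamps before parsing:

curl -s "${ES_URL}/_connector/_sync_job?connector_id=${CONNECTOR_ID}&job_type=full&status=completed&size=100" \
-H "Authorization: ApiKey ${API_KEY}" | jq '
  [.results[]
    | ((.completed_at | sub("\\..*"; "Z") | fromdateiso8601)
     - (.started_at   | sub("\\..*"; "Z") | fromdateiso8601))]
  | if length > 0 then (add / length) else null end'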

Tips on automation

The Connector API can be a powerful tool for supporting automation. Here are a few tips for automating your workflows.

  • When using version control for connector configurations, be careful not to expose sensitive configuration values. Use dedicated solutions like GitHub environment variables or integrations with services such as Vault.
  • The same caution applies to any credentials (username, password, and API key) used to connect to Elasticsearch.
  • Any change applied to the connector service’s config.yml (e.g. adding a new connector, as shown below) requires a service restart.
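
For example, registering a second connector with the same service instance is just another entry under connectors: in config.yml, followed by a service restart. The second connector below is hypothetical:

connectors:
  - connector_id: product-catalog-connector
    service_type: mongodb
  - connector_id: warehouse-inventory-connector
    service_type: postgresql
elasticsearch.host: <your ES_URL value>
elasticsearch.api_key: <your API_KEY value>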

Next steps

Now the connector service will continuously sync your database with the Elasticsearch index, allowing you to shift your focus to refining the search experience built on top of your data.

If you wish to incorporate more data from additional sources, you can set up and configure more connectors. Refer to Elastic’s connector catalog for supported data sources. If your source is not currently supported, the connector framework allows you to develop custom connectors and modify existing ones. For more details, see the guide on creating customized connectors for Elasticsearch.

Using Elasticsearch Serverless offers significant benefits for any developer. It is fully managed, autoscales, and is automatically upgraded by Elastic, granting you access to new features and capabilities as soon as they are released. This allows you to focus more on gaining value and insights from your data without the need to manage the underlying cluster.

For next steps in building a search experience, I recommend reading the following articles:

Also, explore our Elasticsearch client libraries, which can help accelerate the development of your search experience.

Appendix

Spin up MongoDB instance with Docker

Let’s use Docker to spin up a database with some example data for the purpose of this blog post.

docker run --name mongodb -d -p 27017:27017 mongo:latest

Once the instance starts, you can prepare and copy a script that inserts dummy data into your product catalog.

Here is insert-data.js:

// Connect to the MongoDB database
db = db.getSiblingDB('product_catalog');

// Insert data into the 'products' collection
db.products.insertMany([
  { name: "Gadget", description: "A useful gadget", price: 19.99, stock_count: 100 },
  { name: "Widget", description: "An essential widget", price: 29.99, stock_count: 0 },
  { name: "Doodad", description: "A fancy doodad", price: 49.99, stock_count: 200 }
]);

Copy the script to the container and insert the data into MongoDB:

docker cp insert-data.js mongodb:/insert-data.js
docker exec -it mongodb mongosh product_catalog /insert-data.js

You can verify your data is present by querying the product_catalog db; this command should return your 3 entries:

docker exec -it mongodb mongosh product_catalog --eval "db.products.find().toArray()"

Now you should have your MongoDB instance running with the product catalog data ready to be used with this example.

You can build search with data from any source. Check out this webinar to learn about different connectors and sources that Elasticsearch supports.
Ready to try this out on your own? Start a free trial.