Connector API tutorial
editConnector API tutorial
editLearn how to set up a self-managed connector using the Elasticsearch Connector APIs.
For this example we’ll use the connectors-postgresql,PostgreSQL connector to sync data from a PostgreSQL database to Elasticsearch. We’ll spin up a simple PostgreSQL instance in Docker with some example data, create a connector, and sync the data to Elasticsearch. You can follow the same steps to set up a connector for another data source.
This tutorial focuses on running a self-managed connector on your own infrastructure, and managing syncs using the Connector APIs. See connectors for an overview of how connectors work.
If you’re just getting started with Elasticsearch, this tutorial might be a bit advanced. Refer to quickstart for a more beginner-friendly introduction to Elasticsearch.
If you’re just getting started with connectors, you might want to start in the UI first. We have two tutorials that focus on managing connectors using the UI:
- Elastic managed connector tutorial. Set up a native MongoDB connector, fully managed in Elastic Cloud.
- self-managed connector tutorial. Set up a self-managed PostgreSQL connector.
Prerequisites
edit- You should be familiar with how connectors, connectors work, to understand how the API calls relate to the overall connector setup.
- You need to have Docker Desktop installed.
- You need to have Elasticsearch running, and an API key to access it. Refer to the next section for details, if you don’t have an Elasticsearch deployment yet.
Set up Elasticsearch
editIf you already have an Elasticsearch deployment on Elastic Cloud (Hosted deployment or Serverless project), you’re good to go. To spin up Elasticsearch in local dev mode in Docker for testing purposes, open the collapsible section below.
Run local Elasticsearch in Docker
docker run -p 9200:9200 -d --name elasticsearch \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ -e "xpack.security.http.ssl.enabled=false" \ -e "xpack.license.self_generated.type=trial" \ docker.elastic.co/elasticsearch/elasticsearch:8.16.2
This Elasticsearch setup is for development purposes only. Never use this configuration in production. Refer to Set up Elasticsearch for production-grade installation instructions, including Docker.
We will use the default password changeme
for the elastic
user. For production environments, always ensure your cluster runs with security enabled.
export ELASTIC_PASSWORD="changeme"
Since we run our cluster locally with security disabled, we won’t use API keys to authenticate against the Elasticsearch. Instead, in each cURL request, we will use the -u
flag for authentication.
Let’s test that we can access Elasticsearch:
curl -s -X GET -u elastic:$ELASTIC_PASSWORD http://localhost:9200
Note: With Elasticsearch running locally, you will need to pass the username and password to authenticate against Elasticsearch in the configuration file for the connector service.
Run PostgreSQL instance in Docker (optional)
editFor this tutorial, we’ll set up a PostgreSQL instance in Docker with some example data. Of course, you can skip this step and use your own existing PostgreSQL instance if you have one. Keep in mind that using a different instance might require adjustments to the connector configuration described in the next steps.
Expand to run simple PostgreSQL instance in Docker and import example data
Let’s launch a PostgreSQL container with a user and password, exposed at port 5432
:
docker run --name postgres -e POSTGRES_USER=myuser -e POSTGRES_PASSWORD=mypassword -p 5432:5432 -d postgres
Download and import example data
Next we need to create a directory to store our example dataset for this tutorial. In your terminal, run the following command:
mkdir -p ~/data
We will use the Chinook dataset example data.
Run the following command to download the file to the ~/data
directory:
curl -L https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql -o ~/data/Chinook_PostgreSql.sql
Now we need to import the example data into the PostgreSQL container and create the tables.
Run the following Docker commands to copy our sample data into the container and execute the psql
script:
docker cp ~/data/Chinook_PostgreSql.sql postgres:/ docker exec -it postgres psql -U myuser -f /Chinook_PostgreSql.sql
Let’s verify that the tables are created correctly in the chinook
database:
docker exec -it postgres psql -U myuser -d chinook -c "\dt"
The album
table should contain 347 entries and the artist
table should contain 275 entries.
This tutorial uses a very basic setup. To use advanced functionality such as filtering rules and incremental syncs, enable track_commit_timestamp
on your PostgreSQL database. Refer to postgresql-connector-client-tutorial for more details.
Now it’s time for the real fun! We’ll set up a connector to create a searchable mirror of our PostgreSQL data in Elasticsearch.
Create a connector
editWe’ll use the Create connector API to create a PostgreSQL connector instance.
Run the following API call, using the Dev Tools Console or curl
:
resp = client.connector.put( connector_id="my-connector-id", name="Music catalog", index_name="music", service_type="postgresql", ) print(resp)
const response = await client.connector.put({ connector_id: "my-connector-id", name: "Music catalog", index_name: "music", service_type: "postgresql", }); console.log(response);
PUT _connector/my-connector-id { "name": "Music catalog", "index_name": "music", "service_type": "postgresql" }
service_type
refers to the third-party data source you’re connecting to.
Note that we specified the my-connector-id
ID as a part of the PUT
request.
We’ll need the connector ID to set up and run the connector service locally.
If you’d prefer to use an autogenerated ID, replace PUT _connector/my-connector-id
with POST _connector
.
Run connector service
editThe connector service runs automatically in Elastic Cloud, if you’re using our managed Elastic managed connectors. Because we’re running a self-managed connector, we need to spin up this service locally.
Now we’ll run the connector service so we can start syncing data from our PostgreSQL instance to Elasticsearch. We’ll use the steps outlined in connectors-run-from-docker.
When running the connectors service on your own infrastructure, you need to provide a configuration file with the following details:
-
Your Elasticsearch endpoint (
elasticsearch.host
) -
An Elasticsearch API key (
elasticsearch.api_key
) -
Your third-party data source type (
service_type
) -
Your connector ID (
connector_id
)
Create an API key
editIf you haven’t already created an API key to access Elasticsearch, you can use the _security/api_key endpoint.
Here, we assume your target Elasticsearch index name is music
. If you use a different index name, adjust the request body accordingly.
resp = client.security.create_api_key( name="music-connector", role_descriptors={ "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": False } ] } }, ) print(resp)
const response = await client.security.createApiKey({ name: "music-connector", role_descriptors: { "music-connector-role": { cluster: ["monitor", "manage_connector"], indices: [ { names: ["music", ".search-acl-filter-music", ".elastic-connectors*"], privileges: ["all"], allow_restricted_indices: false, }, ], }, }, }); console.log(response);
POST /_security/api_key { "name": "music-connector", "role_descriptors": { "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": false } ] } } }
You’ll need to use the encoded
value from the response as the elasticsearch.api_key
in your configuration file.
You can also create an API key in the Kibana and Serverless UIs.
Prepare the configuration file
editLet’s create a directory and a config.yml
file to store the connector configuration:
mkdir -p ~/connectors-config touch ~/connectors-config/config.yml
Now, let’s add our connector details to the config file.
Open config.yml
and paste the following configuration, replacing placeholders with your own values:
elasticsearch.host: <ELASTICSEARCH_ENDPOINT> # Your Elasticsearch endpoint elasticsearch.api_key: <ELASTICSEARCH_API_KEY> # Your Elasticsearch API key connectors: - connector_id: "my-connector-id" service_type: "postgresql"
We provide an example configuration file in the elastic/connectors
repository for reference.
Run the connector service
editNow that we have the configuration file set up, we can run the connector service locally. This will point your connector instance at your Elasticsearch deployment.
Run the following Docker command to start the connector service:
docker run \ -v "$HOME/connectors-config:/config" \ --rm \ --tty -i \ --network host \ docker.elastic.co/integrations/elastic-connectors:8.16.2.0 \ /app/bin/elastic-ingest \ -c /config/config.yml
Verify your connector is connected by getting the connector status (should be needs_configuration
) and last_seen
field (note that time is reported in UTC).
The last_seen
field indicates that the connector successfully connected to Elasticsearch.
resp = client.connector.get( connector_id="my-connector-id", ) print(resp)
const response = await client.connector.get({ connector_id: "my-connector-id", }); console.log(response);
GET _connector/my-connector-id
Configure connector
editNow our connector instance is up and running, but it doesn’t yet know where to sync data from. The final piece of the puzzle is to configure our connector with details about our PostgreSQL instance. When setting up a connector in the Elastic Cloud or Serverless UIs, you’re prompted to add these details in the user interface.
But because this tutorial is all about working with connectors programmatically, we’ll use the Update connector configuration API to add our configuration details.
Before configuring the connector, ensure that the configuration schema is registered by the service.
For Elastic managed connectors, this occurs shortly after creation via the API.
For self-managed connectors, the schema registers on service startup (once the config.yml
is populated).
Configuration updates via the API are possible only after schema registration.
Verify this by checking the configuration property returned by the GET _connector/my-connector-id
request.
It should be non-empty.
Run the following API call to configure the connector with our connectors-postgresql-client-configuration,PostgreSQL configuration details:
resp = client.connector.update_configuration( connector_id="my-connector-id", values={ "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" }, ) print(resp)
const response = await client.connector.updateConfiguration({ connector_id: "my-connector-id", values: { host: "127.0.0.1", port: 5432, username: "myuser", password: "mypassword", database: "chinook", schema: "public", tables: "album,artist", }, }); console.log(response);
PUT _connector/my-connector-id/_configuration { "values": { "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" } }
Configuration details are specific to the connector type. The keys and values will differ depending on which third-party data source you’re connecting to. Refer to the individual connectors-references,connector references for these configuration details.
Sync data
editWe’re using a self-managed connector in this tutorial. To use these APIs with an Elastic managed connector, there’s some extra setup for API keys. Refer to Manage API keys for details.
We’re now ready to sync our PostgreSQL data to Elasticsearch. Run the following API call to start a full sync job:
resp = client.perform_request( "POST", "/_connector/_sync_job", headers={"Content-Type": "application/json"}, body={ "id": "my-connector-id", "job_type": "full" }, ) print(resp)
const response = await client.transport.request({ method: "POST", path: "/_connector/_sync_job", body: { id: "my-connector-id", job_type: "full", }, }); console.log(response);
POST _connector/_sync_job { "id": "my-connector-id", "job_type": "full" }
To store data in Elasticsearch, the connector needs to create an index.
When we created the connector, we specified the music
index.
The connector will create and configure this Elasticsearch index before launching the sync job.
In the approach we’ve used here, the connector will use dynamic mappings to automatically infer the data types of your fields. In a real-world scenario you would use the Elasticsearch Create index API to first create the index with the desired field mappings and index settings. Defining your own mappings upfront gives you more control over how your data is indexed.
Check sync status
editUse the Get sync job API to track the status and progress of the sync job. By default, the most recent job statuses are returned first. Run the following API call to check the status of the sync job:
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
The job document will be updated as the sync progresses, you can check it as often as you’d like to poll for updates.
Once the job completes, the status should be completed
and indexed_document_count
should be 622.
Verify that data is present in the music
index with the following API call:
resp = client.count( index="music", ) print(resp)
const response = await client.count({ index: "music", }); console.log(response);
GET music/_count
Elasticsearch stores data in documents, which are JSON objects. List the individual documents with the following API call:
resp = client.search( index="music", ) print(resp)
const response = await client.search({ index: "music", }); console.log(response);
GET music/_search
Troubleshooting
editUse the following command to inspect the latest sync job’s status:
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
If the connector encountered any errors during the sync, you’ll find these in the error
field.
Cleaning up
editTo delete the connector and its associated sync jobs run this command:
resp = client.connector.delete( connector_id="my-connector-id&delete_sync_jobs=true", ) print(resp)
const response = await client.connector.delete({ connector_id: "my-connector-id&delete_sync_jobs=true", }); console.log(response);
DELETE _connector/my-connector-id&delete_sync_jobs=true
This won’t delete the Elasticsearch index that was created by the connector to store the data.
Delete the music
index by running the following command:
resp = client.indices.delete( index="music", ) print(resp)
const response = await client.indices.delete({ index: "music", }); console.log(response);
DELETE music
To remove the PostgreSQL container, run the following commands:
docker stop postgres docker rm postgres
To remove the connector service, run the following commands:
docker stop <container_id> docker rm <container_id>
Next steps
editCongratulations! You’ve successfully set up a self-managed connector using the Connector APIs.
Here are some next steps to explore:
- Learn more about the Connector APIs.
- Learn how to deploy Elasticsearch, Kibana, and the connectors service using Docker Compose in our quickstart guide.