Ingest data from a relational database into Elasticsearch Service
This guide explains how to ingest data from a relational database into Elasticsearch Service through Logstash, using the Logstash JDBC input plugin. It demonstrates how Logstash can be used to efficiently copy records from a relational database and to pick up subsequent updates, and then send them to Elasticsearch in an Elasticsearch Service deployment.
The code and methods presented here have been tested with MySQL. They should work with other relational databases.
The Logstash Java Database Connectivity (JDBC) input plugin enables you to pull in data from many popular relational databases including MySQL and Postgres. Conceptually, the JDBC input plugin runs a loop that periodically polls the relational database for records that were inserted or modified since the last iteration of this loop.
This document covers the following steps: preparing a source MySQL database, configuring a Logstash pipeline with the JDBC input plugin, and outputting the data to Elasticsearch in your Elasticsearch Service deployment.
Time required: 2 hours
Prerequisites
For this tutorial, you need a source MySQL instance for Logstash to read from. A free version of MySQL is available from the MySQL Community Server section of the MySQL Community Downloads site.
Get Elasticsearch Service
- Get a free trial.
- Log in to Elastic Cloud.
- Select Create deployment.
- Give your deployment a name. You can leave all other settings at their default values.
- Select Create deployment and save your Elastic deployment credentials. You need these credentials later on.
- When the deployment is ready, click Continue and a page of Setup guides is displayed. To continue to the deployment homepage, click I’d like to do something else.
Prefer not to subscribe to yet another service? You can also get Elasticsearch Service through AWS, Azure, and GCP marketplaces.
Connect securely
When connecting to Elasticsearch Service, you can use a Cloud ID to specify the connection details. Find your Cloud ID by going to the Kibana main menu and selecting Management > Integrations, and then selecting View deployment details.
To connect to, stream data to, and issue queries with Elasticsearch Service, you need to think about authentication. Two authentication mechanisms are supported: API key and basic authentication. To get you started quickly, this guide shows you how to use basic authentication, but you can also generate API keys as shown later on. API keys are safer and preferred for production environments.
- Download and unpack Logstash on the local machine that hosts MySQL, or on another machine that has access to the MySQL host.
Get the MySQL JDBC driver
The Logstash JDBC input plugin does not include any database connection drivers. You need a JDBC driver for your relational database to complete the steps in the later section Configure a Logstash pipeline with the JDBC input plugin.
- Download and unpack the JDBC driver for MySQL from the Connector/J section of the MySQL Community Downloads site.
- Make a note of the driver’s location as it’s needed in the steps that follow.
Prepare a source MySQL database
Let's look at a simple database from which you'll import data and send it to Elasticsearch Service. This example uses a MySQL database with timestamped records. The timestamps enable you to easily determine what's changed in the database since the most recent data transfer to Elasticsearch Service.
Consider the database structure and design
For this example, let's create a new database es_db with a table es_table as the source of our Elasticsearch data.
- Run the following SQL statement to generate a new MySQL database with a three-column table:

CREATE DATABASE es_db;
USE es_db;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
Let’s explore the key concepts in this SQL snippet:
- es_table: The name of the table that stores the data.
- id: The unique identifier for records. id is defined as both a PRIMARY KEY and UNIQUE KEY to guarantee that each id appears only once in the current table. This is translated to _id for updating or inserting the document into Elasticsearch.
- client_name: The data that will ultimately be ingested into Elasticsearch. For simplicity, this example includes only a single data field.
- modification_time: The timestamp of when the record was inserted or last updated. Later on, you can use this timestamp to determine what has changed since the last data transfer into Elasticsearch. A short example of this behavior follows this list.
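Because modification_time is declared with ON UPDATE CURRENT_TIMESTAMP, MySQL refreshes it automatically whenever a row's data changes, which is what lets the pipeline pick up edits as well as inserts. A minimal sketch of that behavior (the UPDATE statement below is only an illustration, not part of the tutorial data):

UPDATE es_table SET client_name = 'Tully' WHERE id = 1;
-- Rerunning SELECT * FROM es_table; now shows a newer modification_time for id = 1,
-- so the next Logstash polling cycle picks that row up again.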
- Consider how to handle deletions and how to notify Elasticsearch about them. Often, deleting a record removes it immediately from the MySQL database, leaving no record of the deletion. Logstash doesn't detect the change, so that record remains in Elasticsearch.

There are two possible ways to address this:

- You can use "soft deletes" in your source database. Essentially, a record is first marked for deletion through a boolean flag. Other programs that are currently using your source database would have to filter out "soft deletes" in their queries. The "soft deletes" are sent over to Elasticsearch, where they can be processed. After that, your source database and Elasticsearch must both remove these "soft deletes." A sketch of this approach follows this list.
- You can periodically clear the Elasticsearch indices that are based off of the database, and then refresh Elasticsearch with a fresh ingest of the contents of the database.
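A minimal sketch of the soft-delete approach, assuming a hypothetical deleted flag column that is not part of this tutorial's schema (the statements below are illustrative only):

ALTER TABLE es_table ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE;
-- Applications mark rows instead of removing them; modification_time updates automatically,
-- so Logstash still picks the change up on its next poll.
UPDATE es_table SET deleted = TRUE WHERE id = 2;
-- Queries that should ignore soft-deleted rows filter on the flag:
SELECT * FROM es_table WHERE deleted = FALSE;

Downstream, the deleted field arrives in Elasticsearch with the document, where a query or a scheduled cleanup job can act on it before the flagged rows are eventually purged from both systems.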
- Log in to your MySQL server and add three records to your new database:

USE es_db;
INSERT INTO es_table (id, client_name)
VALUES (1, "Targaryen"), (2, "Lannister"), (3, "Stark");
- Verify your data with a SQL statement:

select * from es_table;

The output should look similar to the following:

+----+-------------+---------------------+
| id | client_name | modification_time   |
+----+-------------+---------------------+
|  1 | Targaryen   | 2021-04-21 12:17:16 |
|  2 | Lannister   | 2021-04-21 12:17:16 |
|  3 | Stark       | 2021-04-21 12:17:16 |
+----+-------------+---------------------+
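The Logstash query in the next section converts modification_time to a Unix timestamp so that it can be compared against a stored numeric checkpoint. If you want to preview that conversion directly in MySQL, a quick illustrative query (not required for the tutorial) is:

SELECT id, client_name, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table;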
Now, let’s go back to Logstash and configure it to ingest this data.
Configure a Logstash pipeline with the JDBC input plugin
Let's set up a sample Logstash input pipeline that uses the JDBC input plugin to ingest data from your new MySQL database. Beyond MySQL, you can input data from any database that supports JDBC.
- In <localpath>/logstash-7.12.0/, create a new text file named jdbc.conf.
- Copy and paste the following code into this new text file. The code creates a Logstash pipeline that uses the JDBC input plugin.

input {
  jdbc {
    jdbc_driver_library => "<driverpath>/mysql-connector-java-<versionNumber>.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<myusername>"
    jdbc_password => "<mypassword>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]" }
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  stdout { codec => "rubydebug" }
}
Specify the full path to your local JDBC driver .jar file (including version number). For example:
jdbc_driver_library => "/usr/share/mysql/mysql-connector-java-8.0.24.jar"
Provide the IP address or hostname and the port of your MySQL host. For example:
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/es_db"
Provide your MySQL credentials. The username and password must both be enclosed in quotation marks.
If you are using MariaDB (a popular open source community fork of MySQL), there are a couple of things that you need to do differently:

- In place of the MySQL JDBC driver, download and unpack the JDBC driver for MariaDB.
- Substitute the following lines in the jdbc.conf code, including the ANSI_QUOTES snippet in the last line:

jdbc_driver_library => "<driverPath>/mariadb-java-client-<versionNumber>.jar"
jdbc_driver_class => "org.mariadb.jdbc.Driver"
jdbc_connection_string => "jdbc:mariadb://<mySQLHost>:3306/es_db?sessionVariables=sql_mode=ANSI_QUOTES"
Following are some additional details about the Logstash pipeline code:
- jdbc_driver_library: The Logstash JDBC plugin does not come packaged with JDBC driver libraries. The JDBC driver library must be passed explicitly into the plugin using the jdbc_driver_library configuration option.
- tracking_column: This parameter specifies the field unix_ts_in_secs that tracks the last document read by Logstash from MySQL, stored on disk in .logstash_jdbc_last_run. The parameter determines the starting value for documents that Logstash requests in the next iteration of its polling loop. The value stored in .logstash_jdbc_last_run can be accessed in a SELECT statement as sql_last_value.
- unix_ts_in_secs: The field generated by the SELECT statement, which contains the modification_time as a standard Unix timestamp (seconds since the epoch). The field is referenced by the tracking_column. A Unix timestamp is used for tracking progress rather than a normal timestamp, as a normal timestamp may cause errors due to the complexity of correctly converting back and forth between UTC and the local timezone.
- sql_last_value: This is a built-in parameter containing the starting point of the current iteration of the Logstash polling loop, and it is referenced in the SELECT statement line of the JDBC input configuration. This parameter is set to the most recent value of unix_ts_in_secs, which is read from .logstash_jdbc_last_run. This value is the starting point for documents returned by the MySQL query that is executed in the Logstash polling loop. Including this variable in the query guarantees that we're not resending data that is already stored in Elasticsearch.
- schedule: This uses cron syntax to specify how often Logstash should poll MySQL for changes. The specification */5 * * * * * tells Logstash to contact MySQL every 5 seconds. Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by rufus-scheduler. The syntax is cron-like with some extensions specific to Rufus (for example, timezone support). A few alternative schedule values are sketched after this list.
- modification_time < NOW(): This condition excludes rows whose modification_time falls within the current, still-in-progress second. Because the next polling cycle only requests rows with a timestamp strictly greater than sql_last_value, reading rows from a second that may still receive writes could cause later rows in that same second to be skipped permanently.
- filter: In this section, the value id is copied from the MySQL record into a metadata field called _id, which is later referenced in the output to ensure that each document is written into Elasticsearch with the correct _id value. Using a metadata field ensures that this temporary value does not cause a new field to be created. The id, @version, and unix_ts_in_secs fields are also removed from the document, since they don't need to be written to Elasticsearch.
- output: This section specifies that each document should be written to the standard output using the rubydebug codec to help with debugging.
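The schedule string follows the rufus-scheduler extended cron format (seconds, minutes, hours, day of month, month, day of week). A couple of illustrative alternatives, not used elsewhere in this tutorial:

schedule => "*/30 * * * * *"   # poll every 30 seconds
schedule => "0 */5 * * * *"    # poll every 5 minutes, at second 0
schedule => "0 0 * * * *"      # poll once an hour, on the hour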
- Launch Logstash with your new JDBC configuration file:

bin/logstash -f jdbc.conf

Logstash outputs your MySQL data through standard output (stdout), your command line interface. The results for the initial data load should look similar to the following:

[INFO ] 2021-04-21 12:32:32.816 [Ruby-0-Thread-15: :1] jdbc - (0.009082s) SELECT * FROM (SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > 0 AND modification_time < NOW()) ORDER BY modification_time ASC) AS 't1' LIMIT 100000 OFFSET 0
{
    "client_name" => "Targaryen",
    "modification_time" => 2021-04-21T12:17:16.000Z,
    "@timestamp" => 2021-04-21T12:17:16.923Z
}
{
    "client_name" => "Lannister",
    "modification_time" => 2021-04-21T12:17:16.000Z,
    "@timestamp" => 2021-04-21T12:17:16.961Z
}
{
    "client_name" => "Stark",
    "modification_time" => 2021-04-21T12:17:16.000Z,
    "@timestamp" => 2021-04-21T12:17:16.963Z
}
The Logstash results periodically display SQL SELECT statements, even when there’s nothing new or modified in the MySQL database:
[INFO ] 2021-04-21 12:33:30.407 [Ruby-0-Thread-15: :1] jdbc - (0.002835s) SELECT count(*) AS 'count' FROM (SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > 1618935436 AND modification_time < NOW()) ORDER BY modification_time ASC) AS 't1' LIMIT 1
- Open your MySQL console. Let's insert another record into that database using the following SQL statement:

USE es_db;
INSERT INTO es_table (id, client_name) VALUES (4, "Baratheon");

Switch back to your Logstash console. Logstash detects the new record and the console displays results similar to the following:

[INFO ] 2021-04-21 12:37:05.303 [Ruby-0-Thread-15: :1] jdbc - (0.001205s) SELECT * FROM (SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > 1618935436 AND modification_time < NOW()) ORDER BY modification_time ASC) AS 't1' LIMIT 100000 OFFSET 0
{
    "client_name" => "Baratheon",
    "modification_time" => 2021-04-21T12:37:01.000Z,
    "@timestamp" => 2021-04-21T12:37:05.312Z
}
- Review the Logstash output results to make sure your data looks correct. Use CTRL + C to shut down Logstash.
Output to Elasticsearch
In this section, we configure Logstash to send the MySQL data to Elasticsearch. We modify the configuration file created in the section Configure a Logstash pipeline with the JDBC input plugin so that data is output directly to Elasticsearch. We start Logstash to send the data, and then log in to Elasticsearch Service to verify the data in Kibana.
- Open the jdbc.conf file in the Logstash folder for editing.
- Update the output section with the one that follows:

output {
  elasticsearch {
    index => "rdbms_idx"
    ilm_enabled => false
    cloud_id => "<DeploymentName>:<ID>"
    cloud_auth => "elastic:<Password>"
    ssl => true
    document_id => "%{[@metadata][_id]}"
    # api_key => "<myAPIid:myAPIkey>"
  }
}
Use the Cloud ID of your Elasticsearch Service deployment. You can include or omit the <DeploymentName>: prefix at the beginning of the Cloud ID. Both versions work fine. Find your Cloud ID by going to the Kibana main menu and selecting Management > Integrations, and then selecting View deployment details.

The default username is elastic. It is not recommended to use the elastic account for ingesting data, as this is a superuser. We recommend using a user with reduced permissions, or an API key with permissions specific to the indices or data streams that will be written to. Check Configuring security in Logstash for information on roles and API keys. Use the password provided when you created the deployment if you are using the elastic user, or the password used when creating a new ingest user with the roles specified in the Configuring security in Logstash documentation.

Following are some additional details about the configuration file settings:
- index: The name of the Elasticsearch index, rdbms_idx, with which to associate the documents.
- document_id: Sets the document _id to the value that the filter section stored in [@metadata][_id], so that an updated MySQL row overwrites its existing Elasticsearch document instead of creating a duplicate.
- api_key: If you choose to use an API key to authenticate (as discussed in the next step), you can provide it here.
- Optional: For additional security, you can generate an Elasticsearch API key through the Elasticsearch Service console and configure Logstash to use the new key to connect securely to Elasticsearch Service.

- Log in to the Elasticsearch Service Console.
- Select the deployment name and go to ☰ > Management > Dev Tools.
- Enter the following:

POST /_security/api_key
{
  "name": "logstash-apikey",
  "role_descriptors": {
    "logstash_read_write": {
      "cluster": ["manage_index_templates", "monitor"],
      "index": [
        {
          "names": ["logstash-*", "rdbms_idx"],
          "privileges": ["create_index", "write", "read", "manage"]
        }
      ]
    }
  }
}
This creates an API key with the cluster monitor privilege, which gives read-only access for determining the cluster state, and manage_index_templates, which allows all operations on index templates. Additional privileges allow create_index, write, read, and manage operations for the specified indices. The index manage privilege is added to enable index refreshes.

- Click ▶. The output should be similar to the following:

{
  "api_key": "tV1dnfF-GHI59ykgv4N0U3",
  "id": "2TBR42gBabmINotmvZjv",
  "name": "logstash-apikey"
}
- Enter your new api_key value into the Logstash jdbc.conf file, in the format <id>:<api_key>. If your results were as shown in this example, you would enter 2TBR42gBabmINotmvZjv:tV1dnfF-GHI59ykgv4N0U3. Remember to remove the pound (#) sign to uncomment the api_key line, and comment out the cloud_auth line. (A quick way to verify the new key is sketched after this step.)

output {
  elasticsearch {
    index => "rdbms_idx"
    cloud_id => "<myDeployment>"
    ssl => true
    ilm_enabled => false
    document_id => "%{[@metadata][_id]}"
    api_key => "2TBR42gBabmINotmvZjv:tV1dnfF-GHI59ykgv4N0U3"
    # cloud_auth => "elastic:<Password>"
  }
}
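Optionally, before restarting Logstash, you can check that the new API key authenticates successfully. A minimal sketch using curl, where <id> and <api_key> are the values returned above and <elasticsearch_endpoint> is the Elasticsearch endpoint URL shown on your deployment's overview page (both placeholders are assumptions, not values from this tutorial):

curl -H "Authorization: ApiKey $(printf '%s' '<id>:<api_key>' | base64)" \
  "https://<elasticsearch_endpoint>/_security/_authenticate"

A successful response returns a JSON body describing the authenticated API key; an authentication error indicates that the key or endpoint is wrong.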
- At this point, if you simply restart Logstash as is with your new output, then no MySQL data is sent to our Elasticsearch index.

Why? Logstash retains the previous sql_last_value timestamp and sees that no new changes have occurred in the MySQL database since that time. Therefore, based on the SQL query that we configured, there's no new data to send to Logstash.

Solution: Add clean_run => true as a new line in the JDBC input section of the jdbc.conf file. When set to true, this parameter resets sql_last_value back to zero.

input {
  jdbc {
    ...
    clean_run => true
    ...
  }
}

After running Logstash once with clean_run set to true, you can remove the clean_run line, unless you prefer the reset behavior to happen again at each restart of Logstash.
line, unless you prefer the reset behavior to happen again at each restart of Logstash -
Open a command line interface instance, go to your Logstash installation path, and start Logstash:
bin/logstash -f jdbc.conf
- Logstash outputs the MySQL data to your Elasticsearch Service deployment. Let's take a look in Kibana and verify that data:

- Log in to the Elasticsearch Service Console.
- Select the deployment and go to ☰ > Management > Dev Tools.
- Copy and paste the following API GET request into the Console pane, and then click ▶. This queries all records in the new rdbms_idx index.

GET rdbms_idx/_search
{
  "query": {
    "match_all": {}
  }
}
- The Results pane lists the client_name records originating from your MySQL database, similar to the following example:
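The exact values depend on when you inserted the records, but an illustrative (not literal) response for the four rows created in this tutorial looks roughly like the following, abridged to the hits section:

{
  "hits": {
    "total": { "value": 4, "relation": "eq" },
    "hits": [
      {
        "_index": "rdbms_idx",
        "_id": "1",
        "_source": {
          "client_name": "Targaryen",
          "modification_time": "2021-04-21T12:17:16.000Z",
          "@timestamp": "2021-04-21T12:17:16.923Z"
        }
      },
      ...
    ]
  }
}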
Now, you should have a good understanding of how to configure Logstash to ingest data from your relational database through the JDBC input plugin. You also know the design considerations for tracking records that are new, modified, or deleted, and you have the basics needed to begin experimenting with your own database and Elasticsearch.