INSERT INTO LOGSTASH SELECT DATA FROM DATABASE


Ever want to search your database entities from Elasticsearch? Now you can use Logstash to do just that! In this blog we introduce the JDBC input, which was created to import data from any database that supports the JDBC interface. Below, we show you a few examples of using this plugin.

Getting Started

Installation

bin/plugin install logstash-input-jdbc


Driver Support

Popular databases like Oracle, PostgreSQL, and MySQL have compatible JDBC drivers that can be used with this input. The plugin does not come packaged with any of these JDBC drivers, but they are straightforward to download. You can then configure the plugin to use the desired JDBC driver library: the jdbc_driver_library setting gives the path to the driver library, and the jdbc_driver_class setting gives the driver's class name.

Let's get started with the examples!

Example 1: Simple Postgres Input

Here is an example of how to get started reading from a local PostgreSQL database. As a prerequisite, download the PostgreSQL JDBC driver to use with the plugin.

Setting Up The Database

Before we get started, let's create a table called contacts and populate it with some contacts!

-- email and last_name are nullable because two of the sample rows below leave them empty
CREATE TABLE contacts (
    uid serial,
    email VARCHAR(80),
    first_name VARCHAR(80) NOT NULL,
    last_name VARCHAR(80)
);
INSERT INTO contacts(email, first_name, last_name) VALUES('jim@example.com', 'Jim', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES(null, 'John', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('carol@example.com', 'Carol', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('sam@example.com', 'Sam', null);

After this runs, here are the contents in the database in table form.

Email              First Name  Last Name
jim@example.com    Jim         Smith
null               John        Smith
carol@example.com  Carol       Smith
sam@example.com    Sam         null

Logstash Configuration

We can go ahead and output all these events to the console with this sample Logstash configuration:

# file: simple-out.conf
input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
        # The user we wish to execute our statement as
        jdbc_user => "postgres"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/path/to/postgresql-9.4-1201.jdbc41.jar"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "SELECT * from contacts"
    }
}
output {
    stdout { codec => json_lines }
}

Now we can run Logstash and see the results!

$ logstash-1.5.3/bin/logstash -f simple-out.conf
Logstash startup completed
{"uid":1,"email":null,"first_name":"hello","last_name":null,"@version":"1","@timestamp":"2015-07-29T21:03:18.958Z"}
{"uid":2,"email":"jim@example.com","first_name":"Jim","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T21:03:18.959Z"}
{"uid":3,"email":null,"first_name":"John","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T21:03:18.959Z"}
{"uid":4,"email":"carol@example.com","first_name":"Carol","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T21:03:18.959Z"}
{"uid":5,"email":"sam@example.com","first_name":"Sam","last_name":null,"@version":"1","@timestamp":"2015-07-29T21:03:18.961Z"}
Logstash shutdown completed

Awesome, we read data from PostgreSQL!
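Because the json_lines codec writes one JSON object per line, the console output is easy to consume from a script. The following is a minimal Python sketch, not part of the plugin: it parses two of the event lines shown above, and the helper name parse_events is made up for illustration.

```python
import json

# Two event lines copied from the console output above.
output = """\
{"uid":2,"email":"jim@example.com","first_name":"Jim","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T21:03:18.959Z"}
{"uid":3,"email":null,"first_name":"John","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T21:03:18.959Z"}
"""


def parse_events(text):
    """Parse newline-delimited JSON into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


events = parse_events(output)
for event in events:
    # A SQL NULL arrives as JSON null, which Python reads as None.
    print(event["uid"], event["first_name"], event["email"])
```

Each database column becomes a field on the event, next to the @version and @timestamp fields that Logstash adds.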

Up next, we will demonstrate two examples of how you may use this plugin in the context of Elasticsearch.

Example 2: Synchronizing Data In Your Table To Elasticsearch

When we use our database as an input source for Elasticsearch, we may want to keep the existing documents in sync with the data as the database undergoes updates. In this case, we can index our rows in Elasticsearch with unique IDs taken from the table, so that any time we re-index them, the existing documents are simply updated. This way, we prevent Elasticsearch from assigning a new ID to each record and generating duplicates!

# file: contacts-index-logstash.conf
input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
        jdbc_user => "postgres"
        jdbc_validate_connection => true
        jdbc_driver_library => "/path/to/postgresql-9.4-1201.jdbc41.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "SELECT * from contacts"
    }
}
output {
    elasticsearch {
        protocol => http
        index => "contacts"
        document_type => "contact"
        document_id => "%{uid}"
        host => "ES_NODE_HOST"
    }
}

Let's do a quick check to see that "Sam" was indexed into Elasticsearch:

curl ES_NODE_HOST:9200/contacts/contact/5?pretty
{
    "_index" : "contacts",
    "_type" : "contact",
    "_id" : "5",
    "_version" : 1,
    "found" : true,
    "_source":{"uid":5,"email":"sam@example.com","first_name":"Sam","last_name":null,"@version":"1","@timestamp":"2015-07-29T22:12:20.146Z"}
}

So far we have seen how to index the results of a database query, but what if we want to update our index with new changes? What if some of our contacts changed emails, or we want to update someone's last name? Here is a sequence of changes that we can apply to our table, so that we can later verify the behavior we want in the resulting Elasticsearch index.

UPDATE contacts SET last_name = 'Smith' WHERE email = 'sam@example.com';
UPDATE contacts SET email = 'john@example.com' WHERE uid = 3;
INSERT INTO contacts(email, first_name, last_name) VALUES('new@example.com', 'New', 'Smith');

Now we can run Logstash again with the same configuration. When we repeat the same query as before, we notice that our document containing Sam has been updated and its _version is now 2:

curl ES_NODE_HOST:9200/contacts/contact/5?pretty
{
    "_index" : "contacts",
    "_type" : "contact",
    "_id" : "5",
    "_version" : 2,
    "found" : true,
    "_source":{"uid":5,"email":"sam@example.com","first_name":"Sam","last_name":"Smith","@version":"1","@timestamp":"2015-07-29T22:12:56.980Z"}
}

Using this method, we can re-index our table into Elasticsearch without ending up with duplicates. One thing to note is that we are not able to capture deletes to documents under this scheme.
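To make both points concrete, here is a small Python model of the behavior. It is only a sketch: a plain dict stands in for the Elasticsearch index, the reindex helper is made up for illustration, and the deletion of Carol's row is a hypothetical change that does not appear in the example above.

```python
def reindex(index, rows, id_field="uid"):
    """Write every row into `index` (a dict standing in for an Elasticsearch
    index), using the row's own key as the document id."""
    for row in rows:
        index[str(row[id_field])] = dict(row)
    return index


index = {}
table = [
    {"uid": 4, "first_name": "Carol", "last_name": "Smith"},
    {"uid": 5, "first_name": "Sam", "last_name": None},
]
reindex(index, table)

# The table changes: Sam gets a last name and (hypothetically) Carol's row is deleted.
table = [{"uid": 5, "first_name": "Sam", "last_name": "Smith"}]
reindex(index, table)

print(index["5"]["last_name"])  # Smith: same id, so the document was overwritten
print(sorted(index))            # ['4', '5']: Carol's document is still in the index

# Ids present in the index but no longer in the table are the missed deletes.
stale_ids = set(index) - {str(row["uid"]) for row in table}
print(stale_ids)                # {'4'}
```

Re-indexing only ever writes the rows the query returns, so a row that has disappeared from the table is never mentioned again and its document stays behind. Removing such documents would need a separate step, for example comparing IDs as in the last lines of the sketch.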

Example 3: MusicBrainz Demo

MusicBrainz is an open music database containing up-to-date information about artists, their works, and everything in-between. You can learn more at http://musicbrainz.org

MusicBrainz graciously hosts a biweekly data dump of its database.

The dump is about 1.8GB and contains information on roughly 18 million tracks.

How to get the MusicBrainz data

You must first run your own mirror of the MusicBrainz database. This can be achieved using a tool called mbslave. The project's repo has instructions on syncing with the data-dump.

Formulating a query to load

Now that we have all of this wealth of music data in an accessible database, we can choose a subset of the data we wish to index into Elasticsearch. We may be interested in exploring the data about artists and their releases. Here is a SQL query to fetch a few attributes belonging to artists and their releases:

SELECT
    release_group.gid AS album_id,
    release_group.type AS album_primary_type_id,
    release_group_primary_type.name AS album_primary_type_name,
    release.name AS release_name,
    artist.name AS artist_name,
    artist.gid AS artist_gid,
    artist_credit.id AS artist_credit_id,
    artist.type AS artist_type_id,
    artist_type.name AS artist_type_name,
    artist.begin_date_year AS artist_begin_date_year,
    area.name AS artist_country_name,
    release_country.date_year AS release_year,
    release_country.date_month AS release_month,
    release_country.date_day AS release_day
FROM
    musicbrainz.artist
INNER JOIN musicbrainz.artist_credit_name
    ON artist_credit_name.artist = artist.id
INNER JOIN musicbrainz.artist_credit
    ON artist_credit.id = artist_credit_name.artist_credit
INNER JOIN musicbrainz.release_group
    ON release_group.artist_credit = artist_credit.id
INNER JOIN musicbrainz.release
    ON release.release_group = release_group.id
INNER JOIN musicbrainz.release_country
    ON release.id = release_country.release
INNER JOIN musicbrainz.artist_type
    ON artist.type = artist_type.id
INNER JOIN musicbrainz.area
    ON artist.area = area.id
INNER JOIN musicbrainz.release_group_primary_type
    ON release_group_primary_type.id = release_group.type
WHERE
    ((release_country.date_year IS NOT NULL) AND
    (release_country.date_month IS NOT NULL) AND
    (release_country.date_day IS NOT NULL))
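One property of this query is worth keeping in mind when reading the results: every join is an INNER JOIN, so a row only comes back when a match exists in each joined table. An artist with no area, for example, will not appear at all. The Python sketch below shows the effect with SQLite on a deliberately tiny, hypothetical two-table schema (it is not the MusicBrainz schema, and the artist names are invented).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical miniature tables, for illustration only.
    CREATE TABLE area (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE artist (id INTEGER PRIMARY KEY, name TEXT, area INTEGER);
    INSERT INTO area VALUES (1, 'United Kingdom');
    INSERT INTO artist VALUES (1, 'Artist A', 1);
    INSERT INTO artist VALUES (2, 'Artist B', NULL);
""")

# INNER JOIN: only artists whose area matches a row in the area table.
inner_rows = conn.execute("""
    SELECT artist.name, area.name
    FROM artist
    INNER JOIN area ON artist.area = area.id
    ORDER BY artist.id
""").fetchall()

# LEFT JOIN: every artist, with NULL where no area matches.
left_rows = conn.execute("""
    SELECT artist.name, area.name
    FROM artist
    LEFT JOIN area ON artist.area = area.id
    ORDER BY artist.id
""").fetchall()

print(inner_rows)  # [('Artist A', 'United Kingdom')]
print(left_rows)   # [('Artist A', 'United Kingdom'), ('Artist B', None)]
conn.close()
```

For a demo like this one, dropping incomplete rows keeps every indexed document fully populated; switching selected joins to LEFT JOIN would be the way to keep them instead.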

Using Logstash to Query The Database and Index Into Elasticsearch

input {
    jdbc {
        jdbc_driver_library => "/path/to/driver.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        # Placeholder value; substitute the connection string for your own database
        jdbc_connection_string => "jdbc://postgresql"
        jdbc_user => "musicbrainz"
        statement_filepath => "query.sql"
    }
}
output {
    elasticsearch { protocol => http }
}

Because the query is this long, we keep it in a separate file, query.sql, and point the plugin at it with the statement_filepath option instead of writing it inline in statement.

Exploring Data in Kibana

One great benefit of migrating part of the data into Elasticsearch is the ability to build insightful visualizations using Kibana. For starters, let's see how many musical releases are introduced year over year!

(image: Number of releases per year from 1900 to 2010)
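A similar yearly count can also be requested from Elasticsearch directly with a histogram aggregation. The Python sketch below only builds and prints the search body; it does not send it. It assumes that release_year was indexed as a numeric field, and the function name and the aggregation name releases_per_year are made up for illustration. It is not necessarily the request Kibana issues for the chart.

```python
import json


def releases_per_year_query(field="release_year"):
    """Build a search body that buckets documents by year and returns no hits."""
    return {
        "size": 0,  # only the aggregation buckets are needed, not the documents
        "aggs": {
            "releases_per_year": {
                # One bucket per distinct year value.
                "histogram": {"field": field, "interval": 1}
            }
        },
    }


body = releases_per_year_query()
print(json.dumps(body, indent=2))
```

The printed JSON can be posted to the index's _search endpoint; each returned bucket then carries a year as its key and the number of matching releases as its doc_count.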

Seeing general counts is a nice start, but we can explore much more! For example, these releases have artists, countries, and release types associated with them. In the following Kibana dashboard we can see the 20 artists with the most releases. We can also visualize the differences between album, EP, and single releases across the various producing countries.

We can drill into our donut visualization and filter for EPs that were produced by artists from the United Kingdom.
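The drill-down above corresponds to a search that requires both conditions at once. Here is a hedged Python sketch of such a search body, built but not sent. The field names come from the column aliases in the SQL query; the exact stored values ("EP" and "United Kingdom") and the function name are assumptions, and Kibana may express the filter differently.

```python
import json


def eps_by_country_query(release_type="EP", country="United Kingdom"):
    """Build a search body matching releases of one type by artists from one country."""
    return {
        "query": {
            "bool": {
                # Both clauses must match for a document to be returned.
                "must": [
                    {"match_phrase": {"album_primary_type_name": release_type}},
                    {"match_phrase": {"artist_country_name": country}},
                ]
            }
        }
    }


body = eps_by_country_query()
print(json.dumps(body, indent=2))
```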

You may recognize some of these artists, while others may be new to you. MusicBrainz collects data about so many artists that there is always something new to discover!

More Information

This post only begins to explore the integrations and features the JDBC plugin includes. For more information, check out the plugin's documentation. We would love your feedback on our forum; if you think you’ve found a bug in this plugin, please submit an issue.