Apache Pig support

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs.

-- Pig website

It provides a powerful, high-level, script-like transformation language that the Pig compiler turns into Map/Reduce jobs at runtime. To simplify working with arbitrary data, Pig associates a schema (or type information) with each data set for validation and performance. The schema in turn breaks the data down into discrete data types that can be transformed through various operators or custom functions (UDFs). Data can be loaded from and stored to various storage systems such as the local file-system or HDFS and, with elasticsearch-hadoop, Elasticsearch as well.

Installation

In order to use elasticsearch-hadoop, its jar needs to be in Pig’s classpath. There are various ways of making that happen, though typically the REGISTER command is used:

REGISTER /path/elasticsearch-hadoop.jar;

The command expects a proper URI that can be found either on the local file-system or remotely. Typically it’s best to place the jar on a distributed file-system (like HDFS or Amazon S3), since the script might be executed on various machines.

As an alternative, when using the command line, one can register additional jars through the -Dpig.additional.jars option (which accepts a URI as well):

$ pig -Dpig.additional.jars=/path/elasticsearch-hadoop.jar:<other.jars> script.pig

or, if the jars are on HDFS:

$ pig \
-Dpig.additional.jars=hdfs://<cluster-name>:<cluster-port>/<path>/elasticsearch-hadoop.jar:<other.jars> script.pig

Configuration

With Pig, one can specify the configuration properties (as an alternative to the Hadoop Configuration object) as constructor parameters when declaring EsStorage:

STORE B INTO 'radio/artists' 
       USING org.elasticsearch.hadoop.pig.EsStorage
             ('es.http.timeout = 5m', 
              'es.index.auto.create = false'); 

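In the snippet above, 'radio/artists' is the elasticsearch-hadoop configuration for the target resource, 'es.http.timeout = 5m' is an elasticsearch-hadoop option (the HTTP timeout) and 'es.index.auto.create = false' is another elasticsearch-hadoop configuration (it disables automatic index creation).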

To avoid having to specify the fully qualified class name (org.elasticsearch.hadoop.pig.EsStorage), consider using a shortcut through the DEFINE command:

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();

Note that it is possible (and recommended) to specify configuration parameters, such as es.query or es.mapping.names, in the DEFINE declaration to reduce script duplication:

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('my.cfg.param=value');

Pig definitions are replaced as is; even though the syntax allows parametrization, Pig will silently ignore any parameters outside the DEFINE declaration.

Tuple field names

Among the various types available in Pig, tuples are used the most. Tuples are defined as “ordered sets of fields” (e.g. (19,2)); structurally, however, they are shaped like ordered maps, since each field has a name which may or may not be defined (e.g. (field:19, another:2)). The ordered aspect is important and forces elasticsearch-hadoop to use JSON arrays for tuples: JSON objects are not an option, since they do not preserve ordering and they require keys/names, which might not be available in a tuple. Following the principle of least surprise, elasticsearch-hadoop by default disregards a tuple’s field names, both when writing and reading.

To change this behavior (which in effect means treating tuples as arrays of maps instead of arrays), set the boolean property es.mapping.pig.tuple.use.field.names (false by default) to true.

The table below illustrates the difference between the two settings:

| es.mapping.pig.tuple.use.field.names | Tuple schema | Tuple value | Resulting JSON representation |
| --- | --- | --- | --- |
| false (default) | (foo: (nr:int, name:chararray)) | (1,"kimchy") | {"foo":[1, "kimchy"]} |
| false (default) | (bar: (int, chararray)) | (1,"kimchy") | {"bar":[1, "kimchy"]} |
| true | (foo: (nr:int, name:chararray)) | (1,"kimchy") | {"foo":[{"nr":1, "name":"kimchy"}]} |
| true | (bar: (int, chararray)) | (1,"kimchy") | {"bar":[{"val_0":1, "val_1":"kimchy"}]} |

When using tuples, it is highly recommended to create the index mapping beforehand, as it is quite common for tuples to contain mixed types (numbers, strings, other tuples, etc.) which, when mapped as an array (the default), can cause parsing errors (as the automatic mapping can infer the fields to be numbers instead of strings, etc.). In fact, the example above falls into this category: the tuple contains both a number (1) and a string ("kimchy"), which causes the auto-detection to map both foo and bar as a number and thus raises an exception when encountering "kimchy". Refer to the elasticsearch-hadoop mapping documentation for more information. Additionally, consider breaking/flattening the tuple into primitive data atoms before sending the data off to Elasticsearch.
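The two representations in the table above can be modeled with a few lines of Python. This is only an illustrative sketch of the resulting JSON shapes, not the connector's implementation: the function tuple_to_json and its parameters are hypothetical, and the val_<index> fallback for unnamed fields is taken from the table.

```python
import json


def tuple_to_json(field, values, names=None, use_field_names=False):
    """Model the JSON shape produced for one tuple-valued field (illustrative only)."""
    if not use_field_names:
        # Default behavior: field names are ignored, the tuple becomes a JSON array.
        return json.dumps({field: list(values)})
    # Field names enabled: unnamed positions fall back to val_<index>,
    # mirroring the naming shown in the table.
    names = names if names is not None else [None] * len(values)
    entry = {
        (name if name is not None else f"val_{i}"): value
        for i, (name, value) in enumerate(zip(names, values))
    }
    return json.dumps({field: [entry]})


# Default setting: a plain array, names dropped.
print(tuple_to_json("foo", (1, "kimchy"), names=["nr", "name"]))
# Field names enabled, with and without names declared in the schema.
print(tuple_to_json("foo", (1, "kimchy"), names=["nr", "name"], use_field_names=True))
print(tuple_to_json("bar", (1, "kimchy"), use_field_names=True))
```

The default form also shows why mixed-type tuples are risky: a number and a string end up in the same array under a single field.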

Reducers parallelism

By default, Pig will only use one reducer per job, which in most cases is inefficient. To address this issue:

Use the Parallel Features
As explained in the reference docs, out of the box Pig expects each reducer to process about 1 GB of data; unfortunately, if the data is scattered around the network, this becomes inefficient as the entire job is effectively serialized. Change this by increasing the number of reducers to match the number of your shards, either through the default_parallel property:

-- launch the Map/Reduce job with 5 reducers
SET default_parallel 5;

or by using the PARALLEL keyword with COGROUP, CROSS, DISTINCT, GROUP, JOIN (inner), JOIN (outer) and ORDER BY:

B = GROUP A BY t PARALLEL 18;

Disable split combination
Out of the box, Pig over-eagerly combines its input splits even if it does not know how big they are. This again kills parallelism since it serializes the queries to Elasticsearch; typically this looks as follows in the logs:

20yy-mm-dd hh:mm:ss,mss [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 25
20yy-mm-dd hh:mm:ss,mss [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1

Avoid this by setting pig.noSplitCombination to true (setting pig.splitCombination to false also works, but we recommend the former), either by setting the property before invoking the script:

pig -Dpig.noSplitCombination=true myScript.pig

in the Pig script itself:

SET pig.noSplitCombination TRUE;

or through the global pig.properties configuration in your Pig install:

pig.noSplitCombination=true

Unfortunately, elasticsearch-hadoop cannot set these properties automatically, so the user has to set them manually, either per script or globally through the Pig configuration as described above.

Mapping

Out of the box, elasticsearch-hadoop uses the Pig schema to map the data in Elasticsearch, using both the field names and types in the process. There are cases, however, when the names in Pig cannot be used with Elasticsearch (invalid characters, existing names with a different layout, etc.). For such cases, one can use the es.mapping.names setting, which accepts a comma-separated list of name mappings in the following format: Pig field name : Elasticsearch field name

For example:

STORE B INTO  '...' USING org.elasticsearch.hadoop.pig.EsStorage(
    'es.mapping.names=date:@timestamp, uRL:url');

Here the Pig column date is mapped in Elasticsearch to @timestamp, and the Pig column uRL is mapped in Elasticsearch to url.

Since elasticsearch-hadoop 2.1, the Pig schema case sensitivity is preserved to Elasticsearch and back.
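To illustrate the es.mapping.names format described above, the following Python sketch parses a setting value and applies it to a record. It is a model of the documented format only and makes no claim about how the connector parses the setting internally; parse_mapping_names, rename_fields and the sample record are hypothetical.

```python
def parse_mapping_names(setting):
    """Parse 'pigName:esName, pigName2:esName2' into a dict (illustrative only)."""
    mapping = {}
    for pair in setting.split(","):
        # Split on the first colon only; surrounding whitespace is not significant here.
        pig_name, es_name = pair.split(":", 1)
        mapping[pig_name.strip()] = es_name.strip()
    return mapping


def rename_fields(record, mapping):
    """Rename mapped fields; unmapped fields keep their Pig name."""
    return {mapping.get(name, name): value for name, value in record.items()}


mapping = parse_mapping_names("date:@timestamp, uRL:url")
# Hypothetical record with made-up values.
record = {"date": "2012-10-06T19:20:25Z", "uRL": "http://example.org", "name": "kimchy"}
print(rename_fields(record, mapping))
```

Fields that are not listed in the setting, such as name in this sample, are passed through unchanged.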

Writing data to Elasticsearch

Elasticsearch is exposed to Pig as a native Storage, so it can be used to store data:

-- load data from HDFS into Pig using a schema
A = LOAD 'src/test/resources/artists.dat' USING PigStorage()
                    AS (id:long, name, url:chararray, picture: chararray);
-- transform data
B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links;
-- save the result to Elasticsearch
STORE B INTO 'radio/artists'
       USING org.elasticsearch.hadoop.pig.EsStorage(); 

Here 'radio/artists' is the Elasticsearch resource (index and type) associated with the given storage. Additional configuration parameters can be passed inside the (); in this case the defaults are used.

For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to tell Elasticsearch to use the field id as the document id, update the Storage configuration:

STORE B INTO 'radio/artists' USING org.elasticsearch.hadoop.pig.EsStorage('es.mapping.id=id'...);

Writing existing JSON to Elasticsearch

When the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter. elasticsearch-hadoop then expects to receive a tuple with a single field (representing the JSON document); the library recognizes common textual types such as chararray or bytearray, and otherwise just calls toString to get hold of the JSON content.

Table 4. Pig types to use for JSON representation

| Pig type | Comment |
| --- | --- |
| bytearray | use this when the JSON data is represented as a byte[] or similar |
| chararray | use this if the JSON data is represented as a String |
| anything else | make sure the toString() returns the desired JSON document |

Make sure the data is properly encoded in UTF-8. The field content is considered the final form of the document sent to Elasticsearch.

A = LOAD '/resources/artists.json' USING PigStorage() AS (json:chararray);
STORE A INTO 'radio/artists'
    USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'...);

The LOAD statement loads the (JSON) data as a single field (json), and the es.input.json=true parameter indicates that the input is of type JSON.
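Since the field content is sent as is, the input file has to contain complete, UTF-8 encoded JSON documents. The Python sketch below shows one way such a file could be prepared, assuming one document per line; the helper to_json_lines and the sample documents are hypothetical. It relies on two facts: json.dumps escapes tabs and newlines inside strings, and PigStorage splits fields on tabs by default, so each line is read as a single json field.

```python
import json


def to_json_lines(documents):
    """Serialize each document as one compact JSON document per line."""
    # json.dumps escapes tabs and newlines inside strings, so a document
    # never spans several lines or contains a raw tab character.
    return "".join(json.dumps(doc, separators=(",", ":")) + "\n" for doc in documents)


# Hypothetical sample documents; the field names are made up for illustration.
artists = [
    {"name": "Artist\tOne", "url": "http://example.org/one"},
    {"name": "Artist Two", "url": "http://example.org/two"},
]

# Bytes ready to be written to the file that Pig will load.
payload = to_json_lines(artists).encode("utf-8")
```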

Writing to dynamic/multi-resources

One can index the data to a different resource, depending on the row being read, by using patterns. Reusing the aforementioned media example, one could configure it as follows:

A = LOAD 'src/test/resources/media.dat' USING PigStorage()
            AS (name:chararray, type:chararray, year: chararray);
STORE A INTO 'my-collection/{type}'
       USING org.elasticsearch.hadoop.pig.EsStorage();

Here type is the tuple field used by the resource pattern; any of the declared fields can be used. The resource pattern 'my-collection/{type}' uses the field type; note that the pattern can be used anywhere in the resource (on the index, on the type, in both places, etc.).

For each tuple about to be written, elasticsearch-hadoop will extract the type field and use its value to determine the target resource.

The functionality is also available when dealing with raw JSON - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:

{
    "media_type":"game",
    "title":"Final Fantasy VI",
    "year":"1994"
}

where media_type is the field within the JSON document that will be used by the pattern, the Pig script can be as follows:

A = LOAD '/resources/media.json' USING PigStorage() AS (json:chararray);
STORE A INTO 'my-collection/{media_type}'
    USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true');

Since JSON input is used, the schema declaration for the tuple is simply a holder for the raw data. The resource pattern relies on fields within the JSON document and not on the tuple schema.
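The pattern resolution described in this section can be sketched in Python as follows. This only models the documented behavior (each {field} placeholder is replaced with the value found in the current record); the function resolve_resource is hypothetical and is not part of elasticsearch-hadoop.

```python
import json
import re


def resolve_resource(pattern, fields):
    """Replace each {field} placeholder in the pattern with the record's value."""
    return re.sub(r"\{([^{}]+)\}", lambda m: str(fields[m.group(1)]), pattern)


# Tuple input: the value comes from the fields declared in the schema.
row = {"name": "Final Fantasy VI", "type": "game", "year": "1994"}
tuple_target = resolve_resource("my-collection/{type}", row)

# JSON input: the value comes from the document itself.
doc = '{"media_type":"game","title":"Final Fantasy VI","year":"1994"}'
json_target = resolve_resource("my-collection/{media_type}", json.loads(doc))

print(tuple_target, json_target)
```

In both cases the record above would be routed to the same target, because the pattern is resolved per record rather than once per job.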

Reading data from Elasticsearch

As you would expect, loading the data is straightforward:

-- execute Elasticsearch query and load data into Pig
A = LOAD 'radio/artists' 
    USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?me*'); 
DUMP A;

Here 'radio/artists' is the Elasticsearch resource and es.query holds the search query to execute.

Due to a bug in Pig, LoadFunctions are not aware of any schema associated with them. This means EsStorage is forced to fully parse the documents from Elasticsearch before passing the data to Pig for projection. In practice, this has little impact as long as a document’s top-level fields are used; for nested fields, consider extracting the values yourself in Pig.

Reading data from Elasticsearch as JSON

In the case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can instruct elasticsearch-hadoop to return the data as is. When es.output.json is set to true, the connector parses the response from Elasticsearch, identifies the documents and, without converting them, returns their content to the user as String/chararray objects.

Type conversion

If automatic index creation is used, please review the mapping section of the elasticsearch-hadoop documentation for more information.

Pig internally uses native Java types for most of its types, and elasticsearch-hadoop abides by that convention.

| Pig type | Elasticsearch type | Availability |
| --- | --- | --- |
| null | null | |
| chararray | string | |
| int | int | |
| long | long | |
| double | double | |
| float | float | |
| bytearray | binary | |
| tuple | array or map (depending on the es.mapping.pig.tuple.use.field.names setting) | |
| bag | array | |
| map | map | |
| boolean | boolean | Pig 0.10 or higher |
| datetime | date | Pig 0.11 or higher |
| biginteger | not supported | Pig 0.12 or higher |
| bigdecimal | not supported | Pig 0.12 or higher |

While elasticsearch-hadoop understands the Pig types up to version 0.12.1, it is backwards compatible with Pig 0.9.

It is worth mentioning that rich data types available only in Elasticsearch, such as GeoPoint or GeoShape, are supported by converting their structure into the primitives available in the table above. For example, depending on how it is stored, a geo_point might be returned as a chararray or a tuple.