Map/Reduce integration
For low-level or performance-sensitive environments, elasticsearch-hadoop provides dedicated InputFormat and OutputFormat implementations that can read and write data to Elasticsearch.

In Map/Reduce, the Mappers and Reducers read and write Writable objects, a Hadoop-specific interface optimized for serialization. As such, the elasticsearch-hadoop InputFormat and OutputFormat return and expect MapWritable objects; a map is used for each document being read or written. The map itself can have any type of internal structure as long as its objects are also Writable - it can hold nested maps, numbers or strings in their Writable representation.

Internally, elasticsearch-hadoop automatically converts the Map of Writables to JSON documents and vice versa, so you do not have to deal with low-level parsing or conversion to and from JSON. Moreover, if the data sent to Elasticsearch is already in JSON format, it can be streamed in directly without any conversion to Writable objects.
Read the rest of the chapter to find out more.
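As a sketch of the Writable document model described above, a single document can be assembled as a (possibly nested) MapWritable; the field names and values below are purely illustrative:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class DocExample {
    // builds a document with a string field, a number and a nested object;
    // elasticsearch-hadoop serializes it as
    // {"name":"Radio Paradise","listeners":120,"location":{"city":"Paradise"}}
    static MapWritable buildDoc() {
        MapWritable doc = new MapWritable();
        doc.put(new Text("name"), new Text("Radio Paradise"));
        doc.put(new Text("listeners"), new IntWritable(120));

        // nested maps are fine as long as every value is itself a Writable
        MapWritable location = new MapWritable();
        location.put(new Text("city"), new Text("Paradise"));
        doc.put(new Text("location"), location);
        return doc;
    }
}
```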
Installation
In order to use elasticsearch-hadoop, its jar needs to be available on the job classpath. At around 250 kB and without any dependencies, the jar can be bundled in the job archive (manually or through the CLI Generic Options, if your jar implements the Tool interface), distributed through Hadoop's DistributedCache, or made available by provisioning the cluster manually.

All the options above affect only the code running on the distributed nodes. If the code that launches the Hadoop job refers to elasticsearch-hadoop, make sure to include the JAR in HADOOP_CLASSPATH:
HADOOP_CLASSPATH="<colon-separated-paths-to-your-jars-including-elasticsearch-hadoop>"
CLI example.
$ bin/hadoop jar myJar.jar -libjars elasticsearch-hadoop.jar
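For the -libjars option above to be picked up, the main class has to go through Hadoop's GenericOptionsParser, typically by implementing the Tool interface; a minimal sketch (the class and job names are illustrative):

```java
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already carries the generic options (-libjars and friends)
        JobConf conf = new JobConf(getConf(), MyJob.class);
        // ... job setup as shown in the rest of the chapter ...
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options before run() is invoked
        System.exit(ToolRunner.run(new MyJob(), args));
    }
}
```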
Configuration
When using elasticsearch-hadoop in a Map/Reduce job, one can use Hadoop's Configuration object to configure elasticsearch-hadoop by setting the various options as properties on it. Typically one would set the Elasticsearch host and port (assuming it is not running on the default localhost:9200), the target index/type and, potentially, the query. For example:

Configuration conf = new Configuration();
// a node within the Elasticsearch cluster that elasticsearch-hadoop connects to;
// by default, elasticsearch-hadoop detects the rest of the nodes in the cluster
conf.set("es.nodes", "es-server:9200");
// the target index/type
conf.set("es.resource", "radio/artists");
...
Simply use the configuration object when constructing the Hadoop job and you are all set.
Writing data to Elasticsearch
With elasticsearch-hadoop, Map/Reduce jobs can write data to Elasticsearch, making it searchable through indexes. elasticsearch-hadoop supports both the (so-called) old and the new Hadoop APIs.

EsOutputFormat expects a Map<Writable, Writable> representing a document value that is converted internally into a JSON document and indexed in Elasticsearch. Hadoop OutputFormat requires implementations to expect a key and a value; however, since for Elasticsearch only the document (that is, the value) is necessary, EsOutputFormat ignores the key.
Old (org.apache.hadoop.mapred) API

To write data to Elasticsearch, use org.elasticsearch.hadoop.mr.EsOutputFormat on your job along with the relevant configuration properties:

JobConf conf = new JobConf();
// disable speculative execution
conf.setSpeculativeExecution(false);
conf.set("es.nodes", "es-server:9200");
// target index/type
conf.set("es.resource", "radio/artists");
// dedicated OutputFormat
conf.setOutputFormat(EsOutputFormat.class);
// specify the mapper output class (MapWritable)
conf.setMapOutputValueClass(MapWritable.class);
conf.setMapperClass(MyMapper.class);
...
JobClient.runJob(conf);
A Mapper implementation can use EsOutputFormat as follows:

public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    // create the MapWritable object
    MapWritable doc = new MapWritable();
    ...
    // write the result to the output collector
    // one can pass whatever value to the key; EsOutputFormat ignores it
    output.collect(NullWritable.get(), doc);
  }
}
For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Thus, assuming the documents contain a field called radioId which is unique and suitable as an identifier, one can update the job configuration as follows:

JobConf conf = new JobConf();
conf.set("es.mapping.id", "radioId");
At runtime, elasticsearch-hadoop will extract the value from each document and use it accordingly during the bulk call.
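Putting the setting to use, a sketch of a mapper emitting documents that carry a radioId field (the field values are made up for illustration):

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class IdAwareMapper extends MapReduceBase
        implements Mapper<Object, Object, NullWritable, MapWritable> {
    @Override
    public void map(Object key, Object value,
                    OutputCollector<NullWritable, MapWritable> output,
                    Reporter reporter) throws IOException {
        MapWritable doc = new MapWritable();
        // with es.mapping.id set to "radioId", this field becomes
        // the Elasticsearch document id during the bulk call
        doc.put(new Text("radioId"), new LongWritable(42));
        doc.put(new Text("name"), new Text("Radio Paradise"));
        output.collect(NullWritable.get(), doc);
    }
}
```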
Writing existing JSON to Elasticsearch
For cases where the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the JSON input by setting the es.input.json parameter. elasticsearch-hadoop then expects either a Text or a BytesWritable (preferred, as it requires no String conversion) object as output; if these types are not used, the library simply falls back to the toString representation of the target object.
Table 1. Writable to use for JSON representation

Writable | Comment
---|---
BytesWritable | use this when the JSON data is represented as a byte[] or similar
Text | use this if the JSON data is represented as a String
anything else | make sure the toString returns the desired JSON document
Make sure the data is properly encoded in UTF-8. The job output is considered the final form of the document sent to Elasticsearch.
JobConf conf = new JobConf();
// indicate the input for EsOutputFormat is of type JSON
conf.set("es.input.json", "yes");
// set the proper output type (Text in this case)
conf.setMapOutputValueClass(Text.class);
...
JobClient.runJob(conf);
The Mapper implementation becomes:

public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    // assuming the document is a String called 'source'
    String source = ...
    Text jsonDoc = new Text(source);
    // send the doc directly
    output.collect(NullWritable.get(), jsonDoc);
  }
}
Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content), one can use the es.resource.write setting, which accepts patterns that are resolved from the document content at runtime. Following the aforementioned media example, one could configure it as follows:

JobConf conf = new JobConf();
conf.set("es.resource.write", "my-collection-{media-type}/doc");
If Writable objects are used, for each MapWritable elasticsearch-hadoop will extract the value under the media-type key and use that as the Elasticsearch index suffix. If raw JSON is used, then elasticsearch-hadoop will parse the document, extract the field media-type and use its value accordingly.
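For instance, against the pattern above, a document such as the following (the field values are illustrative) ends up in the index my-collection-video:

```java
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;

public class MultiResourceExample {
    // the "media-type" field is resolved against the es.resource.write
    // pattern, so this document is indexed under my-collection-video/doc
    static MapWritable videoDoc() {
        MapWritable doc = new MapWritable();
        doc.put(new Text("media-type"), new Text("video"));
        doc.put(new Text("title"), new Text("Surfing in Hawaii"));
        return doc;
    }
}
```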
New (org.apache.hadoop.mapreduce) API

Using the new API is strikingly similar - in fact, the exact same class (org.elasticsearch.hadoop.mr.EsOutputFormat) is used:

Configuration conf = new Configuration();
// disable mapper speculative execution
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
// disable reducer speculative execution
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "es-server:9200");
// target index/type
conf.set("es.resource", "radio/artists");
Job job = new Job(conf);
// specify EsOutputFormat and the map output value class
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputValueClass(MapWritable.class);
...
job.waitForCompletion(true);
The same goes for the Mapper instance:

public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
      throws IOException, InterruptedException {
    // create the MapWritable object
    MapWritable doc = new MapWritable();
    ...
    context.write(NullWritable.get(), doc);
  }
}
Specifying the id or other document metadata is just as easy:

Configuration conf = new Configuration();
conf.set("es.mapping.id", "radioId");
Writing existing JSON to Elasticsearch
As before, when dealing with JSON directly, under the new API the configuration looks as follows:

Configuration conf = new Configuration();
// indicate the input for EsOutputFormat is of type JSON
conf.set("es.input.json", "yes");
Job job = new Job(conf);
// set the output type, in this example BytesWritable
job.setMapOutputValueClass(BytesWritable.class);
...
job.waitForCompletion(true);
public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
      throws IOException, InterruptedException {
    // assuming the document is stored as bytes
    byte[] source = ...
    BytesWritable jsonDoc = new BytesWritable(source);
    // send the doc directly
    context.write(NullWritable.get(), jsonDoc);
  }
}
Writing to dynamic/multi-resources
As expected, the differences between the old and the new API are minimal (read: non-existent) in this case as well:

Configuration conf = new Configuration();
conf.set("es.resource.write", "my-collection-{media-type}/doc");
...
Reading data from Elasticsearch
In a similar fashion, to read data from Elasticsearch, one needs to use the org.elasticsearch.hadoop.mr.EsInputFormat class. While it can read an entire index, it is much more convenient to use a query - elasticsearch-hadoop executes the query in real time and feeds the results back to Hadoop. Since the query is executed against the real data, this acts as a live view of the data set.

Just like its counterpart (EsOutputFormat), EsInputFormat returns a Map<Writable, Writable> for each JSON document returned by Elasticsearch. Since an InputFormat requires both a key and a value to be returned, EsInputFormat returns the document id (inside Elasticsearch) as the key (typically ignored) and the document/map as the value.

If the document structure returned from Elasticsearch needs to be preserved, consider using org.elasticsearch.hadoop.mr.LinkedMapWritable. The class extends Hadoop's MapWritable (and thus can easily replace it) and preserves insertion order; that is, when iterating the map, the entries are returned in insertion order (as opposed to MapWritable, which does not maintain it). However, due to the way Hadoop works, one needs to specify LinkedMapWritable as the job map output value class (instead of MapWritable).
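Under the new API, for example, switching to LinkedMapWritable is a one-line change (the old API JobConf setter is analogous):

```java
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

// declare LinkedMapWritable (instead of MapWritable) as the map output
// value class so field order is preserved in the documents handed to the mapper
job.setMapOutputValueClass(LinkedMapWritable.class);
```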
Old (org.apache.hadoop.mapred) API

Following our example above on radio artists, to get hold of all the artists whose name starts with "me", one could use the following snippet:

JobConf conf = new JobConf();
// target index/type
conf.set("es.resource", "radio/artists");
// query
conf.set("es.query", "?q=me*");
// dedicated InputFormat
conf.setInputFormat(EsInputFormat.class);
// Text as the key class (the document id),
// MapWritable as the value class (the document itself)
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(MapWritable.class);
...
JobClient.runJob(conf);
A Mapper using EsInputFormat might look as follows:

public class MyMapper extends MapReduceBase implements Mapper {
  @Override
  public void map(Object key, Object value, OutputCollector output,
                  Reporter reporter) throws IOException {
    Text docId = (Text) key;
    MapWritable doc = (MapWritable) value;
    ...
  }
}
Feel free to use Java 5 generics to avoid the cast above. For clarity and readability, the examples in this chapter do not include generics.
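For completeness, a generics-based variant of the mapper above might look like this (a sketch; the type parameters shown are one reasonable choice):

```java
import java.io.IOException;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TypedMapper extends MapReduceBase
        implements Mapper<Text, MapWritable, Text, MapWritable> {
    @Override
    public void map(Text docId, MapWritable doc,
                    OutputCollector<Text, MapWritable> output,
                    Reporter reporter) throws IOException {
        // no casts needed: docId is the Elasticsearch document id,
        // doc the document itself
        // ... process the document ...
    }
}
```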
New (org.apache.hadoop.mapreduce) API

As expected, the mapreduce API version is quite similar:

Configuration conf = new Configuration();
// target index/type
conf.set("es.resource", "radio/artists");
// query
conf.set("es.query", "?q=me*");
Job job = new Job(conf);
// dedicated InputFormat plus the key and value classes
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
...
job.waitForCompletion(true);
as well as the Mapper implementation:

public class SomeMapper extends Mapper {
  @Override
  protected void map(Object key, Object value, Context context)
      throws IOException, InterruptedException {
    Text docId = (Text) key;
    MapWritable doc = (MapWritable) value;
    ...
  }
}
Reading from Elasticsearch in JSON format
In the case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can instruct elasticsearch-hadoop to return the data as is. By setting es.output.json to true, the connector will parse the response from Elasticsearch, identify the documents and, without converting them, return their content to the user as Text objects:

Configuration conf = new Configuration();
conf.set("es.resource", "source/category");
conf.set("es.output.json", "true");
Using different indices for reading and writing
Sometimes one needs to read data from one Elasticsearch resource, process it and then write it back to a different resource inside the same job. The es.resource setting is not enough, since it implies the same resource both as source and destination. In such cases, one should use es.resource.read and es.resource.write to differentiate between the two resources (the example below uses the mapreduce API):

Configuration conf = new Configuration();
conf.set("es.resource.read", "source/category");
conf.set("es.resource.write", "sink/group");
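Putting the two settings into a schematic copy job under the mapreduce API - the class name and node address are illustrative, and the mapper/reducer setup is elided:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class CopyJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "es-server:9200");
        // read from one resource, write the processed documents to another
        conf.set("es.resource.read", "source/category");
        conf.set("es.resource.write", "sink/group");

        Job job = new Job(conf);
        job.setInputFormatClass(EsInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        // ... mapper (and optional reducer) setup ...
        job.waitForCompletion(true);
    }
}
```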
Type conversion
If automatic index creation is used, please review this section for more information.
elasticsearch-hadoop automatically converts Hadoop built-in Writable types to Elasticsearch field types (and back) as shown in the table below:
Table 2. Writable Conversion Table

Writable | Elasticsearch type
---|---
NullWritable | null
BooleanWritable | boolean
Text | string
ByteWritable | byte
ShortWritable | short
IntWritable, VIntWritable | int
LongWritable, VLongWritable | long
FloatWritable | float
DoubleWritable | double
BytesWritable | string (BASE64)
ArrayWritable | array
MapWritable | object (map)
It is worth mentioning that rich data types available only in Elasticsearch, such as GeoPoint or GeoShape, are supported by converting their structure into the primitives available in the table above. For example, depending on how it was stored, a geo_point might be returned as a Text (basically a String) or an ArrayWritable.
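A sketch of coping with both geo_point representations on the read side, assuming the field arrives either as a "lat,lon" Text or as a GeoJSON-style [lon, lat] ArrayWritable (the helper name and the exact formats handled are assumptions to adapt to your mapping):

```java
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class GeoPointReader {
    // returns {lat, lon} regardless of which primitive form the
    // geo_point field was converted into
    static double[] latLon(Writable location) {
        if (location instanceof Text) {
            // "41.12,-71.34" -> lat first, then lon
            String[] parts = location.toString().split(",");
            return new double[] { Double.parseDouble(parts[0].trim()),
                                  Double.parseDouble(parts[1].trim()) };
        }
        // array form is [lon, lat], GeoJSON-style
        Writable[] coords = ((ArrayWritable) location).get();
        return new double[] { Double.parseDouble(coords[1].toString()),
                              Double.parseDouble(coords[0].toString()) };
    }
}
```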