Apache Spark support
"Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs."
-- Spark website
Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly with HDFS.
Installation
Just like other libraries, elasticsearch-hadoop needs to be available in Spark's classpath. As Spark has multiple deployment modes, this can translate to the target classpath, whether it is on only one node (as is the case with the local mode - which will be used throughout the documentation) or per-node, depending on the desired infrastructure.
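For example, one way to distribute the connector with a Spark application is through SparkConf; a minimal sketch, where the jar path is hypothetical and depends on your installation:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setMaster("local")                                  // local mode, as used throughout this chapter
  .setJars(Seq("/path/to/elasticsearch-hadoop.jar"))   // hypothetical path to the elasticsearch-hadoop jar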
Configuration
Through elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated InputFormat and, in case of writing, through OutputFormat. These are described at length in the Map/Reduce chapter, so please refer to that for an in-depth explanation.
In short, one needs to set up a basic Hadoop Configuration object with the target Elasticsearch cluster and index, potentially a query, and she's good to go.
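As a minimal sketch (using the old-API JobConf; the es.nodes value shown is the default and is only an assumption here - adjust or omit it as needed):

import org.apache.hadoop.mapred.JobConf

val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")    // target Elasticsearch cluster (default shown)
conf.set("es.resource", "radio/artists")  // target index/type
conf.set("es.query", "?q=me*")            // optional query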
From Spark's perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.
The good news is, one can easily enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.
SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName());
JavaSparkContext jsc = new JavaSparkContext(sc); // needed only when using the Java API
Or if you prefer Scala
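// Scala equivalent of the Java snippet above (no JavaSparkContext needed)
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)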
Note that the Kryo serialization is used as a work-around for dealing with Writable types; one can choose to convert the types directly (from Writable to Serializable types), which is fine; however, for getting started, the one-liner above seems to be the most effective.
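For illustration, a hypothetical helper (not part of elasticsearch-hadoop) that performs such a conversion for a MapWritable document could look like this in the Spark shell:

import org.apache.hadoop.io.MapWritable
import scala.collection.JavaConverters._

// hypothetical helper: turn a MapWritable into a plain, serializable Scala map
def toStringMap(doc: MapWritable): Map[String, String] =
  doc.entrySet().asScala.map(e => e.getKey.toString -> e.getValue.toString).toMap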
Reading data from Elasticsearch
To read data, simply pass in the org.elasticsearch.hadoop.mr.EsInputFormat class - since it supports both the old and the new Map/Reduce APIs, you are free to use either method on SparkContext: hadoopRDD (which we recommend for conciseness reasons) or newAPIHadoopRDD. Whichever you choose, stick with it to avoid confusion and problems down the road.
Old (org.apache.hadoop.mapred) API
JobConf conf = new JobConf();                 // create the Hadoop object (use the old API)
conf.set("es.resource", "radio/artists");     // configure the source (index)
conf.set("es.query", "?q=me*");               // set up the query (optional)
JavaPairRDD<Text, MapWritable> esRDD =        // create a Spark RDD on top of Elasticsearch
    jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class);
long docCount = esRDD.count();
The Scala version is below:
val conf = new JobConf()                      // create the Hadoop object (use the old API)
conf.set("es.resource", "radio/artists")      // configure the source (index)
conf.set("es.query", "?q=me*")                // set up the query (optional)
val esRDD = sc.hadoopRDD(conf,                // create a Spark RDD on top of Elasticsearch
  classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
New (org.apache.hadoop.mapreduce) API
As expected, the mapreduce API version is strikingly similar - replace hadoopRDD with newAPIHadoopRDD and JobConf with Configuration. That's about it.
Configuration conf = new Configuration();     // create the Hadoop object (use the new API)
conf.set("es.resource", "radio/artists");     // configure the source (index)
conf.set("es.query", "?q=me*");               // set up the query (optional)
JavaPairRDD<Text, MapWritable> esRDD =        // create a Spark RDD on top of Elasticsearch
    jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class);
long docCount = esRDD.count();
The Scala version is below:
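// Scala sketch of the new-API version: same as the old-API example,
// with newAPIHadoopRDD and Configuration swapped in
val conf = new Configuration()                // create the Hadoop object (use the new API)
conf.set("es.resource", "radio/artists")      // configure the source (index)
conf.set("es.query", "?q=me*")                // set up the query (optional)
val esRDD = sc.newAPIHadoopRDD(conf,          // create a Spark RDD on top of Elasticsearch
  classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()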