WARNING: Version 2.1 has passed its EOL date.
This documentation is no longer being maintained and may be removed. If you are running this version, we strongly advise you to upgrade. For the latest information, see the current release documentation.
Apache Spark support
"Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs."
-- Spark website
Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly with HDFS. elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated support available since 2.1 or through the Map/Reduce bridge available since 2.0.
Installation
Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath. As Spark has multiple deployment modes, this can translate to the target classpath, whether it is on only one node (as is the case with the local mode, which is used throughout the documentation) or per node, depending on the desired infrastructure.
Native support
Added in 2.1.
elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or Pair RDD to be precise) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).
Whenever possible, consider using the native integration as it offers the best performance and maximum flexibility.
Configuration
To configure elasticsearch-hadoop for Apache Spark, set the various properties described in the Configuration chapter on the SparkConf object:

    // Scala
    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    conf.set("es.index.auto.create", "true")

    // Java
    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
    conf.set("es.index.auto.create", "true");
Command-line
For those who want to set the properties through the command-line (either directly or by loading them from a file), note that Spark only accepts properties that start with the "spark." prefix and ignores the rest (depending on the version, a warning might be issued). To work around this limitation, define the elasticsearch-hadoop properties by prepending the spark. prefix (thus they become spark.es.) and elasticsearch-hadoop will automatically resolve them.
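The prefix rule can be pictured with a few lines of plain Java. This is only a sketch of the behaviour described above, not the connector's actual code; the class name `SparkPrefixSketch` and the method `resolveEsProperties` are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics the documented rule, not the connector's implementation.
class SparkPrefixSketch {

    // Returns the elasticsearch-hadoop view of a set of Spark properties:
    // "spark.es.*" keys are exposed as "es.*", everything else is left out.
    static Map<String, String> resolveEsProperties(Map<String, String> sparkProps) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : sparkProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("spark.es.")) {
                // drop the leading "spark." so "spark.es.nodes" becomes "es.nodes"
                resolved.put(key.substring("spark.".length()), entry.getValue());
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("spark.app.name", "demo");
        props.put("spark.es.nodes", "someNode");
        props.put("spark.es.index.auto.create", "true");
        System.out.println(resolveEsProperties(props));
    }
}
```

In other words, a property passed to Spark as spark.es.nodes is seen by elasticsearch-hadoop as es.nodes, while unrelated Spark properties are not touched.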
Writing data to Elasticsearch
With elasticsearch-hadoop, any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD type needs to be a Map (whether a Scala or a Java one), a JavaBean or a Scala case class. When that is not the case, one can easily transform the data in Spark or plug in their own custom ValueWriter.
Scala
When using Scala, simply import the org.elasticsearch.spark package which, through the pimp my library pattern, enriches the RDD API with saveToEs methods:

    // Spark Scala imports
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    // elasticsearch-hadoop Scala imports
    import org.elasticsearch.spark._

    ...

    val conf = ...
    // start Spark through its Scala API
    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")

    // index the content (namely the two documents, numbers and airports)
    // in Elasticsearch under spark/docs
    sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
Scala users might be tempted to use Seq and the -> notation for declaring root objects (that is, the JSON document) instead of using a Map. While similar, the first notation results in slightly different types that cannot be matched to a JSON document: Seq is an ordered sequence (in other words a list) while -> creates a Tuple, which is more or less an ordered, fixed number of elements. As such, a list of lists cannot be used as a document since it cannot be mapped to a JSON object; however, it can be used freely within one. This is why the example above uses Map(k -> v) instead of Seq(k -> v).
As an alternative to the implicit import above, one can use the elasticsearch-hadoop Spark support in Scala through EsSpark in the org.elasticsearch.spark.rdd package, which acts as a utility class allowing explicit method invocations. Additionally, instead of Maps (which are convenient but require one mapping per instance due to their difference in structure), use a case class:
    import org.apache.spark.SparkContext
    import org.elasticsearch.spark.rdd.EsSpark

    // define a case class named Trip
    case class Trip(departure: String, arrival: String)

    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")

    // create an RDD around the Trip instances
    val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    // index the RDD explicitly through EsSpark
    EsSpark.saveToEs(rdd, "spark/docs")
For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the RDD configuration (it is also possible to set the property on the SparkConf, though due to its global effect this is discouraged):
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
Java users have a dedicated class that provides functionality similar to EsSpark, namely JavaEsSpark in the org.elasticsearch.spark.rdd.api.java package (a package similar to Spark’s Java API):

    // Spark Java imports
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.SparkConf;
    // elasticsearch-hadoop Java imports
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    ...

    SparkConf conf = ...
    // start Spark through its Java API
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // to simplify the example, use Guava (a dependency of Spark)
    Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
    Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

    // create a simple RDD over the two collections
    JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
    // index the content (namely the two documents, numbers and airports)
    // in Elasticsearch under spark/docs
    JavaEsSpark.saveToEs(javaRDD, "spark/docs");
The code can be further simplified by using Java 5 static imports. Additionally, the Map (whose mapping is dynamic due to its loose structure) can be replaced with a JavaBean:

    public class TripBean implements Serializable {
        private String departure, arrival;

        public TripBean(String departure, String arrival) {
            setDeparture(departure);
            setArrival(arrival);
        }

        public TripBean() {}

        public String getDeparture() { return departure; }
        public String getArrival() { return arrival; }
        public void setDeparture(String dep) { departure = dep; }
        public void setArrival(String arr) { arrival = arr; }
    }

    // statically import the JavaEsSpark methods
    import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;
    ...

    TripBean upcoming = new TripBean("OTP", "SFO");
    TripBean lastWeek = new TripBean("MUC", "OTP");

    // define an RDD containing TripBean instances
    JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek));
    // call saveToEs directly, thanks to the static import
    saveToEs(javaRDD, "spark/docs");
Setting the document id (or other metadata fields like ttl or timestamp) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
Writing existing JSON to Elasticsearch
For cases where the data in the RDD is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects an RDD containing either String or byte arrays (byte[]/Array[Byte]), assuming each entry represents a JSON document. If the RDD does not have the proper signature, the saveJsonToEs methods cannot be applied (in Scala they will not be available).
Scala
    // example of an entry within the RDD: each string is one JSON document
    val json1 = """{"reason" : "business", "airport" : "SFO"}"""
    val json2 = """{"participants" : 5, "airport" : "OTP"}"""

    // index the JSON data through the dedicated saveJsonToEs method
    new SparkContext(conf).makeRDD(Seq(json1, json2))
        .saveJsonToEs("spark/json-trips")
Java
    // example of an entry within the RDD: each string is one JSON document
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

    JavaSparkContext jsc = ...
    // notice the JavaRDD<String> signature
    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    // index the JSON data through the dedicated saveJsonToEs method
    JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
Writing to dynamic/multi-resources
For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content), one can use the es.resource.write field, which accepts patterns that are resolved from the document content at runtime. Following the aforementioned media example, one could configure it as follows:
Scala
    // media_type is the document key used for splitting the data. Any field can be
    // declared (but make sure it is available in all documents)
    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    // save each object based on its resource pattern, in this example based on media_type
    sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection/{media_type}")

For each document/object about to be written, elasticsearch-hadoop will extract the media_type field and use its value to determine the target resource.
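To make the pattern resolution concrete, here is a small self-contained Java sketch of how a `{field}` placeholder could be replaced with a value taken from the document. It is an illustration of the idea only; the class `ResourcePatternSketch` and its `resolve` method are hypothetical and say nothing about how the connector implements this internally (for instance, it ignores any formatting options a pattern might support).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: resolves {field} placeholders against a document.
class ResourcePatternSketch {

    private static final Pattern FIELD = Pattern.compile("\\{([^}]+)\\}");

    static String resolve(String pattern, Map<String, ?> doc) {
        Matcher matcher = FIELD.matcher(pattern);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String field = matcher.group(1);
            Object value = doc.get(field);
            if (value == null) {
                // the field used in the pattern must be present in every document
                throw new IllegalArgumentException(
                        "field '" + field + "' is missing from document " + doc);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> game = new HashMap<>();
        game.put("media_type", "game");
        game.put("title", "FF VI");
        System.out.println(resolve("my-collection/{media_type}", game));
    }
}
```

With the documents from the example, the game document would end up under my-collection/game, the book under my-collection/book and the cd under my-collection/music.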
Java
As expected, things in Java are strikingly similar:

    Map<String, ?> game =
            ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
    Map<String, ?> book = ...
    Map<String, ?> cd = ...

    JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd));
    // save each object based on its resource pattern, here the media_type field
    saveToEs(javaRDD, "my-collection/{media_type}");
Handling document metadata
Elasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from the document they belong to. Furthermore, one can even include or exclude the parts of the data that are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality, allowing metadata to be supplied outside the document itself through the use of pair RDDs.
In other words, for RDDs containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the Metadata Java enum within the org.elasticsearch.spark.rdd package, which identifies its type: id, ttl, version, etc.
Thus an RDD's keys can be a Map containing the Metadata for each document and its associated values. If the RDD key is not of type Map, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly.
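The rule for interpreting a key can be sketched in plain Java as follows. This is an illustration under stated assumptions rather than the connector's code: the nested enum `Meta` is a hypothetical stand-in for org.elasticsearch.spark.rdd.Metadata, and `interpretKey` is a made-up helper name.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative only: shows how a pair RDD key could be read as metadata.
class MetadataKeySketch {

    // Hypothetical stand-in for org.elasticsearch.spark.rdd.Metadata
    enum Meta { ID, TTL, VERSION }

    static Map<Meta, Object> interpretKey(Object key) {
        Map<Meta, Object> metadata = new EnumMap<>(Meta.class);
        if (key instanceof Map) {
            // a Map key carries several metadata entries at once
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) key).entrySet()) {
                metadata.put((Meta) entry.getKey(), entry.getValue());
            }
        } else {
            // any other object is taken to be the document id
            metadata.put(Meta.ID, key);
        }
        return metadata;
    }

    public static void main(String[] args) {
        System.out.println(interpretKey(1));

        Map<Meta, Object> mucMeta = new EnumMap<>(Meta.class);
        mucMeta.put(Meta.ID, 2);
        mucMeta.put(Meta.VERSION, "23");
        System.out.println(interpretKey(mucMeta));
    }
}
```

A plain key such as 1 is treated as the id, while a Map key can carry the id together with other entries such as the version.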
This sounds more complicated than it is, so let us see some examples.
Scala
Pair RDDs, or simply put RDDs with the signature RDD[(K,V)], can take advantage of the saveToEsWithMeta methods that are available either through the implicit import of the org.elasticsearch.spark package or through the EsSpark object.
To manually specify the id for each document, simply pass in the Object (not of type Map) as the key in your RDD:

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // instance of SparkContext
    val sc = ...

    // the key of each tuple within the Seq is the id of its associated document:
    // otp has id 1, muc has id 2 and sfo has id 3
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    // since airportsRDD is a pair RDD, saveToEsWithMeta is available on it
    airportsRDD.saveToEsWithMeta("airports/2015")
When more than just the id needs to be specified, one should use a scala.collection.Map with keys of type org.elasticsearch.spark.rdd.Metadata:

    // import the Metadata enum
    import org.elasticsearch.spark.rdd.Metadata._

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // metadata for each document
    // note it's not required for them to have the same structure
    val otpMeta = Map(ID -> 1, TTL -> "3h")       // the metadata used for the otp document
    val mucMeta = Map(ID -> 2, VERSION -> "23")   // the metadata used for the muc document
    val sfoMeta = Map(ID -> 3)                    // the metadata used for the sfo document

    // instance of SparkContext
    val sc = ...

    // the metadata and the documents are assembled into a pair RDD
    val airportsRDD = sc.makeRDD(
        Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    // the RDD is saved through saveToEsWithMeta, using the keys as metadata
    airportsRDD.saveToEsWithMeta("airports/2015")
Java
In a similar fashion, on the Java side, JavaEsSpark provides saveToEsWithMeta methods that are applied to JavaPairRDD (the equivalent in Java of RDD[(K,V)]). Thus, to save documents based on their ids one can use:

    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

    // data to be saved
    Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
    Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

    JavaSparkContext jsc = ...

    // create a pair RDD between the id and the docs
    JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of(
            new Tuple2<Object, Object>(1, otp),     // tuple for the first document, wrapped around the id (1)
            new Tuple2<Object, Object>(2, jfk)));   // tuple for the second document, wrapped around the id (2)
    // the JavaPairRDD is saved with the keys used as ids;
    // target is the Elasticsearch resource to write to
    JavaEsSpark.saveToEsWithMeta(pairRdd, target);
When more than just the id needs to be specified, one can choose to use a java.util.Map populated with keys of type org.elasticsearch.spark.rdd.Metadata:

    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import org.elasticsearch.spark.rdd.Metadata;
    // static import for the Metadata enum values
    import static org.elasticsearch.spark.rdd.Metadata.*;

    // data to be saved
    Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
    Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

    // metadata for each document
    // note it's not required for them to have the same structure
    Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d");        // metadata for otp
    Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object>of(ID, "2", VERSION, "23");  // metadata for sfo

    JavaSparkContext jsc = ...

    // create a pair RDD between the metadata and the docs
    JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of(
            new Tuple2<Object, Object>(otpMeta, otp),    // tuple between otp and its metadata
            new Tuple2<Object, Object>(sfoMeta, sfo)));  // tuple associating sfo and its metadata
    JavaEsSpark.saveToEsWithMeta(pairRdd, target);
Reading data from Elasticsearch
For reading, one should define the Elasticsearch RDD that streams data from Elasticsearch to Spark.
Scala
Similar to writing, the org.elasticsearch.spark package enriches the SparkContext API with esRDD methods:

    // Spark Scala imports
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    // elasticsearch-hadoop Scala imports
    import org.elasticsearch.spark._

    ...

    val conf = ...
    // start Spark through its Scala API
    val sc = new SparkContext(conf)

    // a dedicated RDD for Elasticsearch is created for index radio/artists
    val rdd = sc.esRDD("radio/artists")
The method is overloaded to accept an additional query or even a configuration Map (overriding SparkConf):

    ...
    import org.elasticsearch.spark._

    ...
    val conf = ...
    val sc = new SparkContext(conf)

    sc.esRDD("radio/artists", "?q=me*")

The documents from Elasticsearch are returned, by default, as a Tuple2 containing the document id as the first element and the actual document as the second element, represented through Scala collections, namely a `Map[String, Any]` where the keys represent the field names and the values their respective values.
Java
Java users have a dedicated JavaPairRDD that works the same as its Scala counterpart; however, the Tuple2 values (the second element) hold the documents as native java.util collections.

    // Spark Java imports
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    // elasticsearch-hadoop Java imports
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    ...

    SparkConf conf = ...
    // start Spark through its Java API
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // a dedicated JavaPairRDD for Elasticsearch is created for index radio/artists
    JavaPairRDD<String, Map<String, Object>> esRDD =
            JavaEsSpark.esRDD(jsc, "radio/artists");
In a similar fashion, one can use the overloaded esRDD methods to specify a query or pass a Map object for advanced configuration.
Let us see what this looks like, this time using Java static imports. Furthermore, let us discard the document ids and retrieve only the RDD values:

    // statically import the JavaEsSpark methods
    import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*;

    ...
    JavaRDD<Map<String, Object>> rdd =
            esRDD(jsc, "radio/artists", "?q=me*")   // create an RDD, running the query against radio/artists
                .values();                           // return only the values of the pair RDD, that is the documents
By using the JavaEsSpark API, one gets hold of Spark’s dedicated JavaPairRDD, which is better suited to Java environments than the base RDD (due to its Scala signatures). Moreover, the dedicated RDD returns Elasticsearch documents as proper Java collections, so one does not have to deal with Scala collections (which is typically the case with RDDs). This is particularly powerful when using Java 8, which we strongly recommend as its lambda expressions make collection processing extremely concise.
To wit, let us assume one wants to filter the documents from the RDD and return only those that contain the value mega (please ignore the fact that one can and should do the filtering directly through Elasticsearch).
In versions prior to Java 8, the code would look something like this:

    JavaRDD<Map<String, Object>> esRDD =
            esRDD(jsc, "radio/artists", "?q=me*").values();
    JavaRDD<Map<String, Object>> filtered = esRDD.filter(
        new Function<Map<String, Object>, Boolean>() {
            @Override
            public Boolean call(Map<String, Object> map) throws Exception {
                // keep only the documents that have "mega" among their values
                return map.containsValue("mega");
            }
        });

With Java 8, the filtering becomes a one-liner:

    JavaRDD<Map<String, Object>> esRDD =
            esRDD(jsc, "radio/artists", "?q=me*").values();
    JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.containsValue("mega"));
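The predicate handed to filter is ordinary Java, so it can be tried out on plain collections before involving Spark or Elasticsearch. The sketch below is one possible variant that matches any value whose text contains mega (a substring match); the class `MegaFilterSketch` and its methods are hypothetical names, and a `java.util.List` merely stands in for the RDD values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: the same kind of predicate, applied to in-memory documents.
class MegaFilterSketch {

    // true when any field value, rendered as text, contains "mega"
    static boolean hasMegaValue(Map<String, Object> doc) {
        return doc.values().stream()
                .anyMatch(value -> value != null && value.toString().contains("mega"));
    }

    // keeps only the documents accepted by the predicate
    static List<Map<String, Object>> keepMega(List<Map<String, Object>> docs) {
        return docs.stream()
                .filter(MegaFilterSketch::hasMegaValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("name", "megadeth");
        Map<String, Object> second = new HashMap<>();
        second.put("name", "metallica");

        List<Map<String, Object>> docs = new ArrayList<>();
        docs.add(first);
        docs.add(second);
        System.out.println(keepMega(docs));
    }
}
```

Assuming the documents arrive as `Map<String, Object>` values, a method reference such as `MegaFilterSketch::hasMegaValue` could be passed to the RDD's filter method in the same way as the lambda.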
Reading data in JSON format
In cases where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can use the dedicated esJsonRDD methods. In this case, the connector returns the document content as it is received from Elasticsearch, without any processing, as an RDD[(String, String)] in Scala or a JavaPairRDD<String, String> in Java, with the keys representing the document id and the values the actual content in JSON format.
Type conversion
If automatic index creation is used, please review this section for more information.
elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:
Table 5. Scala Types Conversion Table
Scala type | Elasticsearch type |
---|---|
… | … |
case class | object |
In addition, the following implied conversions apply for Java types:
Table 6. Java Types Conversion Table
Java type | Elasticsearch type |
---|---|
… | … |
Java Bean | object |
The conversion is done as a best effort; built-in Java and Scala types are guaranteed to be properly converted, however there are no guarantees for user types, whether in Java or Scala. As mentioned in the tables above, when a case class is encountered in Scala or a JavaBean in Java, the converters will try to unwrap its content and save it as an object. Note this works only for top-level user objects: if the user object has other user objects nested in it, the conversion is likely to fail since the converter does not perform nested unwrapping.
This is done on purpose, since the converter has to serialize and deserialize the data, and user types introduce ambiguity due to data loss. This could be addressed through some type of mapping; however, that takes the project way too close to the realm of ORMs and arguably introduces too much complexity for little to no gain. Thanks to the processing functionality in Spark and the pluggability in elasticsearch-hadoop, one can easily transform objects into other types, if needed, with minimal effort and maximum control.
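One way to sidestep the nested-object limitation is to turn user objects into plain Maps before they are written. The following self-contained Java sketch shows the idea; the `Airport` and `Trip` types and the `toDocument` helpers are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: flattens a user object with nested user objects into Maps.
class NestedObjectSketch {

    // Hypothetical user types: a Trip nests two Airport objects.
    static final class Airport {
        final String iata;
        final String name;

        Airport(String iata, String name) {
            this.iata = iata;
            this.name = name;
        }
    }

    static final class Trip {
        final Airport departure;
        final Airport arrival;
        final int days;

        Trip(Airport departure, Airport arrival, int days) {
            this.departure = departure;
            this.arrival = arrival;
            this.days = days;
        }
    }

    static Map<String, Object> toDocument(Airport airport) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("iata", airport.iata);
        doc.put("name", airport.name);
        return doc;
    }

    // nested user objects are converted explicitly, so only Maps and
    // built-in types remain in the resulting document
    static Map<String, Object> toDocument(Trip trip) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("departure", toDocument(trip.departure));
        doc.put("arrival", toDocument(trip.arrival));
        doc.put("days", trip.days);
        return doc;
    }

    public static void main(String[] args) {
        Trip trip = new Trip(new Airport("OTP", "Otopeni"), new Airport("SFO", "San Fran"), 5);
        System.out.println(toDocument(trip));
    }
}
```

In a Spark job, a conversion of this kind would typically be applied as a map step over the RDD before saving it, so that the connector only ever sees Maps and built-in types.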
Spark SQL support
Added in 2.1.
"Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine."
-- Spark website
On top of the core Spark support, elasticsearch-hadoop also provides integration with Spark SQL. In other words, Elasticsearch becomes a native source for Spark SQL so that data can be indexed and queried from Spark SQL transparently.
Spark SQL works with structured data; in other words, all entries are expected to have the same structure (same number of fields, of the same type and name). Using unstructured data (documents with different structures) is not supported and will cause problems. For such cases, use PairRDDs.
Supported Spark SQL versions
Spark SQL is a young component, going through significant changes between releases. Spark SQL became a stable component in version 1.3; however, it is not backwards compatible with the previous releases. elasticsearch-hadoop supports both Spark SQL 1.1-1.2 and 1.3 (and higher) through two different jars: elasticsearch-spark-<version>.jar and elasticsearch-hadoop-<version>.jar support Spark SQL 1.3 (or higher) while elasticsearch-spark-1.2-<version>.jar supports Spark SQL 1.1 and 1.2. In other words, if you are not using Spark SQL 1.3 (or higher), append the -1.2 suffix to the elasticsearch-spark artifact id.
Spark SQL support is available under the org.elasticsearch.spark.sql package.
API differences
From the elasticsearch-hadoop user's perspective, the differences between Spark SQL 1.3 and its previous versions are fairly trivial. This document describes at length the differences which are briefly mentioned below:
- DataFrame vs SchemaRDD: the core unit of Spark SQL in 1.3+ is a DataFrame while previously it was a SchemaRDD.
- Unified API vs dedicated Java/Scala APIs: in Spark SQL 1.3+ there is only one API for both Java and Scala; previous versions had dedicated APIs, in particular with regards to data types.
The documentation below focuses on Spark SQL 1.3+ but accompanies each example with the suitable Spark SQL 1.1-1.2 code.
Writing DataFrame (Spark SQL 1.3+) to Elasticsearch
With elasticsearch-hadoop, DataFrames can be indexed to Elasticsearch.
Scala
In Scala, simply import the org.elasticsearch.spark.sql package, which enriches the given DataFrame class with saveToEs methods; while these have the same signature as those in the org.elasticsearch.spark package, they are designed for DataFrame implementations:

    // reusing the example from Spark SQL documentation

    // Spark SQL package import
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext._
    // elasticsearch-hadoop Spark package import
    import org.elasticsearch.spark.sql._

    ...

    // sc = existing SparkContext
    val sqlContext = new SQLContext(sc)
    // brings toDF() into scope for RDDs of case classes
    import sqlContext.implicits._

    // case class used to define the DataFrame
    case class Person(name: String, surname: String, age: Int)

    // create DataFrame: read a text file as a normal RDD and map it
    // to a DataFrame (using the Person case class)
    val people = sc.textFile("people.txt")
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))
        .toDF()

    // index the resulting DataFrame to Elasticsearch through the saveToEs method
    people.saveToEs("spark/people")
Java
In a similar fashion, for Java usage the dedicated package org.elasticsearch.spark.sql.api.java provides similar functionality through JavaEsSparkSQL:

    // Spark SQL Java imports
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.api.java.*;
    // elasticsearch-hadoop Spark SQL Java imports
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
    ...

    DataFrame people = ...
    // index the DataFrame in Elasticsearch under spark/people
    JavaEsSparkSQL.saveToEs(people, "spark/people");

Again, with Java 5 static imports (of the JavaEsSparkSQL methods) this can be further simplified, so that saveToEs can be called without the class prefix.
For maximum control over the mapping of your DataFrame in Elasticsearch, it is highly recommended to create the mapping beforehand. See this chapter for more information.
Writing SchemaRDD (Spark SQL 1.2) to Elasticsearch
When dealing with Spark SQL 1.1/1.2, simply interchange DataFrame with SchemaRDD, as the Java and Scala APIs are the same.
Scala
    // reusing the example from Spark SQL documentation

    // Spark SQL package import
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SQLContext._
    // elasticsearch-hadoop Spark package import
    import org.elasticsearch.spark.sql._

    ...

    // sc = existing SparkContext
    val sqlContext = new SQLContext(sc)

    // case class used to define the RDD schema
    case class Person(name: String, surname: String, age: Int)

    // create SchemaRDD: read a text file as a normal RDD and map it
    // using the Person case class
    val people = sc.textFile("people.txt")
        .map(_.split(","))
        .map(p => Person(p(0), p(1), p(2).trim.toInt))

    // index the resulting SchemaRDD to Elasticsearch through the saveToEs method
    people.saveToEs("spark/people")
Java
As expected, the Java example is identical:

    // Spark SQL Java imports
    import org.apache.spark.sql.api.java.*;
    // elasticsearch-hadoop Spark SQL Java imports
    import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
    ...

    JavaSchemaRDD people = ...
    // index the JavaSchemaRDD in Elasticsearch under spark/people
    JavaEsSparkSQL.saveToEs(people, "spark/people");

Again, with Java 5 static imports (of the JavaEsSparkSQL methods) this can be further simplified, so that saveToEs can be called without the class prefix.
For maximum control over the mapping of your SchemaRDD in Elasticsearch, it is highly recommended to create the mapping beforehand. See this chapter for more information.
Writing existing JSON to Elasticsearch
When using Spark SQL, if the input data is in JSON format, simply convert it to a DataFrame (in Spark SQL 1.3) or a SchemaRDD (for Spark SQL 1.1/1.2), as described in the Spark documentation, through the SQLContext/JavaSQLContext jsonFile methods.
Using pure SQL to read from Elasticsearch
Available in Apache Spark SQL 1.2 (or higher)
The index and its mapping have to exist prior to creating the temporary table.
Spark SQL 1.2 introduced a new API for reading from external data sources, which is supported by elasticsearch-hadoop, simplifying the SQL configuration needed for interacting with Elasticsearch. Furthermore, behind the scenes it understands the operations executed by Spark and thus can optimize the data and queries made (such as filtering or pruning), improving performance.
Data Sources in Spark SQL 1.3
When using Spark SQL 1.3, elasticsearch-hadoop allows access to Elasticsearch through the SQLContext load method. In other words, to create a DataFrame backed by Elasticsearch in a declarative manner:

    val sql = new SQLContext...
    // Spark 1.3 style
    val df = sql.load(
        "spark/index",                   // path or resource to load, in this case the index/type in Elasticsearch
        "org.elasticsearch.spark.sql")   // the data source provider

In Spark 1.4, one would use the following similar API calls:

    // Spark 1.4 style
    val df = sql.read
        .format("org.elasticsearch.spark.sql")   // the data source provider
        .load("spark/index")                     // path or resource to load, in this case the index/type in Elasticsearch
In Spark 1.5, this can be further simplified to:
Whatever API is used, once created, the DataFrame can be accessed freely to manipulate the data.
The sources declaration also allows specific options to be passed in, namely:
Name | Default value | Description |
---|---|---|
path | required | Elasticsearch index/type |
pushdown | true | Whether to translate (push-down) Spark SQL into Elasticsearch Query DSL |
strict | false | Whether to use exact (not analyzed) matching or not (analyzed) |
Both options are explained in the next section.
To specify the options (including the generic elasticsearch-hadoop ones), one simply passes a Map to the aforementioned methods. For example:

    val sql = new SQLContext...
    // options for Spark 1.3 need to include the target path/resource
    val options13 = Map("path" -> "spark/index",
                        "pushdown" -> "true",
                        "es.nodes" -> "someNode",
                        "es.port" -> "9200")

    // Spark 1.3 style
    val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)

    // options for Spark 1.4 - the path/resource is specified separately
    val options = Map("pushdown" -> "true",
                      "es.nodes" -> "someNode",
                      "es.port" -> "9200")

    // Spark 1.4 style
    val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                       .options(options)   // pass the options when defining/loading the source
                       .load("spark/index")

    sqlContext.sql(
        "CREATE TEMPORARY TABLE myIndex " +      // Spark’s temporary table name
        "USING org.elasticsearch.spark.sql " +   // the data source provider
        // elasticsearch-hadoop configuration options, the mandatory one being resource
        "OPTIONS (resource 'spark/index', nodes 'someNode')")
Do note that due to the SQL parser, the . character (among other common characters used for delimiting) is not allowed; the connector tries to work around it by adding the es. prefix automatically, however this works only for configuration options with a single . (like es.nodes above). Because of this, if properties with multiple . characters are needed, one should use the SQLContext.load or SQLContext.read methods above and pass the properties as a Map.
Push-Down operations
Available only in Spark 1.3 or higher
An important hidden feature of using elasticsearch-hadoop as a Spark source is that the connector understands the operations performed within the DataFrame/SQL and, by default, will translate them into the appropriate QueryDSL. In other words, the connector pushes down the operations directly to the source, where the data is efficiently filtered out so that only the required data is streamed back to Spark.
This significantly increases query performance and minimizes the CPU, memory and I/O used on both the Spark and Elasticsearch clusters, as only the needed data is returned (as opposed to returning the data in bulk only to be processed and discarded by Spark).
Note the push down operations apply even when one specifies a query - the connector will enhance it according to the specified SQL.
As a side note, elasticsearch-hadoop supports all the `Filter`s available in Spark (1.3.0 to 1.4.0) while retaining backwards binary-compatibility with Spark 1.3.0, pushing down to full extent the SQL operations to Elasticsearch without any user interference.
To wit, consider the following Spark SQL:
    // as a DataFrame
    val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("spark/trips")

    df.printSchema()
    // root
    //|-- departure: string (nullable = true)
    //|-- arrival: string (nullable = true)
    //|-- days: long (nullable = true)

    val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3)))
or in pure SQL:
    CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips")
    SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
The connector translates the query into:
    {
      "query" : {
        "filtered" : {
          "query" : {
            "match_all" : {}
          },
          "filter" : {
            "and" : [{
                "query" : {
                  "match" : { "arrival" : "OTP" }
                }
              }, {
                "days" : { "gt" : 3 }
              }
            ]
          }
        }
      }
    }
Furthermore, the pushdown filters can work on analyzed terms (the default) or can be configured to be strict and provide exact matches (which work only on not-analyzed fields). Unless one manually specifies the mapping, it is highly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch Reference Documentation.
Data Sources in Spark SQL 1.2
editAvailable since Spark SQL 1.2, one can also access a data source by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', " + "scroll_size '20')" )
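Spark’s temporary table name
The USING clause identifying the data source provider, in this case org.elasticsearch.spark.sql
elasticsearch-hadoop configuration options, the mandatory one being resource
Since using . in option names can cause syntax exceptions, es.scroll.size is written as scroll_size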
Once defined, the schema is picked up automatically, so one can issue queries right away:
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
As elasticsearch-hadoop is aware of the queries being made, it can optimize the requests done to Elasticsearch. For example, given the following query:
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
it knows only the name and id fields are required (the first to be returned to the user, the second for Spark’s internal filtering) and thus will ask only for this data, making the queries quite efficient.
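The reasoning behind that field selection can be sketched in a few lines of plain Python. The helper `required_fields` is hypothetical and only illustrates the idea (projected columns plus any columns referenced by filters); it makes no claim about how the connector computes or sends the list.

```python
def required_fields(selected, filtered):
    """Fields to request: projected columns first, then any extra filter columns."""
    fields = list(selected)
    for name in filtered:
        if name not in fields:
            fields.append(name)
    return fields


# SELECT name FROM myIndex WHERE id >= 1 AND id <= 10
fields = required_fields(selected=["name"], filtered=["id", "id"])
print(fields)
```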
Reading DataFrames (Spark SQL 1.3) from Elasticsearch
As you might have guessed, one can define a DataFrame backed by Elasticsearch documents. Or even better, have them backed by a query result, effectively creating dynamic, real-time views over your data.
Scala
Through the org.elasticsearch.spark.sql package, esDF methods are available on the SQLContext API:
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._
...
val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")

// check the associated schema
println(people.schema.treeString)
// root
// |-- name: string (nullable = true)
// |-- surname: string (nullable = true)
// |-- age: long (nullable = true)
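Spark SQL Scala imports
elasticsearch-hadoop SQL Scala imports
create a DataFrame backed by the spark/people index in Elasticsearch
the DataFrame schema discovered from Elasticsearch
notice how the age field is mapped to long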
And just as with the Spark core support, additional parameters can be specified such as a query. This is quite a powerful concept as one can filter the data at the source (Elasticsearch) and use Spark only on the results:
Controlling the DataFrame schema
In some cases, especially when the index in Elasticsearch contains a lot of fields, it is desirable to create a DataFrame that contains only a subset of them. While one can modify the DataFrame (by working on its backing RDD) through the official Spark API or through dedicated queries, elasticsearch-hadoop allows the user to specify what fields to include and exclude from Elasticsearch when creating the DataFrame.
Through the es.read.field.include and es.read.field.exclude properties, one can indicate what fields to include or exclude from the index mapping. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified, meaning all properties/fields are included and no properties/fields are excluded.
For example:
# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created
Due to the way Spark SQL works with a DataFrame schema, elasticsearch-hadoop needs to be aware of what fields are returned from Elasticsearch before executing the actual queries. While one can restrict the fields manually through the underlying Elasticsearch query, elasticsearch-hadoop is unaware of this and the results are likely to be different or, worse, errors will occur. Use the properties above instead, which Elasticsearch will properly use alongside the user query.
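As a rough illustration of how the include and exclude patterns interact, the sketch below filters a list of field names in plain Python. It assumes shell-style wildcard matching via the standard `fnmatch` module, which treats `?` and `[` specially and so only approximates the connector's own matching; the helper names and the sample field names are hypothetical.

```python
from fnmatch import fnmatchcase


def parse_patterns(value):
    """Split a comma-separated property value into trimmed patterns."""
    return [p.strip() for p in value.split(",") if p.strip()]


def select_fields(fields, include="", exclude=""):
    """Keep fields matching any include pattern (all, if none given) and no exclude pattern."""
    includes = parse_patterns(include)
    excludes = parse_patterns(exclude)
    kept = []
    for name in fields:
        included = not includes or any(fnmatchcase(name, p) for p in includes)
        excluded = any(fnmatchcase(name, p) for p in excludes)
        if included and not excluded:
            kept.append(name)
    return kept


# hypothetical field names, filtered with the patterns from the example above
fields = ["name", "surname", "age", "address.street", "address.created"]
kept = select_fields(fields, include="*name, address.*", exclude="*.created")
print(kept)
```

Under these assumptions `age` is dropped because it matches no include pattern, and `address.created` is dropped because the exclude pattern takes precedence over the include that would otherwise keep it.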
Java
For Java users, a dedicated API exists through JavaEsSparkSQL. It is strikingly similar to EsSparkSQL; however, it allows configuration options to be passed in through Java collections instead of Scala ones. Other than that, using the two is exactly the same:
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
...
SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");
Spark SQL import
elasticsearch-hadoop import
create a Java DataFrame backed by the spark/people index in Elasticsearch
Better yet, the DataFrame can be backed by a query result:
Reading SchemaRDDs (Spark SQL 1.2) from Elasticsearch
And again, if you are using Spark SQL 1.1/1.2, simply interchange DataFrame with SchemaRDD and esDF with esRDD:
Scala
Through the org.elasticsearch.spark.sql package, esRDD methods are available on the SQLContext API:
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._
...
val sql = new SQLContext(sc)

val people = sql.esRDD("spark/people")

// check the associated schema
println(people.schema)
// root
// |-- name: string (nullable = true)
// |-- surname: string (nullable = true)
// |-- age: long (nullable = true)
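Spark SQL Scala imports
elasticsearch-hadoop SQL Scala imports
create a SchemaRDD backed by the spark/people index in Elasticsearch
the SchemaRDD schema discovered from Elasticsearch
notice how the age field is mapped to long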
And just as with the Spark core support, additional parameters can be specified such as a query. This is quite a powerful concept as one can filter the data at the source (Elasticsearch) and use Spark only on the results:
Java
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
...
JavaSQLContext jsql = new JavaSQLContext(sc);

JavaSchemaRDD people = JavaEsSparkSQL.esRDD(jsql, "spark/people");
Spark SQL import
elasticsearch-hadoop import
create a Java SchemaRDD backed by the spark/people index in Elasticsearch
Better yet, the JavaSchemaRDD can be backed by a query result:
Spark SQL Type conversion
If automatic index creation is used, please review this section for more information.
elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:
While Spark SQL DataTypes have an equivalent in both Scala and Java and thus the RDD conversion can apply, there are slightly different semantics, in particular with the java.sql types, due to the way Spark SQL handles them:
Table 7. Spark SQL 1.3+ Conversion Table
Spark SQL DataType |
Elasticsearch type |
---|---|
Available only in Spark 1.2+ |
Using the Map/Reduce layer
Another way of using Spark with Elasticsearch is through the Map/Reduce layer, that is, by leveraging the dedicated Input/OutputFormat in elasticsearch-hadoop. However, unless one is stuck on elasticsearch-hadoop 2.0, we strongly recommend using the native integration as it offers significantly better performance and flexibility.
Configuration
Through elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated InputFormat and, in case of writing, through OutputFormat. These are described at length in the Map/Reduce chapter, so please refer to that for an in-depth explanation.
In short, one needs to set up a basic Hadoop Configuration object with the target Elasticsearch cluster and index, potentially a query, and one is good to go.
From Spark’s perspective, the only thing required is setting up serialization. Spark relies by default on Java serialization, which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types, namely Writables. As such, InputFormats and OutputFormats are required to return Writables which, out of the box, Spark does not understand.
The good news is, one can easily enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.
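import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName());

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);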
Or if you prefer Scala
Note that Kryo serialization is used as a work-around for dealing with Writable types; one can choose to convert the types directly (from Writable to Serializable types), which is fine. However, for getting started, the one-liner above seems to be the most effective.
Reading data from Elasticsearch
To read data, simply pass in the org.elasticsearch.hadoop.mr.EsInputFormat class. Since it supports both the old and the new Map/Reduce APIs, you are free to use either method on SparkContext: hadoopRDD (which we recommend for conciseness reasons) or newAPIHadoopRDD. Whichever you choose, stick with it to avoid confusion and problems down the road.
Old (org.apache.hadoop.mapred) API
JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class,
                                  Text.class, MapWritable.class);
long docCount = esRDD.count();
Create the Hadoop object (use the old API)
Configure the source (index)
Set up the query (optional)
Create a Spark RDD on top of Elasticsearch through EsInputFormat
The Scala version is below:
val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")

val esRDD = sc.hadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]],
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
Create the Hadoop object (use the old API)
Configure the source (index)
Set up the query (optional)
Create a Spark RDD on top of Elasticsearch through EsInputFormat
New (org.apache.hadoop.mapreduce) API
As expected, the mapreduce API version is strikingly similar: replace hadoopRDD with newAPIHadoopRDD and JobConf with Configuration. That’s about it.
Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class,
                                        Text.class, MapWritable.class);
long docCount = esRDD.count();
Create the Hadoop object (use the new API)
Configure the source (index)
Set up the query (optional)
Create a Spark RDD on top of Elasticsearch through EsInputFormat
The Scala version is below:
val conf = new Configuration()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")

val esRDD = sc.newAPIHadoopRDD(conf,
                classOf[EsInputFormat[Text, MapWritable]],
                classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()
Create the Hadoop object (use the new API)
Configure the source (index)
Set up the query (optional)
Create a Spark RDD on top of Elasticsearch through EsInputFormat
Using the connector from PySpark
Thanks to its Map/Reduce layer, elasticsearch-hadoop can be used from PySpark as well to both read and write data to Elasticsearch. To wit, below is a snippet from the Spark documentation (make sure to switch to the Python snippet):
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
...     "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})
Also, the SQL loader can be used as well:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
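The PySpark snippets above only read data. For writing, a commonly used approach goes through the same Map/Reduce layer by calling PySpark's `saveAsNewAPIHadoopFile` with `EsOutputFormat`. The following is a minimal sketch under stated assumptions: the helper names `es_write_conf` and `save_to_es` are hypothetical, the RDD is assumed to hold `(key, dict)` pairs, Elasticsearch is assumed to run on localhost with default settings, and the `"-"` path is a placeholder assumed to be unused by the output format.

```python
def es_write_conf(resource, nodes="localhost", port="9200"):
    """Build the configuration dict handed to the output format."""
    return {
        "es.resource": resource,  # target index/type
        "es.nodes": nodes,        # assumed: a local Elasticsearch node
        "es.port": port,
    }


def save_to_es(pair_rdd, conf):
    """Write an RDD of (key, dict) pairs through EsOutputFormat.

    PySpark requires a path argument; "-" is a placeholder here
    (an assumption of this sketch).
    """
    pair_rdd.saveAsNewAPIHadoopFile(
        path="-",
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=conf,
    )


# Usage inside a PySpark shell started with the elasticsearch-hadoop jar
# on the classpath (sc is the SparkContext provided by the shell):
#   docs = sc.parallelize([("ignored-key", {"field1": "Some Text", "field2": 12345})])
#   save_to_es(docs, es_write_conf("index/type"))
```

Keeping the configuration in a small helper makes it easy to reuse the same settings for reading, since `newAPIHadoopRDD` accepts the same kind of dict through its `conf` argument.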