Apache Spark support
editApache Spark support
edit
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. |
||
-- Spark website |
Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is computing framework that is not tied to Map/Reduce itself however it does integrate with Hadoop, mainly to HDFS. elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated support available since 2.1 or through the Map/Reduce bridge since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since version 5.0
Installation
editJust like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath. As Spark has multiple deployment modes, this can translate to the target classpath, whether it is on only one node (as is the case with the local mode - which will be used through-out the documentation) or per-node depending on the desired infrastructure.
Native RDD support
editAdded in 2.1.
elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD
(Resilient Distributed Dataset) (or Pair RDD
to be precise) that can read data from Elasticsearch. The RDD
is offered in two flavors: one for Scala (which returns the data as Tuple2
with Scala collections) and one for Java (which returns the data as Tuple2
containing java.util
collections).
Whenever possible, consider using the native integration as it offers the best performance and maximum flexibility.
Configuration
editTo configure elasticsearch-hadoop for Apache Spark, one can set the various properties described in the Configuration chapter in the SparkConf
object:
import org.apache.spark.SparkConf val conf = new SparkConf().setAppName(appName).setMaster(master) conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); conf.set("es.index.auto.create", "true");
Command-lineFor those that want to set the properties through the command-line (either directly or by loading them from a file), note that Spark only accepts those that start with the "spark." prefix and will ignore the rest (and depending on the version a warning might be thrown). To work around this limitation, define the elasticsearch-hadoop properties by appending the spark.
prefix (thus they become spark.es.
) and elasticsearch-hadoop will automatically resolve them:
Writing data to Elasticsearch
editWith elasticsearch-hadoop, any RDD
can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD
type needs to be a Map
(whether a Scala or a Java one), a JavaBean
or a Scala case class. When that is not the case, one can easily transform the data
in Spark or plug-in their own custom ValueWriter
.
Scala
editWhen using Scala, simply import the org.elasticsearch.spark
package which, through the pimp my library pattern, enriches the any RDD
API with saveToEs
methods:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") sc.makeRDD( Seq(numbers, airports) ).saveToEs("spark/docs")
Spark Scala imports |
|
elasticsearch-hadoop Scala imports |
|
start Spark through its Scala API |
|
|
|
index the content (namely the two documents (numbers and airports)) in Elasticsearch under |
Scala users might be tempted to use Seq
and the →
notation for declaring root objects (that is the JSON document) instead of using a Map
. While similar, the first notation results in slightly different types that cannot be matched to a JSON document: Seq
is an order sequence (in other words a list) while →
creates a Tuple
which is more or less an ordered, fixed number of elements. As such, a list of lists cannot be used as a document since it cannot be mapped to a JSON object; however it can be used freely within one. Hence why in the example above Map(k→v)
was used instead of Seq(k→v)
As an alternative to the implicit import above, one can use elasticsearch-hadoop Spark support in Scala through EsSpark
in the org.elasticsearch.spark.rdd
package which acts as a utility class allowing explicit method invocations. Additionally instead of Map
s (which are convenient but require one mapping per instance due to their difference in structure), use a case class :
import org.apache.spark.SparkContext import org.elasticsearch.spark.rdd.EsSpark // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) EsSpark.saveToEs(rdd, "spark/docs")
|
|
Define a case class named |
|
Create an |
|
Index the |
For cases where the id (or other metadata fields like ttl
or timestamp
) of the document needs to be specified, one can do so by setting the appropriate mapping namely es.mapping.id
. Following the previous example, to indicate to Elasticsearch to use the field id
as the document id, update the RDD
configuration (it is also possible to set the property on the SparkConf
though due to its global effect it is discouraged):
EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java
editJava users have a dedicated class that provides a similar functionality to EsSpark
, namely JavaEsSpark
in the org.elasticsearch.spark.rdd.api.java
(a package similar to Spark’s Java API):
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); JavaEsSpark.saveToEs(javaRDD, "spark/docs");
Spark Java imports |
|
elasticsearch-hadoop Java imports |
|
start Spark through its Java API |
|
to simplify the example, use Guava(a dependency of Spark) |
|
create a simple |
|
index the content (namely the two documents (numbers and airports)) in Elasticsearch under |
The code can be further simplified by using Java 5 static imports. Additionally, the Map
(who’s mapping is dynamic due to its loose structure) can be replaced with a JavaBean
:
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize( ImmutableList.of(upcoming, lastWeek)); saveToEs(javaRDD, "spark/docs");
statically import |
|
define an |
|
call |
Setting the document id (or other metadata fields like ttl
or timestamp
) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):
JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
Writing existing JSON to Elasticsearch
editFor cases where the data in the RDD
is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either an RDD
containing String
or byte arrays (byte[]
/Array[Byte]
), assuming each entry represents a JSON document. If the RDD
does not have the proper signature, the saveJsonToEs
methods cannot be applied (in Scala they will not be available).
Scala
editval json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" new SparkContext(conf).makeRDD(Seq(json1, json2)) .saveJsonToEs("spark/json-trips")
example of an entry within the |
|
index the JSON data through the dedicated |
Java
editString json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
example of an entry within the |
|
notice the |
|
index the JSON data through the dedicated |
Writing to dynamic/multi-resources
editFor cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the es.resource.write
field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned media example, one could configure it as follows:
Scala
editval game = Map( "media_type"->"game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")
Document key used for splitting the data. Any field can be declared (but make sure it is available in all documents) |
|
Save each object based on its resource pattern, in this example based on |
For each document/object about to be written, elasticsearch-hadoop will extract the media_type
field and use its value to determine the target resource.
Java
editAs expected, things in Java are strikingly similar:
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); saveToEs(javaRDD, "my-collection-{media_type}/doc");
Handling document metadata
editElasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from their belonging document. Further more, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied outside the document itself through the use of pair RDD
s.
In other words, for RDD
s containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the Metadata
Java enum within org.elasticsearch.spark.rdd
package which identifies its type - id
, ttl
, version
, etc…
Thus an RDD
keys can be a Map
containing the Metadata
for each document and its associated values. If RDD
key is not of type Map
, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly.
This sounds more complicated than it is, so let us see some examples.
Scala
editPair RDD
s, or simply put RDD
s with the signature RDD[(K,V)]
can take advantage of the saveToEsWithMeta
methods that are available either through the implicit import of org.elasticsearch.spark
package or EsSpark
object.
To manually specify the id for each document, simply pass in the Object
(not of type Map
) in your RDD
:
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
|
|
The key of each tuple within the |
|
Since |
When more than just the id needs to be specified, one should use a scala.collection.Map
with keys of type org.elasticsearch.spark.rdd.Metadata
:
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")
Import the |
|
The metadata used for |
|
The metadata used for |
|
The metadata used for |
|
The metadata and the documents are assembled into a pair |
|
The |
Java
editIn a similar fashion, on the Java side, JavaEsSpark
provides saveToEsWithMeta
methods that are applied to JavaPairRDD
(the equivalent in Java of RDD[(K,V)]
). Thus to save documents based on their ids one can use:
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
Create a |
|
Tuple for the first document wrapped around the id ( |
|
Tuple for the second document wrapped around the id ( |
|
The |
When more than just the id needs to be specified, one can choose to use a java.util.Map
populated with keys of type org.elasticsearch.spark.rdd.Metadata
:
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);
|
|
static import for the |
|
Metadata for |
|
Metadata for |
|
Tuple between |
|
Tuple associating |
|
|
Reading data from Elasticsearch
editFor reading, one should define the Elasticsearch RDD
that streams data from Elasticsearch to Spark.
Scala
editSimilar to writing, the org.elasticsearch.spark
package, enriches the SparkContext
API with esRDD
methods:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val RDD = sc.esRDD("radio/artists")
Spark Scala imports |
|
elasticsearch-hadoop Scala imports |
|
start Spark through its Scala API |
|
a dedicated |
The method can be overloaded to specify an additional query or even a configuration Map
(overriding SparkConf
):
... import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) sc.esRDD("radio/artists", "?q=me*")
The documents from Elasticsearch are returned, by default, as a Tuple2
containing as the first element the document id and the second element the actual document represented through Scala collections, namely one `Map[String, Any]`where the keys represent the field names and the value their respective values.
Java
editJava users have a dedicated JavaPairRDD
that works the same as its Scala counterpart however the returned Tuple2
values (or second element) returns the documents as native, java.util
collections.
import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
Spark Java imports |
|
elasticsearch-hadoop Java imports |
|
start Spark through its Java API |
|
a dedicated |
In a similar fashion one can use the overloaded esRDD
methods to specify a query or pass a Map
object for advanced configuration.
Let us see how this looks, but this time around using Java static imports. Further more, let us discard the documents ids and retrieve only the RDD
values:
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; ... JavaRDD<Map<String, Object>> rdd = esRDD(jsc, "radio/artists", "?q=me*") .values();
statically import |
|
create an |
|
return only values of the |
By using the JavaEsSpark
API, one gets a hold of Spark’s dedicated JavaPairRDD
which are better suited in Java environments than the base RDD
(due to its Scala
signatures). Moreover, the dedicated RDD
returns Elasticsearch documents as proper Java collections so one does not have to deal with Scala collections (which
is typically the case with RDD
s). This is particularly powerful when using Java 8, which we strongly recommend as its
lambda expressions make collection processing extremely concise.
To wit, let us assume one wants to filter the documents from the RDD
and return only those that contain a value that contains mega
(please ignore the fact one can and should do the filtering directly through Elasticsearch).
In versions prior to Java 8, the code would look something like this:
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter( new Function<Map<String, Object>, Boolean>() { @Override public Boolean call(Map<String, Object> map) throws Exception { returns map.contains("mega"); } });
with Java 8, the filtering becomes a one liner:
JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.contains("mega"));
Reading data in JSON format
editIn case where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can use the dedicated esJsonRDD
methods. In this case, the connector will return the documents content as it is received from Elasticsearch without any processing as an RDD[(String, String)]
in Scala or JavaPairRDD[String, String]
in Java with the keys representing the document id and the value its actual content in JSON format.
Type conversion
editelasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:
Table 5. Scala Types Conversion Table
Scala type | Elasticsearch type |
---|---|
|
|
|
|
|
empty |
|
|
|
|
|
|
case class |
|
|
|
in addition, the following implied conversion applies for Java types:
Table 6. Java Types Conversion Table
Java type | Elasticsearch type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
The conversion is done as a best effort; built-in Java and Scala types are guaranteed to be properly converted, however there are no guarantees for user types whether in Java or Scala. As mentioned in the tables above, when a case
class is encountered in Scala or JavaBean
in Java, the converters will try to unwrap
its content and save it as an object
. Note this works only for top-level user objects - if the user object has other user objects nested in, the conversion is likely to fail since the converter does not perform nested unwrapping
.
This is done on purpose since the converter has to serialize and deserialize the data and user types introduce ambiguity due to data loss; this can be addressed through some type of mapping however that takes the project way too close to the realm of ORMs and arguably introduces too much complexity for little to no gain; thanks to the processing functionality in Spark and the plugability in elasticsearch-hadoop one can easily transform objects into other types, if needed with minimal effort and maximum control.
Geo typesIt is worth mentioning that rich data types available only in Elasticsearch, such as GeoPoint
or GeoShape
are supported by converting their structure into the primitives available in the table above.
For example, based on its storage a geo_point
might be returned as a String
or a Traversable
.
Spark Streaming support
editAdded in 5.0.
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. |
||
-- Spark website |
Spark Streaming is an extension on top of the core Spark functionality that allows near real time processing of stream data. Spark Streaming works around the idea of DStream
s, or Discretized Streams. DStreams
operate by collecting newly arrived records into a small RDD
and executing it. This repeats every few seconds with a new RDD
in a process called microbatching. The DStream
api includes many of the same processing operations as the RDD
api, plus a few other streaming specific methods. elasticsearch-hadoop provides native integration with Spark Streaming as of version 5.0.
When using the elasticsearch-hadoop Spark Streaming support, Elasticsearch can be targeted as an output location to index data into from a Spark Streaming job in the same way that one might persist the results from an RDD
. Though, unlike RDD
s, you are unable to read data out of Elasticsearch using a DStream
due to the continuous nature of it.
Spark Streaming support provides special optimizations to allow for conservation of network resources on Spark executors when running jobs with very small processing windows. For this reason, one should prefer to use this integration instead of invoking saveToEs
on RDD
s returned from the foreachRDD
call on DStream
.
Writing DStream
to Elasticsearch
editLike RDD
s, any DStream
can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the DStream
type needs to be a Map
(either a Scala or a Java one), a JavaBean
or a Scala case class. When that is not the case, one can easily transform the data
in Spark or plug-in their own custom ValueWriter
.
Scala
editWhen using Scala, simply import the org.elasticsearch.spark.streaming
package which, through the pimp my library pattern, enriches the DStream
API with saveToEs
methods:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.elasticsearch.spark.streaming._ ... val conf = ... val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) ssc.queueStream(microbatches).saveToEs("spark/docs") ssc.start() ssc.awaitTermination()
Spark and Spark Streaming Scala imports |
|
elasticsearch-hadoop Spark Streaming imports |
|
start Spark through its Scala API |
|
start SparkStreaming context by passing it the SparkContext. The microbatches will be processed every second. |
|
|
|
Create a |
|
Start the spark Streaming Job and wait for it to eventually finish. |
As an alternative to the implicit import above, one can use elasticsearch-hadoop Spark Streaming support in Scala through EsSparkStreaming
in the org.elasticsearch.spark.streaming
package which acts as a utility class allowing explicit method invocations. Additionally instead of Map
s (which are convenient but require one mapping per instance due to their difference in structure), use a case class :
import org.apache.spark.SparkContext import org.elasticsearch.spark.streaming.EsSparkStreaming // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) val microbatches = mutable.Queue(rdd) val dstream = ssc.queueStream(microbatches) EsSparkStreaming.saveToEs(dstream, "spark/docs") ssc.start()
|
|
Define a case class named |
|
Create a |
|
Configure the |
|
Start the streaming process |
Once a SparkStreamingContext is started, no new DStream
s can be added or configured. Once a context has stopped, it cannot be restarted. There can only be one active SparkStreamingContext at a time per JVM. Also note that when stopping a SparkStreamingContext programmatically, it stops the underlying SparkContext unless instructed not to.
For cases where the id (or other metadata fields like ttl
or timestamp
) of the document needs to be specified, one can do so by setting the appropriate mapping namely es.mapping.id
. Following the previous example, to indicate to Elasticsearch to use the field id
as the document id, update the DStream
configuration (it is also possible to set the property on the SparkConf
though due to its global effect it is discouraged):
EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java
editJava users have a dedicated class that provides a similar functionality to EsSparkStreaming
, namely JavaEsSparkStreaming
in the package org.elasticsearch.spark.streaming.api.java
(a package similar to Spark’s Java API):
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1)); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>(); microbatches.add(javaRDD); JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs"); jssc.start()
Spark and Spark Streaming Java imports |
|
elasticsearch-hadoop Java imports |
|
start Spark and Spark Streaming through its Java API. The microbatches will be processed every second. |
|
to simplify the example, use Guava(a dependency of Spark) |
|
create a simple |
|
index the content (namely the two documents (numbers and airports)) in Elasticsearch under |
|
execute the streaming job. |
The code can be further simplified by using Java 5 static imports. Additionally, the Map
(who’s mapping is dynamic due to its loose structure) can be replaced with a JavaBean
:
public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek)); Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>(); microbatches.add(javaRDD); JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "spark/docs"); jssc.start()
statically import |
|
define a |
|
call |
|
run that Streaming job |
Setting the document id (or other metadata fields like ttl
or timestamp
) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));
Writing Existing JSON to Elasticsearch
editFor cases where the data being streamed by the DStream
is already serialized as JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either a DStream
containing String
or byte arrays (byte[]
/Array[Byte]
), assuming each entry represents a JSON document. If the DStream
does not have the proper signature, the saveJsonToEs
methods cannot be applied (in Scala they will not be available).
Scala
editval json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val rdd = sc.makeRDD(Seq(json1, json2)) val microbatch = mutable.Queue(rdd) ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips") ssc.start()
example of an entry within the |
|
configure the stream to index the JSON data through the dedicated |
|
start the streaming job |
Java
editString json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaStreamingContext jssc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>(); microbatches.add(stringRDD); JavaDStream<String> stringDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); jssc.start()
example of an entry within the |
|
creating an |
|
notice the |
|
configure stream to index the JSON data through the dedicated |
|
launch stream job |
Writing to dynamic/multi-resources
editFor cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the es.resource.write
field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned media example, one could configure it as follows:
Scala
editval game = Map( "media_type" -> "game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") val batch = sc.makeRDD(Seq(game, book, cd)) val microbatches = mutable.Queue(batch) ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc") ssc.start()
Document key used for splitting the data. Any field can be declared (but make sure it is available in all documents) |
|
Save each object based on its resource pattern, in this example based on |
For each document/object about to be written, elasticsearch-hadoop will extract the media_type
field and use its value to determine the target resource.
Java
editAs expected, things in Java are strikingly similar:
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); Queue<JavaRDD<Map<String, ?>>> microbatches = ... JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "my-collection-{media_type}/doc"); jssc.start();
Handling document metadata
editElasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from their belonging document. Further more, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied outside the document itself through the use of pair RDD
s.
This is no different in Spark Streaming. For DStreams
s containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.
The metadata is described through the Metadata
Java enum within org.elasticsearch.spark.rdd
package which identifies its type - id
, ttl
, version
, etc…
Thus a DStream
's keys can be a Map
containing the Metadata
for each document and its associated values. If the DStream
key is not of type Map
, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly.
This sounds more complicated than it is, so let us see some examples.
Scala
editPair DStreams
s, or simply put DStreams
s with the signature DStream[(K,V)]
can take advantage of the saveToEsWithMeta
methods that are available either through the implicit import of org.elasticsearch.spark.streaming
package or EsSparkStreaming
object.
To manually specify the id for each document, simply pass in the Object
(not of type Map
) in your DStream
:
val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
|
|
The key of each tuple within the |
|
We construct a |
|
Since the resulting |
When more than just the id needs to be specified, one should use a scala.collection.Map
with keys of type org.elasticsearch.spark.rdd.Metadata
:
import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... // instance of StreamingContext val ssc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) val microbatches = mutable.Queue(airportsRDD) ssc.queueStream(microbatches) .saveToEsWithMeta("airports/2015") ssc.start()
Import the |
|
The metadata used for |
|
The metadata used for |
|
The metadata used for |
|
The metadata and the documents are assembled into a pair |
|
The |
|
The |
Java
editIn a similar fashion, on the Java side, JavaEsSparkStreaming
provides saveToEsWithMeta
methods that are applied to JavaPairDStream
(the equivalent in Java of DStream[(K,V)]
).
This tends to involve a little more work due to the Java API’s limitations. For instance, you cannot create a JavaPairDStream
directly from a queue of JavaPairRDD
s. Instead, you must create a regular JavaDStream
of Tuple2
objects and convert the JavaDStream
into a JavaPairDStream
. This sounds complex, but it’s a simple work around for a limitation of the API.
First, we’ll create a pair function, that takes a Tuple2
object in, and returns it right back to the framework:
public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable { @Override public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception { return tuple2; } }
Then we’ll apply the pair function to a JavaDStream
of Tuple2
s to create a JavaPairDStream
and save it:
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... JavaStreamingContext jssc = ... // create an RDD of between the id and the docs JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize( ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()); JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
Create a regular |
|
Tuple for the first document wrapped around the id ( |
|
Tuple for the second document wrapped around the id ( |
|
Assemble a regular |
|
Transform the |
|
The |
When more than just the id needs to be specified, one can choose to use a java.util.Map
populated with keys of type org.elasticsearch.spark.rdd.Metadata
. We’ll use the same typing trick to repack the JavaDStream
as a JavaPairDStream
:
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ... JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches); JavaPairDStream<?, ?> pairDStream = dstream.mapToPair(new ExtractTuples()) JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); jssc.start();
|
|
static import for the |
|
Metadata for |
|
Metadata for |
|
Tuple between |
|
Tuple associating |
|
Create a |
|
Repack the |
|
|
Spark Streaming Type Conversion
editThe elasticsearch-hadoop Spark Streaming support leverages the same type mapping as the regular Spark type mapping. The mappings are repeated here for consistency:
Table 7. Scala Types Conversion Table
Scala type | Elasticsearch type |
---|---|
|
|
|
|
|
empty |
|
|
|
|
|
|
case class |
|
|
|
in addition, the following implied conversion applies for Java types:
Table 8. Java Types Conversion Table
Java type | Elasticsearch type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Java Bean |
|
Geo typesIt is worth re-mentioning that rich data types available only in Elasticsearch, such as GeoPoint
or GeoShape
are supported by converting their structure into the primitives available in the table above.
For example, based on its storage a geo_point
might be returned as a String
or a Traversable
.
Spark SQL support
editAdded in 2.1.
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine. |
||
-- Spark website |
On top of the core Spark support, elasticsearch-hadoop also provides integration with Spark SQL. In other words, Elasticsearch becomes a native source for Spark SQL so that data can be indexed and queried from Spark SQL transparently.
Spark SQL works with structured data - in other words, all entries are expected to have the same structure (same number of fields, of the same type and name). Using unstructured data (documents with different
structures) is not supported and will cause problems. For such cases, use PairRDD
s.
Supported Spark SQL versions
editSpark SQL while becoming a mature component, is still going through significant changes between releases. Spark SQL became a stable component in version 1.3, however it is not backwards compatible with the previous releases. Further more Spark 2.0 introduced significant changed which broke backwards compatibility, through
the Dataset
API.
elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars:
elasticsearch-spark-1.x-<version>.jar
and elasticsearch-hadoop-<version>.jar
support Spark SQL 1.3-1.6 (or higher) while elasticsearch-spark-2.0-<version>.jar
supports Spark SQL 2.0.
In other words, unless you are using Spark 2.0, use elasticsearch-spark-1.x-<version>.jar
Spark SQL support is available under org.elasticsearch.spark.sql
package.
API differencesFrom the elasticsearch-hadoop user perspectives, the differences between Spark SQL 1.3-1.6 and Spark 2.0 are fairly consolidated. This document describes at length the differences which are briefly mentioned below:
-
DataFrame
vsDataset
-
The core unit of Spark SQL in 1.3+ is a
DataFrame
. This API remains in Spark 2.0 however underneath it is based on aDataset
- Unified API vs dedicated Java/Scala APIs
-
In Spark SQL 2.0, the APIs are further unified by introducing
SparkSession
and by using the same backing code for both `Dataset`s, `DataFrame`s and `RDD`s.
As conceptually, a DataFrame
is a Dataset[Row]
, the documentation below will focus on Spark SQL 1.3-1.6.
Writing DataFrame
(Spark SQL 1.3+) to Elasticsearch
editWith elasticsearch-hadoop, DataFrame
s (or any Dataset
for that matter) can be indexed to Elasticsearch.
Scala
editIn Scala, simply import org.elasticsearch.spark.sql
package which enriches the given DataFrame
class with saveToEs
methods; while these have the same signature as the org.elasticsearch.spark
package, they are designed for DataFrame
implementations:
// reusing the example from Spark SQL documentation import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext._ import org.elasticsearch.spark.sql._ ... // sc = existing SparkContext val sqlContext = new SQLContext(sc) // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = sc.textFile("people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) .toDF() people.saveToEs("spark/people")
Spark SQL package import |
|
elasticsearch-hadoop Spark package import |
|
Read a text file as normal |
|
Index the resulting |
By default, elasticsearch-hadoop will ignore null values in favor of not writing any field at all. Since a DataFrame
is meant
to be treated as structured tabular data, you can enable writing nulls as null valued fields for DataFrame
Objects
only by toggling the es.spark.dataframe.write.null
setting to true
.
Java
editIn a similar fashion, for Java usage the dedicated package org.elasticsearch.spark.sql.api.java
provides similar functionality through the JavaEsSpark SQL
:
import org.apache.spark.sql.api.java.*; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... DataFrame people = ... JavaEsSparkSQL.saveToEs(people, "spark/people");
Spark SQL Java imports |
|
elasticsearch-hadoop Spark SQL Java imports |
|
index the |
Again, with Java 5 static imports this can be further simplied to:
For maximum control over the mapping of your DataFrame
in Elasticsearch, it is highly recommended to create the mapping before hand. See this chapter for more information.
Writing existing JSON to Elasticsearch
editWhen using Spark SQL, if the input data is in JSON format, simply convert it to a DataFrame
(in Spark SQL 1.3) or a Dataset
(for Spark SQL 2.0) (as described in Spark documentation) through SQLContext
/JavaSQLContext
jsonFile
methods.
Using pure SQL to read from Elasticsearch
editThe index and its mapping, have to exist prior to creating the temporary table
Spark SQL 1.2 introduced a new API for reading from external data sources, which is supported by elasticsearch-hadoop simplifying the SQL configured needed for interacting with Elasticsearch. Further more, behind the scenes it understands the operations executed by Spark and thus can optimize the data and queries made (such as filtering or pruning), improving performance.
Data Sources in Spark SQL
editWhen using Spark SQL, elasticsearch-hadoop allows access to Elasticsearch through SQLContext
load
method. In other words, to create a DataFrame
/Dataset
backed by Elasticsearch in a declarative manner:
val sql = new SQLContext... // Spark 1.3 style val df = sql.load( "spark/index", "org.elasticsearch.spark.sql")
|
|
path or resource to load - in this case the index/type in Elasticsearch |
|
the data source provider - |
In Spark 1.4, one would use the following similar API calls:
|
|
the data source provider - |
|
path or resource to load - in this case the index/type in Elasticsearch |
In Spark 1.5, this can be further simplified to:
Whatever API is used, once created, the DataFrame
can be accessed freely to manipulate the data.
The sources declaration also allows specific options to be passed in, namely:
Name | Default value | Description |
---|---|---|
|
required |
Elasticsearch index/type |
|
|
Whether to translate (push-down) Spark SQL into Elasticsearch Query DSL |
|
|
Whether to use exact (not analyzed) matching or not (analyzed) |
Usable in Spark 1.6 or higher |
||
|
|
Whether to tell Spark apply its own filtering on the filters pushed down |
Both options are explained in the next section.
To specify the options (including the generic elasticsearch-hadoop ones), one simply passes a Map
to the aforementioned methods:
For example:
val sql = new SQLContext... // options for Spark 1.3 need to include the target path/resource val options13 = Map("path" -> "spark/index", "pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.3 style val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) // options for Spark 1.4 - the path/resource is specified separately val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200") // Spark 1.4 style val spark14DF = sql.read.format("org.elasticsearch.spark.sql") .options(options) .load("spark/index")
|
|
|
|
pass the options when definition/loading the source |
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', nodes 'someNode')" ) "
Spark’s temporary table name |
|
|
|
elasticsearch-hadoop configuration options, the mandatory one being |
Do note that due to the SQL parser, the .
(among other common characters used for delimiting) is not allowed; the connector tries to work around it by append the es.
prefix automatically however this works only for specifying the configuration options with only one .
(like es.nodes
above). Because of this, if properties with multiple .
are needed, one should use the SQLContext.load
or SQLContext.read
methods above and pass the properties as a Map
.
Push-Down operations
editAn important hidden feature of using elasticsearch-hadoop as a Spark source
is that the connector understand the operations performed within the DataFrame
/SQL and, by default, will translate them into the appropriate QueryDSL. In other words, the connector pushes down the operations directly at the source, where the data is efficiently filtered out so that only the required data is streamed back to Spark.
This significantly increases the queries performance and minimizes the CPU, memory and I/O on both Spark and Elasticsearch clusters as only the needed data is returned (as oppose to returning the data in bulk only to be processed and discarded by Spark).
Note the push down operations apply even when one specifies a query - the connector will enhance it according to the specified SQL.
As a side note, elasticsearch-hadoop supports all the `Filter`s available in Spark (1.3.0 and higher) while retaining backwards binary-compatibility with Spark 1.3.0, pushing down to full extent the SQL operations to Elasticsearch without any user interference.
Operators those have been optimized as pushdown filters:
SQL syntax | ES 1.x/2.x syntax | ES 5.x syntax |
---|---|---|
= null , is_null |
missing |
must_not.exists |
= (strict) |
term |
term |
= (not strict) |
match |
match |
> , < , >= , ⇐ |
range |
range |
is_not_null |
exists |
exists |
in (strict) |
terms |
terms |
in (not strict) |
or.filters |
bool.should |
and |
and.filters |
bool.filter |
or |
or.filters |
bool.should [bool.filter] |
not |
not.filter |
bool.must_not |
StringStartsWith |
wildcard(arg*) |
wildcard(arg*) |
StringEndsWith |
wildcard(*arg) |
wildcard(*arg) |
StringContains |
wildcard(*arg*) |
wildcard(*arg*) |
EqualNullSafe (strict) |
term |
term |
EqualNullSafe (not strict) |
match |
match |
To wit, consider the following Spark SQL:
// as a DataFrame val df = sqlContext.read().format("org.elasticsearch.spark.sql").load("spark/trips") df.printSchema() // root //|-- departure: string (nullable = true) //|-- arrival: string (nullable = true) //|-- days: long (nullable = true) val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3))
or in pure SQL:
CREATE TEMPORARY TABLE trips USING org.elasticsearch.spark.sql OPTIONS (path "spark/trips") SELECT departure FROM trips WHERE arrival = "OTP" and days > 3
The connector translates the query into:
{ "query" : { "filtered" : { "query" : { "match_all" : {} }, "filter" : { "and" : [{ "query" : { "match" : { "arrival" : "OTP" } } }, { "days" : { "gt" : 3 } } ] } } } }
Further more, the pushdown filters can work on analyzed
terms (the default) or can be configured to be strict and provide exact
matches (work only on not-analyzed
fields). Unless one manually specifies the mapping, it is highly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch Reference Documentation.
Note that double.filtering
, available since elasticsearch-hadoop 2.2 for Spark 1.6 or higher, allows filters that are already pushed down to Elasticsearch to be processed/evaluated by Spark as well (default) or not. Turning this feature off, especially when dealing with large data sizes speed things up. However one should pay attention to the semantics as turning this off, might return different results (depending on how the data is indexed, analyzed
vs not_analyzed
). In general, when turning strict on, one can disable double.filtering
as well.
Data Sources as tables
editAvailable since Spark SQL 1.2, one can also access a data source by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):
sqlContext.sql( "CREATE TEMPORARY TABLE myIndex " + "USING org.elasticsearch.spark.sql " + "OPTIONS (resource 'spark/index', " + "scroll_size '20')" )
Spark’s temporary table name |
|
|
|
elasticsearch-hadoop configuration options, the mandatory one being |
|
Since using |
Once defined, the schema is picked up automatically. So one can issue queries, right away:
val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")
As elasticsearch-hadoop is aware of the queries being made, it can optimize the requests done to Elasticsearch. For example, given the following query:
val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")
it knows only the name
and id
fields are required (the first to be returned to the user, the second for Spark’s internal filtering) and thus will ask only for this data, making the queries quite efficient.
Reading DataFrame
s (Spark SQL 1.3) from Elasticsearch
editAs you might have guessed, one can define a DataFrame
backed by Elasticsearch documents. Or even better, have them backed by a query result, effectively creating dynamic, real-time views over your data.
Scala
editThrough the org.elasticsearch.spark.sql
package, esDF
methods are available on the SQLContext
API:
import org.apache.spark.sql.SQLContext import org.elasticsearch.spark.sql._ ... val sql = new SQLContext(sc) val people = sql.esDF("spark/people") // check the associated schema println(people.schema.treeString) // root // |-- name: string (nullable = true) // |-- surname: string (nullable = true) // |-- age: long (nullable = true)
Spark SQL Scala imports |
|
elasticsearch-hadoop SQL Scala imports |
|
create a |
|
the |
|
notice how the |
And just as with the Spark core support, additional parameters can be specified such as a query. This is quite a powerful concept as one can filter the data at the source (Elasticsearch) and use Spark only on the results:
Controlling the DataFrame
schemaIn some cases, especially when the index in Elasticsearch contains a lot of fields, it is desireable to create a DataFrame
that contains only a subset of them. While one can modify the DataFrame
(by working on its backing RDD
) through the official Spark API or through dedicated queries, elasticsearch-hadoop allows the user to specify what fields to include and exclude from Elasticsearch when creating the DataFrame
.
Through es.read.field.include
and es.read.field.exclude
properties, one can indicate what fields to include or exclude from the index mapping. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified meaning all properties/fields are included and no properties/fields are excluded.
For example:
# include es.read.field.include = *name, address.* # exclude es.read.field.exclude = *.created
Due to the way SparkSQL works with a DataFrame
schema, elasticsearch-hadoop needs to be aware of what fields are returned from Elasticsearch before executing the actual queries. While one can restrict the fields manually through the underlying Elasticsearch query, elasticsearch-hadoop is unaware of this and the results are likely to be different or worse, errors will occur. Use the properties above instead, which Elasticsearch will properly use alongside the user query.
Java
editFor Java users, a dedicated API exists through JavaEsSpark SQL
. It is strikingly similar to EsSpark SQL
however it allows configuration options to be passed in through Java collections instead of Scala ones; other than that using the two is exactly the same:
import org.apache.spark.sql.api.java.JavaSQLContext; import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL; ... SQLContext sql = new SQLContext(sc); DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");
Spark SQL import |
|
elasticsearch-hadoop import |
|
create a Java |
Better yet, the DataFrame
can be backed by a query result:
Spark SQL Type conversion
editelasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:
While Spark SQL DataType
s have an equivalent in both Scala and Java and thus the RDD conversion can apply, there are slightly different semantics - in particular with the java.sql
types due to the way Spark SQL handles them:
Table 9. Spark SQL 1.3+ Conversion Table
Spark SQL DataType |
Elasticsearch type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Geo Types Conversion TableIn addition to the table above, for Spark SQL 1.3 or higher, elasticsearch-hadoop performs automatic schema detection for geo types, namely Elasticsearch geo_point
and geo_shape
.
Since each type allows multiple formats (geo_point
accepts latitude and longitude to be specified in 4 different ways, while geo_shape
allows a variety of types (currently 9)) and the mapping does not provide such information, elasticsearch-hadoop will sample the determined geo fields at startup and retrieve an arbitrary document that contains all the relevant fields; it will parse it and thus determine the necessary schema (so for example it can tell whether a geo_point
is
specified as a StringType
or as an ArrayType
).
Since Spark SQL is strongly-typed, each geo field needs to have the same format across all documents. Shy of that, the returned data will not fit the detected schema and thus lead to errors.
Spark Structured Streaming support
editAdded in 6.0.
Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming. |
||
-- Spark documentation |
Released as an experimental feature in Spark 2.0, Spark Structured Streaming provides a unified streaming and batch interface built into the Spark SQL integration. As of elasticsearch-hadoop 6.0, we provide native functionality to index streaming data into Elasticsearch.
Like Spark SQL, Structured Streaming works with structured data. All entries are expected to have the
same structure (same number of fields, of the same type and name). Using unstructured data (documents with different
structures) is not supported and will cause problems. For such cases, use DStream
s.
Supported Spark Structured Streaming versions
editSpark Structured Streaming is considered generally available as of Spark v2.2.0. As such, elasticsearch-hadoop support for Structured Streaming (available in elasticsearch-hadoop 6.0+) is only compatible with Spark versions 2.2.0 and onward. Similar to Spark SQL before it, Structured Streaming may be subject to significant changes between releases before its interfaces are considered stable.
Spark Structured Streaming support is available under the org.elasticsearch.spark.sql
and
org.elasticsearch.spark.sql.streaming
packages. It shares a unified interface with Spark SQL in the form of the
Dataset[_]
api. Clients can interact with streaming Dataset
s in almost exactly the same way as regular batch
Dataset
s with only a
few exceptions.
Writing Streaming Datasets
(Spark SQL 2.0+) to Elasticsearch
editWith elasticsearch-hadoop, Stream-backed Dataset
s can be indexed to Elasticsearch.
Scala
editIn Scala, to save your streaming based Dataset
s and DataFrame
s to Elasticsearch, simply configure the stream to
write out using the "es"
format, like so:
import org.apache.spark.sql.SparkSession ... val spark = SparkSession.builder() .appName("EsStreamingExample") .getOrCreate() // case class used to define the DataFrame case class Person(name: String, surname: String, age: Int) // create DataFrame val people = spark.readStream .textFile("/path/to/people/files/*") .map(_.split(",")) .map(p => Person(p(0), p(1), p(2).trim.toInt)) people.writeStream .option("checkpointLocation", "/save/location") .format("es") .start("spark/people")
Spark SQL import |
|
Create |
|
Instead of calling |
|
Read a directory of text files continuously and convert them into |
|
Provide a location to save the offsets and commit logs for the streaming query |
|
Start the stream using the |
Spark makes no type-based differentiation between batch and streaming based Dataset
s. While you may be
able to import the org.elasticsearch.spark.sql
package to add saveToEs
methods to your Dataset
or
DataFrame
, it will throw an illegal argument exception if those methods are called on streaming based Dataset
s
or DataFrame
s.
Java
editIn a similar fashion, the "es"
format is available for Java usage as well:
import org.apache.spark.sql.SparkSession ... SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .getOrCreate(); // java bean style class public static class PersonBean { private String name; private String surname; private int age; ... } Dataset<PersonBean> people = spark.readStream() .textFile("/path/to/people/files/*") .map(new MapFunction<String, PersonBean>() { @Override public PersonBean call(String value) throws Exception { return someFunctionThatParsesStringToJavaBeans(value.split(",")); } }, Encoders.<PersonBean>bean(PersonBean.class)); people.writeStream() .option("checkpointLocation", "/save/location") .format("es") .start("spark/people");
Spark SQL Java imports. Can use the same session class as Scala |
|
Create SparkSession. Can also use the legacy |
|
We create a java bean class to be used as our data format |
|
Use the |
|
Convert our string data into our PersonBean |
|
Set a place to save the state of our stream |
|
Using the |
Writing existing JSON to Elasticsearch
editWhen using Spark SQL, if the input data is in JSON format, simply convert it to a Dataset
(for Spark SQL 2.0) (as
described in Spark
documentation) through
the DataStreamReader
's json
format.
Sink commit log in Spark Structured Streaming
editSpark Structured Streaming advertises an end-to-end fault-tolerant exactly-once processing model that is made possible through the usage of offset checkpoints and maintaining commit logs for each streaming query. When executing a streaming query, most sources and sinks require you to specify a "checkpointLocation" in order to persist the state of your job. In the event of an interruption, launching a new streaming query with the same checkpoint location will recover the state of the job and pick up where it left off. We maintain a commit log for elasticsearch-hadoop’s Elasticsearch sink implementation in a special directory under the configured checkpoint location:
$> ls /path/to/checkpoint/location metadata offsets/ sinks/ $> ls /path/to/checkpoint/location/sinks elasticsearch/ $> ls /path/to/checkpoint/location/sinks/elasticsearch 12.compact 13 14 15 16 17 18
Each file in the commit log directory corresponds to a batch id that has been committed. The log implementation periodically compacts the logs down to avoid clutter. You can set the location for the log directory a number of ways:
-
Set the explicit log location with
es.spark.sql.streaming.sink.log.path
(see below). -
If that is not set, then the path specified by
checkpointLocation
will be used. -
If that is not set, then a path will be constructed by combining the value of
spark.sql.streaming.checkpointLocation
from the SparkSession with theDataset
's given query name. - If no query name is present, then a random UUID will be used in the above case instead of the query name
-
If none of the above settings are provided then the
start
call will throw an exception
Here is a list of configurations that affect the behavior of Elasticsearch’s commit log:
-
es.spark.sql.streaming.sink.log.enabled
(defaulttrue
) -
Enables or disables the commit log for a streaming job. By default, the log is enabled, and output batches with the
same batch id will be skipped to avoid double-writes. When this is set to
false
, the commit log is disabled, and all outputs will be sent to Elasticsearch, regardless if they have been sent in a previous execution. -
es.spark.sql.streaming.sink.log.path
-
Sets the location to store the log data for this streaming query. If this value is not set, then the Elasticsearch sink will
store its commit logs under the path given in
checkpointLocation
. Any HDFS Client compatible URI is acceptable. -
es.spark.sql.streaming.sink.log.cleanupDelay
(default10m
) - The commit log is managed through Spark’s HDFS Client. Some HDFS compatible filesystems (like Amazon’s S3) propagate file changes in an asynchronous manner. To get around this, after a set of log files have been compacted, the client will wait for this amount of time before cleaning up the old files.
-
es.spark.sql.streaming.sink.log.deletion
(defaulttrue
) -
Determines if the log should delete old logs that are no longer needed. After every batch is committed, the client will
check to see if there are any commit logs that have been compacted and are safe to be removed. If set to
false
, the log will skip this cleanup step, leaving behind a commit file for each batch. -
es.spark.sql.streaming.sink.log.compactInterval
(default10
) - Sets the number of batches to process before compacting the log files. By default, every 10 batches the commit log will be compacted down into a single file that contains all previously committed batch ids.
Spark Structured Streaming Type conversion
editStructured Streaming uses the exact same type conversion rules as the Spark SQL integration.
If automatic index creation is used, please review this section for more information.
elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types as shown in the table below:
While Spark SQL DataType
s have an
equivalent in both Scala and Java and thus the RDD conversion can apply, there are slightly
different semantics - in particular with the java.sql
types due to the way Spark SQL handles them:
Table 10. Spark SQL 1.3+ Conversion Table
Spark SQL DataType |
Elasticsearch type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Using the Map/Reduce layer
editAnother way of using Spark with Elasticsearch is through the Map/Reduce layer, that is by leveraging the dedicated Input/OuputFormat
in elasticsearch-hadoop. However, unless one is stuck on
elasticsearch-hadoop 2.0, we strongly recommend using the native integration as it offers significantly better performance and flexibility.
Configuration
editThrough elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated InputFormat
, and in case of writing, through OutputFormat
. These are described at length in the Map/Reduce chapter so please refer to that for an in-depth explanation.
In short, one needs to setup a basic Hadoop Configuration
object with the target Elasticsearch cluster and index, potentially a query, and she’s good to go.
From Spark’s perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writable
s. As such, InputFormat
and OutputFormat
s are required to return Writables
which, out of the box, Spark does not understand.
The good news is, one can easily enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.
SparkConf sc = new SparkConf(); //.setMaster("local"); sc.set("spark.serializer", KryoSerializer.class.getName()); // needed only when using the Java API JavaSparkContext jsc = new JavaSparkContext(sc);
Or if you prefer Scala
Note that the Kryo serialization is used as a work-around for dealing with Writable
types; one can choose to convert the types directly (from Writable
to Serializable
types) - which is fine however for getting started, the one liner above seems to be the most effective.
Reading data from Elasticsearch
editTo read data, simply pass in the org.elasticsearch.hadoop.mr.EsInputFormat
class - since it supports both the old
and the new
Map/Reduce APIs, you are free to use either method on SparkContext
's, hadoopRDD
(which we recommend for conciseness reasons) or newAPIHadoopRDD
. Which ever you chose, stick with it to avoid confusion and problems down the road.
Old (org.apache.hadoop.mapred
) API
editJobConf conf = new JobConf(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
Create the Hadoop object (use the old API) |
|
Configure the source (index) |
|
Setup the query (optional) |
|
Create a Spark |
The Scala version is below:
val conf = new JobConf() conf.set("es.resource", "radio/artists") conf.set("es.query", "?q=me*") val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])) val docCount = esRDD.count();
Create the Hadoop object (use the old API) |
|
Configure the source (index) |
|
Setup the query (optional) |
|
Create a Spark |
New (org.apache.hadoop.mapreduce
) API
editAs expected, the mapreduce
API version is strikingly similar - replace hadoopRDD
with newAPIHadoopRDD
and JobConf
with Configuration
. That’s about it.
Configuration conf = new Configuration(); conf.set("es.resource", "radio/artists"); conf.set("es.query", "?q=me*"); JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class); long docCount = esRDD.count();
Create the Hadoop object (use the new API) |
|
Configure the source (index) |
|
Setup the query (optional) |
|
Create a Spark |
The Scala version is below:
Using the connector from PySpark
editThanks to its Map/Reduce layer, elasticsearch-hadoop can be used from PySpark as well to both read and write data to Elasticsearch. To wit, below is a snippet from the Spark documentation (make sure to switch to the Python snippet):
$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
Also, the SQL loader can be used as well:
from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") df.printSchema()