Cascading support
editCascading support
edit
Cascading is a data processing API and processing query planner used for defining, sharing, and executing data-processing workflows on a single computing node or distributed computing cluster. |
||
-- Cascading website |
Cascading abstracts the Map/Reduce API and focuses on data processing
in terms of tuples flowing through pipes between taps,
from input (called SourceTap
) to output (named SinkTap
). As the data flows, various operations are applied to the tuple; the whole system being transformed to Map/Reduce operations at runtime.
With elasticsearch-hadoop, Elasticsearch can be plugged into Cascading flows as a SourceTap
or SinkTap
through EsTap
, data to/from Elasticsearch being transparently converted from/to Cascading `tuple`s.
Installation
editJust like other libraries, elasticsearch-hadoop needs to be available in the jar classpath (either by being manually deployed in the cluster or shipped along with the Hadoop job).
Configuration
editGlobal configuration
editCascading is configured through a Map<Object, Object>
, typically a Properties
object which indicates the various Cascading settings and also the application jar:
Properties props = new Properties(); AppProps.setApplicationJarClass(props, Main.class); FlowConnector flow = new HadoopFlowConnector(props);
elasticsearch-hadoop options can be specified in the same way, these being picked up automatically by all `EsTap`s down the flow:
Properties props = new Properties(); props.setProperty("es.index.auto.create", "false"); ... FlowConnector flow = new HadoopFlowConnector(props);
This approach can be used for local and remote/Hadoop flows - simply use the appropriate FlowConnector
.
Per-Tap
configuration
editIf a flow
contains multiple Elasticsearch taps, the global approach does not work since the settings will clash with each other. For these scenario, elasticsearch-hadoop allows using per-Tap
configuration:
Tap books = new EsTap("es-server", 9200, "my-col/books", "?q=potter"); Tap movies = new EsTap("es-server", 9200, "my-col/movies", "?q=terminator");
Note that the Tap
configuration is merged with the global one so one can mix and match accordingly - for example specify the defaults in the global configuration and only declare the specifics on the Tap
instance. Additionally, for maximum flexibility EsTap
allows multiple arguments to be passed in, including a Properties
object for the full range of options.
Mapping
editBy default, elasticsearch-hadoop uses the Cascading tuple to map the data in Elasticsearch, using both the field names and types in the process. There are cases however when the field names cannot be used directly with Elasticsearch (a common case when working with an existing flow). For such cases, one can use the es.mapping.names
setting which accepts a comma-separated list of mapped names in the following format: Cascading field name
:Elasticsearch field name
To wit:
Properties myTapCfg = new Properties(); myTapCfg.set("es.mapping.names", "date:@timestamp"); Tap myTap = new EsTap(..., myTapCfg);
Since elasticsearch-hadoop 2.1, the connector preserves the tuple names case sensitivity.
Writing data to Elasticsearch
editSimply hook EsTap
into the Cascading flow:
Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "/resources/artists.dat"); Tap out = new EsTap("radio/artists", new Fields("name", "url", "picture")); new HadoopFlowConnector().connect(in, out, new Pipe("write-to-Es")).complete();
For cases where the id (or other metadata fields like ttl
or timestamp
) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id
. Following the previous example, to indicate to Elasticsearch to use the field id
as the document id, update the Tap
configuration:
Properties myTapCfg = new Properties(); myTapCfg.set("es.mapping.id", "id");
Writing existing JSON to Elasticsearch
editWhen the job input data is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. In such cases, one needs to indicate the json input by setting
the es.input.json
parameter. As such, in this case elasticsearch-hadoop expects to receive a tuple with a single field (representing the JSON document); the library will recognize Text
or BytesWritable
types otherwise it just
calls toString
to get a hold of the JSON content.
Make sure the data is properly encoded, in UTF-8
. The job output is considered the final form of the document sent to Elasticsearch.
Properties props = new Properties(); ... props.setProperty("es.input.json", "true"); Tap in = new Lfs(new TextLine(new Fields("line")),"/resources/artists.json"); Tap out = new EsTap("json-cascading-local/artists"); FlowConnector flow = new HadoopFlowConnector(props); flow.connect(in, out, new Pipe("import-json")).complete();
Writing to dynamic/multi-resources
editOne can index the data to a different resource, depending on the tuple being read, by using patterns. Reusing the aforementioned media example, one could configure it as follows:
Resource pattern using field |
|
Schema definition associated with the |
For each tuple about to be written, elasticsearch-hadoop will extract the media.type
entry and use its value to determine the target resource.
The functionality is available when dealing with raw JSON as well - in this case, the value will be extracted from the JSON document itself. Assuming the JSON source contains documents with the following structure:
the Tap
declaration can be as follows:
props.setProperty("es.input.json", "true"); Tap in = new Lfs(new TextLine(new Fields("line")),"/archives/collection.json"); Tap out = new EsTap("my-collection-{media_type}/doc", new Fields("line"));
Resource pattern relying on fields within the JSON document and not on the |
|
Schema declaration for the |
Reading data from Elasticsearch
editJust the same, add EsTap
on the other end of a pipe, to read (instead of writing) to it.
Tap in = new EsTap("radio/artists/", "?q=me*"); Tap out = new StdOut(new TextLine()); new LocalFlowConnector().connect(in, out, new Pipe("read-from-Es")).complete();
Type conversion
editDepending on the platform used, Cascading can use internally either Writable
or JDK types for its tuples. Elasticsearch handles both transparently
(see the Map/Reduce conversion section) though we recommend using the same types (if possible) in both cases to avoid the overhead of maintaining two different versions.
If automatic index creation is used, please review this section for more information.
Cascading Lingual
editelasticsearch-hadoop also provides integration with Lingual, a Cascading extension that provides an ANSI SQL interface for Apache Hadoop. That is, one can execute in Hadoop, SQL queries directly on Elasticsearch.
Below is a quick setup of using elasticsearch-hadoop with Lingual (1.1) - for detailed information please refer to the Lingual user guide:
export LINGUAL_PLATFORM=hadoop # register {es} as a provider lingual catalog --init lingual catalog --provider --add ./elasticsearch-hadoop-<version>.jar # add a custom schema (called 'titles') for querying lingual catalog --schema es-test --add lingual catalog --schema es-test --stereotype titles -add \ --columns emp_no,title,from_date,to_date --types int,string,date,date lingual catalog --schema es-test --format es --add --provider es lingual catalog --schema es-test --protocol es --add --provider es \ --properties=host=es-server lingual catalog --schema es-test --table titles --stereotype titles \ -add employees/titles --format es --provider es --protocol es
Once the desired catalog has been declared and elasticsearch-hadoop registered with it, one can start querying the data
lingual shell (shell) select count(*) from "es-test"."titles" where "title" = 'Engineer'; 115003