WARNING: Version 6.1 has passed its EOL date.
This documentation is no longer being maintained and may be removed. If you are running this version, we strongly advise you to upgrade. For the latest information, see the current release documentation.
Configuration

elasticsearch-hadoop behavior can be customized through the properties below, typically by setting them on the target job's Hadoop Configuration. However, some of them can be specified through other means, depending on the library used (see the relevant section).
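For example, a minimal sketch with placeholder values - on the Map/Reduce module each line translates to a call such as Configuration.set("es.nodes", "es-node-1:9200") on the job's configuration:

# placeholder node address and index/type
es.nodes = es-node-1:9200
es.resource = my-index/my-type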
All configuration properties start with the es prefix. The namespace es.internal is reserved by the library for its internal use and should not be used by the user at any point.
Required settings

- es.resource
  Elasticsearch resource location, where data is read from and written to. Requires the format <index>/<type> (relative to the Elasticsearch host/port, see below):

es.resource = twitter/tweet   # index 'twitter', type 'tweet'

- es.resource.read (defaults to es.resource)
  Elasticsearch resource used for reading (but not writing) data. Useful when reading and writing data to different Elasticsearch indices within the same job. Typically set automatically (except for the Map/Reduce module, which requires manual configuration).

- es.resource.write (defaults to es.resource)
  Elasticsearch resource used for writing (but not reading) data. Used typically for dynamic resource writes or when writing and reading data to different Elasticsearch indices within the same job. Typically set automatically (except for the Map/Reduce module, which requires manual configuration).
Note that specifying multiple indices and/or types in the above resource settings is allowed only for reading. Specifying multiple indices for writes is supported only through the use of dynamic resources (described below). Use _all/types to search types in all indices, or index/ to search all types within index. Do note that reading multiple indices/types typically works only when they have the same structure, and only with some libraries. Integrations that require a strongly typed mapping (such as a table, like Hive or SparkSQL) are likely to fail.
Dynamic/multi resource writes

For writing, elasticsearch-hadoop allows the target resource to be resolved at runtime by using patterns (the {<field-name>} format), resolved based on the data being streamed to Elasticsearch. That is, one can save documents to a certain index or type based on one or multiple fields resolved from the document about to be saved.

For example, assuming the following document set (described here in JSON for readability - feel free to translate this into the actual Java objects):

{ "media_type":"game", "title":"Final Fantasy VI", "year":"1994" },
{ "media_type":"book", "title":"Harry Potter", "year":"2010" },
{ "media_type":"music", "title":"Surfing With The Alien", "year":"1987" }

to index each of them based on their media_type, one would use the following pattern:

# index the documents based on their type
es.resource.write = my-collection/{media_type}
which would result in Final Fantasy VI being indexed under my-collection/game, Harry Potter under my-collection/book, and Surfing With The Alien under my-collection/music.

For more information, please refer to the dedicated integration section.

Dynamic resources are supported only for writing; for multi-index/type reads, use an appropriate search query.
Formatting dynamic/multi resource writes

When using dynamic/multi writes, one can also specify the formatting of the value returned by the field. Out of the box, elasticsearch-hadoop provides formatting for date/timestamp fields, which is useful for automatically grouping time-based data (such as logs) within a certain time range under the same index. By using the Java SimpleDateFormat syntax, one can format and parse the date in a locale-sensitive manner.

For example, assuming the data contains a @timestamp field, one can group the documents into daily indices using the following configuration:
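# index the documents based on their timestamp, in daily indices
es.resource.write = my-collection/{@timestamp|yyyy.MM.dd}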
The same configuration property (es.resource.write) is used; however, through the special | character a formatting pattern is specified. Please refer to the SimpleDateFormat javadocs for more information on the supported syntax. In this case, yyyy.MM.dd translates the date into the year (four digits), the month (two digits), followed by the day (two digits) - such as 2015.01.28.

Logstash users will find this pattern quite familiar.
Essential settings

Network

- es.nodes (default localhost)
  List of Elasticsearch nodes to connect to. When using Elasticsearch remotely, do set this option. Note that the list does not have to contain every node inside the Elasticsearch cluster; these are discovered automatically by elasticsearch-hadoop by default (see below). Each node can also have its HTTP/REST port specified individually (e.g. mynode:9600).

- es.port (default 9200)
  Default HTTP/REST port used for connecting to Elasticsearch - this setting is applied to the nodes in es.nodes that do not have any port specified.
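For example, a sketch with placeholder host names, where one node declares its own port:

# the cluster is reached through these two nodes; es-node-2 exposes a non-default port
es.nodes = es-node-1,es-node-2:9201
es.port = 9200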
Added in 2.2.

- es.nodes.path.prefix (default empty)
  Prefix to add to all requests made to Elasticsearch. Useful in environments where the cluster is proxied/routed under a certain path. For example, if the cluster is located at someaddress:someport/custom/path/prefix, one would set es.nodes.path.prefix to /custom/path/prefix.
Querying

- es.query (default none)
  Holds the query used for reading data from the specified es.resource. By default it is not set/empty, meaning the entire data under the specified index/type is returned. es.query can have three forms:

  - uri query
    Using the form ?uri_query, one can specify a query string. Notice the leading ?.
  - query dsl
    Using the form query_dsl - note the query dsl needs to start with { and end with }, as mentioned here.
  - external resource
    If neither of the two above matches, elasticsearch-hadoop will try to interpret the parameter as a path within the HDFS file-system. If that is not the case, it will try to load the resource from the classpath or, if that fails, from the Hadoop DistributedCache. The resource should contain either a uri query or a query dsl.

  To wit, here is an example:

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json
In other words, es.query is flexible enough so that you can use whatever search api you prefer, either inline or by loading it from an external resource.

We recommend using query dsl externalized in a file, included within the job jar (and thus available on its classpath). This makes it easy to identify, debug and organize your queries. Throughout the documentation we use the uri query to save text and increase readability - real-life queries quickly become unwieldy when used as uris.
Operation

- es.input.json (default false)
  Whether the input is already in JSON format or not (the default). Please see the appropriate section of each integration for more details about using JSON directly.

- es.write.operation (default index)
  The write operation elasticsearch-hadoop should perform - can be any of:

  - index (default)
    New data is added while existing data (based on its id) is replaced (reindexed).
  - create
    Adds new data - if the data already exists (based on its id), an exception is thrown.
  - update
    Updates existing data (based on its id). If no data is found, an exception is thrown.
  - upsert
    Known as merge: inserts if the data does not exist, updates if the data exists (based on its id).
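For example, a sketch (the id field name is hypothetical) switching a job from the default indexing to upserts keyed on each document's id field:

# 'id' is a hypothetical field carrying the document identifier
es.write.operation = upsert
es.mapping.id = id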
Added in 2.1.

- es.output.json (default false)
  Whether the output from the connector should be in JSON format or not (the default). When enabled, the documents are returned in raw JSON format (as returned from Elasticsearch). Please see the appropriate section of each integration for more details about using JSON directly.

Added in 5.0.0.

- es.ingest.pipeline (default none)
  The name of an existing Elasticsearch Ingest pipeline that should be targeted when indexing or creating documents. Only usable with index and create operations; incompatible with update or upsert operations.
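For example, a sketch assuming a pipeline named my-pipeline has already been created in the cluster:

# hypothetical pipeline name - must exist in Elasticsearch beforehand
es.write.operation = index
es.ingest.pipeline = my-pipeline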
Mapping (when writing to Elasticsearch)

- es.mapping.id (default none)
  The document field/property name containing the document id.

- es.mapping.parent (default none)
  The document field/property name containing the document parent. To specify a constant, use the <CONSTANT> format.

Added in 5.6.0.

- es.mapping.join (default none)
  The document field/property name containing the document's join field. Constants are not accepted. Join fields on a document must contain either the parent relation name as a string, or an object that contains the child relation name and the id of its parent. If a child document is identified when using this setting, the document's routing is automatically set to the parent id if no other routing is configured in es.mapping.routing.

- es.mapping.routing (default depends on es.mapping.join)
  The document field/property name containing the document routing. To specify a constant, use the <CONSTANT> format. If a join field is specified with es.mapping.join, then this defaults to the value of the join field's parent id. If a join field is not specified, then this defaults to none.
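For example, a sketch with hypothetical field names that relies on the routing default described above:

# 'id' carries the document id, 'my_join' the join field (both names hypothetical)
es.mapping.id = id
es.mapping.join = my_join
# no es.mapping.routing set - child documents are routed by their parent id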
- es.mapping.version (default none)
  The document field/property name containing the document version. To specify a constant, use the <CONSTANT> format.

- es.mapping.version.type (default depends on es.mapping.version)
  Indicates the type of versioning used. If es.mapping.version is undefined (default), its value is unspecified. If es.mapping.version is specified, its value becomes external.

Deprecated in 6.0.0.

- es.mapping.ttl (default none)
  The document field/property name containing the document time-to-live. To specify a constant, use the <CONSTANT> format. Will not work on Elasticsearch 6.0+ index versions, but support will continue for 5.x index versions and below.

Deprecated in 6.0.0.

- es.mapping.timestamp (default none)
  The document field/property name containing the document timestamp. To specify a constant, use the <CONSTANT> format. Will not work on Elasticsearch 6.0+ index versions, but support will continue for 5.x index versions and below.

Added in 2.1.
- es.mapping.include (default none)
  Field/property to be included in the document sent to Elasticsearch. Useful for extracting the needed data from entities. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified, meaning all properties/fields are included.

  The es.mapping.include feature is ignored when es.input.json is specified. In order to prevent the connector from indexing data that is implicitly excluded, any jobs with these property conflicts will refuse to execute!

Added in 2.1.

- es.mapping.exclude (default none)
  Field/property to be excluded from the document sent to Elasticsearch. Useful for eliminating unneeded data from entities. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified, meaning no properties/fields are excluded.

  The es.mapping.exclude feature is ignored when es.input.json is specified. In order to prevent the connector from indexing data that is explicitly excluded, any jobs with these property conflicts will refuse to execute!
For example:

# extracting the id from the field called 'uuid'
es.mapping.id = uuid

# specifying a parent with id '123'
es.mapping.parent = <123>

# combine include / exclude for complete control
# include
es.mapping.include = u*, foo.*
# exclude
es.mapping.exclude = *.description

Using the configuration above, each entry will have only its top-level fields starting with u and nested fields under foo included in the document, with the exception of any nested field named description. Additionally, the document parent will be 123, while the document id is extracted from the field uuid.
Field information (when reading from Elasticsearch)

Added in 2.1.

- es.mapping.date.rich (default true)
  Whether to create a rich Date-like object for Date fields in Elasticsearch, or return them as primitives (String or long). By default this is true. The actual object type is based on the library used; a notable exception being Map/Reduce, which provides no built-in Date object and as such LongWritable and Text are returned regardless of this setting.
Added in 2.2.

- es.read.field.include (default empty)
  Fields/properties that are parsed and considered when reading the documents from Elasticsearch. By default empty, meaning all fields are considered. Use this property with caution, as it can have nasty side-effects. Typically used in cases where some documents returned do not fit into an expected mapping.
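For example, a sketch with hypothetical field names:

# consider only 'name' and anything nested under 'address'
es.read.field.include = name, address.*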
Added in 2.2.

- es.read.field.exclude (default empty)
  Fields/properties that are discarded when reading the documents from Elasticsearch. By default empty, meaning no fields are excluded. Use this property with caution, as it can have nasty side-effects. Typically used in cases where some documents returned do not fit into an expected mapping.

  For example:

# to exclude field company.income
es.read.field.exclude = company.income
Added in 2.2.

- es.read.field.as.array.include (default empty)
  Fields/properties that should be considered as arrays/lists. Since Elasticsearch can map one or multiple values to a field, elasticsearch-hadoop cannot determine from the mapping whether to instantiate a single value or an array type (depending on the library type). When encountering multiple values, elasticsearch-hadoop automatically uses the array/list type, but in strict mapping scenarios (like Spark SQL) this might lead to an unexpected schema change. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified, meaning no properties/fields are included.

  For example:

# mapping nested.bar as an array
es.read.field.as.array.include = nested.bar

# mapping nested.bar as a 3-level/dimensional array
es.read.field.as.array.include = nested.bar:3

- es.read.field.as.array.exclude (default empty)
  Fields/properties that should not be considered as arrays/lists. Similar to es.read.field.as.array.include above. Multiple values can be specified by using a comma. By default, no value is specified, meaning no properties/fields are excluded and (since no field is included by default, as indicated above) no field is treated as an array beforehand.
Metadata (when reading from Elasticsearch)

- es.read.metadata (default false)
  Whether to include the document metadata (such as id and version) in the results or not (default).

- es.read.metadata.field (default _metadata)
  The field under which the metadata information is placed. When es.read.metadata is set to true, the information is returned as a Map under the specified field.

- es.read.metadata.version (default false)
  Whether to include the document version in the returned metadata. Applicable only if es.read.metadata is enabled.
Update settings (when writing to Elasticsearch)

When using the update or upsert operation, additional settings (that mirror the update API) are available:

- es.update.script.inline (default none)
  Inline script used for updating the document.
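For example, a sketch (field and parameter names are hypothetical) that increments a counter on each existing document through an inline Painless script:

# assumes a cluster version where Painless is available
es.write.operation = update
es.mapping.id = id
es.update.script.lang = painless
es.update.script.inline = ctx._source.counter += params.param1
es.update.script.params = param1:<1>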
Added in 6.0.

- es.update.script.file (default none)
  Name of a file script to use for updating the document. File scripts were removed in 6.x, and as such, this property will throw an error if used against a 6.x and above cluster.

Added in 6.0.

- es.update.script.stored (default none)
  Identifier of a stored script to use for updating the document.

- es.update.script.lang (default none)
  Script language. By default, no value is specified, applying the node configuration.

- es.update.script.params (default none)
  Script parameters (if any). The value of the specified field/property from the document (currently read) is used. To specify a constant, use the <CONSTANT> format. Multiple values can be specified through commas (,).
For example:

# specifying 2 parameters, one extracting the value from field 'number', the other containing the value '123':
es.update.script.params = param1:number,param2:<123>
- es.update.script.params.json
  Script parameters specified in raw JSON format. The specified value is passed as-is, without any further processing or filtering. Typically used for migrating existing update scripts.

  For example:

es.update.script.params.json = {"param1":1, "param2":2}
- es.update.retry.on.conflict (default 0)
  How many times an update to a document is retried in case of conflict. Useful in concurrent environments.
Advanced settings

Index

- es.index.auto.create (default yes)
  Whether elasticsearch-hadoop should create an index (if it's missing) when writing data to Elasticsearch, or fail.

- es.index.read.missing.as.empty (default no)
  Whether elasticsearch-hadoop will allow reading of non-existing indices (and return an empty data set) or not (and throw an exception).

- es.field.read.empty.as.null (default yes)
  Whether elasticsearch-hadoop will treat empty fields as null. This setting is typically not needed (as elasticsearch-hadoop already handles the null case), but is enabled to make it easier to work with text fields that haven't been sanitized yet.
- es.field.read.validate.presence (default warn)
  To help spot possible mistakes when querying data from Hadoop (which can result in incorrect data being returned), elasticsearch-hadoop can perform validation, spotting missing fields and potential typos. Possible values are:

  - ignore
    No validation is performed.
  - warn
    A warning message is logged in case the validation fails.
  - strict
    An exception is thrown, halting the job, if a field is missing.

  The default (warn) will log any typos to the console when the job starts:

WARN main mr.EsInputFormat - Field(s) [naem, adress] not found in the Elasticsearch mapping specified; did you mean [name, location.address]?
- es.read.source.filter (default none)
  Normally, when using an integration that allows specifying some form of schema (such as Hive or Cascading), the connector will automatically extract the field names from the schema and request only those fields from Elasticsearch to save bandwidth. When using an integration that does not leverage any data schemas (such as plain MR and Spark), this property allows you to specify a comma-delimited string of field names that you would like to return from Elasticsearch.

  If es.read.source.filter is set, an exception will be thrown in case the connector tries to push down a different source field filtering. In these cases you should clear this property and trust that the connector knows which fields should be returned. This occurs in SparkSQL, Hive, and when specifying a schema in Cascading and Pig:

User specified source filters were found [name,timestamp], but the connector is executing in a state where it has provided its own source filtering [name,timestamp,location.address]. Please clear the user specified source fields under the [es.read.source.filter] property to continue. Bailing out...
Added in 5.4.

- es.index.read.allow.red.status (default false)
  When executing a job that is reading from Elasticsearch, if the resource provided for reads includes an index that has missing shards that are causing the cluster to have a status of red, then ES-Hadoop will inform you of this status and fail fast. In situations where the job must continue using the remaining available data that is still reachable, users can enable this property to instruct the connector to ignore shards it could not reach.

  Using es.index.read.allow.red.status can lead to jobs running over incomplete datasets. Jobs executed against a red cluster will yield inconsistent results when compared to jobs executed against a fully green or yellow cluster. Use this setting with caution.
Input

Added in 5.0.0.

- es.input.max.docs.per.partition (default 100000)
  When reading from an Elasticsearch cluster that supports scroll slicing (Elasticsearch v5.0.0 and above), this parameter advises the connector on the maximum number of documents per input partition. The connector will sample and estimate the number of documents on each shard to be read, and divide each shard into input slices using the value supplied by this property. For example, with the default in place, a shard estimated to hold 350000 documents would be split into four input slices. This property is ignored when reading from an Elasticsearch cluster that does not support scroll slicing (any Elasticsearch version below v5.0.0).
Network

- es.nodes.discovery (default true)
  Whether to discover the nodes within the Elasticsearch cluster, or only to use the ones given in es.nodes for metadata queries. Note that this setting applies only during start-up; afterwards, when reading and writing, elasticsearch-hadoop uses the target index shards (and their hosting nodes) unless es.nodes.client.only is enabled.

- es.nodes.client.only (default false)
  Whether to use Elasticsearch client nodes (or load-balancers). When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the client nodes within the cluster. Note that this typically significantly reduces node parallelism and thus is disabled by default. Enabling it also disables es.nodes.data.only (since a client node is a non-data node).

Added in 2.1.2.

- es.nodes.data.only (default true)
  Whether to use Elasticsearch data nodes only. When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the data nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes, as these tend to be "smaller" nodes. This is enabled by default.
Added in 5.0.0.

- es.nodes.ingest.only (default false)
  Whether to use Elasticsearch ingest nodes only. When enabled, elasticsearch-hadoop will route all of its requests (after nodes discovery, if enabled) through the ingest nodes within the cluster. The purpose of this configuration setting is to avoid incurring the cost of forwarding data meant for a pipeline from non-ingest nodes; really only useful when writing data to an Ingest Pipeline (see es.ingest.pipeline above).

Added in 2.2.

- es.nodes.wan.only (default false)
  Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.

Added in 2.2.

- es.nodes.resolve.hostname (default depends)
  Whether the connector should resolve the node hostnames to IP addresses or not. By default it is true unless wan mode is enabled (see above), in which case it defaults to false.
Added in 2.2.

- es.http.timeout (default 1m)
  Timeout for HTTP/REST connections to Elasticsearch.

- es.http.retries (default 3)
  Number of retries for establishing a (broken) http connection. The retries are applied for each conversation with an Elasticsearch node. Once the retries are depleted, the connection will automatically be re-routed to the next available Elasticsearch node (based on the declaration of es.nodes, followed by the discovered nodes, if enabled).

- es.scroll.keepalive (default 10m)
  The maximum duration of result scrolls between query requests.

- es.scroll.size (default 50)
  Number of results/items returned per individual scroll request.
Added in 2.2.

- es.scroll.limit (default -1)
  Total number of results/items returned by each individual scroll. A negative value indicates that all matching documents should be returned. Do note that this applies per scroll, which is typically bound to one of the job tasks. Thus the total number of documents returned is LIMIT * NUMBER_OF_SCROLLS (OR TASKS).

- es.action.heart.beat.lead (default 15s)
  The lead time before the task timeout at which elasticsearch-hadoop informs Hadoop the task is still running, to prevent task restarts.
Added in 5.3.0.

Setting HTTP Request Headers

- es.net.http.header.[HEADER-NAME]
  By using the es.net.http.header. prefix, you can provide HTTP headers to all requests made to Elasticsearch from elasticsearch-hadoop. Please note that some standard HTTP headers are reserved by the connector to ensure correct operation and cannot be set or overridden by the user (Accept and Content-Type, for instance).

  For example, here the user is setting the Max-Forwards HTTP header:

es.net.http.header.Max-Forwards = 10
Added in 2.1.

Basic Authentication

- es.net.http.auth.user
  Basic Authentication user name
- es.net.http.auth.pass
  Basic Authentication password
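For example, a sketch with placeholder credentials:

es.net.http.auth.user = es_user
es.net.http.auth.pass = es_password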
Added in 2.1.

SSL

- es.net.ssl (default false)
  Enable SSL
- es.net.ssl.keystore.location
  Key store (if used) location (typically a URL; without a prefix it is interpreted as a classpath entry)
- es.net.ssl.keystore.pass
  Key store password
- es.net.ssl.keystore.type (default JKS)
  Key store type. PK12 is a common alternative format
- es.net.ssl.truststore.location
  Trust store location (typically a URL; without a prefix it is interpreted as a classpath entry)
- es.net.ssl.truststore.pass
  Trust store password
- es.net.ssl.cert.allow.self.signed (default false)
  Whether or not to allow self-signed certificates
- es.net.ssl.protocol (default TLS)
  SSL protocol to be used
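For example, a sketch (trust store location and password are placeholders) enabling SSL against a cluster whose certificate is held in a custom trust store:

es.net.ssl = true
es.net.ssl.truststore.location = file:///path/to/truststore.jks
es.net.ssl.truststore.pass = changeme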
Proxy

- es.net.proxy.http.host
  Http proxy host name
- es.net.proxy.http.port
  Http proxy port
- es.net.proxy.http.user
  Http proxy user name
- es.net.proxy.http.pass
  Http proxy password
- es.net.proxy.http.use.system.props (default yes)
  Whether to use the system Http proxy properties (namely http.proxyHost and http.proxyPort) or not
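For example, a sketch (proxy address is a placeholder) routing traffic through an explicit Http proxy while ignoring any system-wide settings:

es.net.proxy.http.host = proxy.example.com
es.net.proxy.http.port = 3128
es.net.proxy.http.use.system.props = no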
Added in 2.2.

- es.net.proxy.https.host
  Https proxy host name

Added in 2.2.

- es.net.proxy.https.port
  Https proxy port

Added in 2.2.

- es.net.proxy.https.user
  Https proxy user name

Added in 2.2.

- es.net.proxy.https.pass
  Https proxy password

Added in 2.2.

- es.net.proxy.https.use.system.props (default yes)
  Whether to use the system Https proxy properties (namely https.proxyHost and https.proxyPort) or not
- es.net.proxy.socks.host
  Socks proxy host name
- es.net.proxy.socks.port
  Socks proxy port
- es.net.proxy.socks.user
  Socks proxy user name
- es.net.proxy.socks.pass
  Socks proxy password
- es.net.proxy.socks.use.system.props (default yes)
  Whether to use the system Socks proxy properties (namely socksProxyHost and socksProxyPort) or not
elasticsearch-hadoop allows proxy settings to be applied only to its connection, using the settings above. Take extra care when there is already a JVM-wide proxy setting (typically through system properties) to avoid unexpected behavior.

IMPORTANT: The semantics of these properties are described in the JVM docs. In some cases, setting the JVM property java.net.useSystemProxies to true works better than setting these properties manually.
Serialization

- es.batch.size.bytes (default 1mb)
  Size (in bytes) for batch writes using the Elasticsearch bulk API. Note the bulk size is allocated per task instance. Always multiply by the number of tasks within a Hadoop job to get the total bulk size at runtime hitting Elasticsearch.

- es.batch.size.entries (default 1000)
  Size (in entries) for batch writes using the Elasticsearch bulk API - (0 disables it). Companion to es.batch.size.bytes; once one matches, the batch update is executed. Similar to the size, this setting is per task instance; it gets multiplied at runtime by the total number of Hadoop tasks running.
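For example, a sketch doubling both per-task thresholds; with, say, 10 write tasks, up to roughly 20mb of bulk data could hit Elasticsearch at a time:

# per-task limits - whichever threshold is reached first triggers the bulk request
es.batch.size.bytes = 2mb
es.batch.size.entries = 2000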
- es.batch.write.refresh (default true)
  Whether to invoke an index refresh or not after a bulk update has been completed. Note this is called only after the entire write (meaning multiple bulk updates) has been executed.

- es.batch.write.retry.count (default 3)
  Number of retries for a given batch in case Elasticsearch is overloaded and data is rejected. Note that only the rejected data is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side effects.

- es.batch.write.retry.wait (default 10s)
  Time to wait between batch write retries.

- es.ser.reader.value.class (default depends on the library used)
  Name of the ValueReader implementation for converting JSON to objects. This is set by the framework depending on the library (Map/Reduce, Cascading, Hive, Pig, etc...) used.

- es.ser.writer.value.class (default depends on the library used)
  Name of the ValueWriter implementation for converting objects to JSON. This is set by the framework depending on the library (Map/Reduce, Cascading, Hive, Pig, etc...) used.