How to ingest data into Elasticsearch through Apache Camel

Data ingestion into Elasticsearch using Apache Camel combines the robustness of a search engine with the flexibility of an integration framework. In this article, we will explore how Apache Camel can simplify and optimize data ingestion into Elasticsearch. To illustrate, we will implement an introductory application that demonstrates, step by step, how to configure and use Apache Camel to send data to Elasticsearch.

What is Apache Camel?

Apache Camel is an open-source integration framework that simplifies connecting diverse systems, allowing developers to focus on business logic without worrying about the complexities of system communication. The central concept in Camel is "routes," which define the path a message follows from origin to destination, potentially including intermediate steps such as transformations, validations, and filtering.

Apache Camel architecture


Camel uses "components" to connect to different systems and protocols, such as databases and messaging services, and "endpoints" to represent the entry and exit points of messages. These concepts provide a modular and flexible design, making it easier to configure and manage complex integrations efficiently and scalably.
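
To make these concepts concrete, here is a minimal, hypothetical route (not part of the application built below) that connects a file endpoint to a log endpoint. The names file and log select the components, while the full URIs define the endpoints; this sketch assumes the corresponding component artifacts are on the classpath:

public class FileToLogRoute extends RouteBuilder {
    @Override
    public void configure() {
        // "file" is the component; the full URI is the source endpoint.
        from("file:data/inbox?noop=true")
                // "log" is the component; this URI is the destination endpoint.
                .to("log:ingest?level=INFO");
    }
}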

Using Elasticsearch and Apache Camel

We will demonstrate how to configure a simple Java application that uses Apache Camel to ingest data into an Elasticsearch cluster. We will also cover creating, updating, and deleting data in Elasticsearch using routes defined in Apache Camel.

1. Adding dependencies

The first step in configuring this integration is to add the necessary dependencies to your project's pom.xml file: the Apache Camel and Elasticsearch libraries. We will be using the new Elasticsearch Java API Client, so we must import the camel-elasticsearch component, whose version must match that of the camel-core library.

If you want to use the Java low-level REST client instead, use the Elasticsearch Low level Rest Client component.

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-core</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-elasticsearch</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-jackson</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>co.elastic.clients</groupId>
   <artifactId>elasticsearch-java</artifactId>
   <version>8.14.3</version>
</dependency>

2. Configuring and running the Camel Context

The configuration begins by creating a new Camel context using the DefaultCamelContext class, which serves as the base for defining and executing routes. Next, we configure the Elasticsearch component, which will allow Apache Camel to interact with an Elasticsearch cluster. The ElasticsearchComponent instance is configured to connect to the address localhost:9200, the default address for a local Elasticsearch cluster. For an environment that requires authentication, read the component documentation on configuring basic authentication; a sketch of one possible setup follows the snippet below.

public class ESComponent {

    public static ElasticsearchComponent getInstance() {
        var elasticsearch = new ElasticsearchComponent();
        elasticsearch.setHostAddresses("localhost:9200");
        return elasticsearch;
    }

    public static String getName() {
        return "elasticsearch";
    }
}
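
For a secured cluster, the component exposes options such as user, password, and SSL settings; the authoritative option names are in the Camel Elasticsearch component documentation. A sketch of one possible secured setup, with placeholder credentials, might look like this:

public static ElasticsearchComponent getSecuredInstance() {
    var elasticsearch = new ElasticsearchComponent();
    elasticsearch.setHostAddresses("localhost:9200");
    // Placeholder credentials: replace with values for your cluster.
    elasticsearch.setUser("elastic");
    elasticsearch.setPassword("changeme");
    // Enable TLS when the cluster is served over https.
    elasticsearch.setEnableSSL(true);
    return elasticsearch;
}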

This component is then added to the Camel context, enabling the defined routes to use this component to perform operations in Elasticsearch.

try (var context = new DefaultCamelContext()) {
   context.addComponent(ESComponent.getName(), ESComponent.getInstance());
   context.addRoutes(new OperationBulkRoute());
   context.start();
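   // Note: when this try-with-resources block exits, the context is closed
   // immediately; in a real run, keep the application alive (for example with
   // Thread.sleep or a shutdown latch) so the file route has time to complete.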
}

Afterward, the routes are added to the context. We will create routes for bulk indexing, updating, and deleting documents.

3. Configuring Camel routes

Data indexing

The first route we will configure is for data indexing. We will use a JSON file containing a movie catalog. The route reads the file located at src/main/resources/movies.json, deserializes the JSON content into Java objects, and then applies an aggregation strategy to combine multiple messages into one, enabling batch operations in Elasticsearch. The completion size is set to 500 items per message; that is, each bulk request will index 500 movies at a time.
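
The route expects movies.json to contain a JSON array, so that split(body()) iterates over its elements. A hypothetical two-entry excerpt (the field names are assumptions, apart from code and rating, which the later processors rely on):

[
  { "code": 1, "title": "The Matrix", "rating": 8.7 },
  { "code": 2, "title": "Blade Runner", "rating": 8.1 }
]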

The bulk operation route and its endpoint URI are defined as follows:

String URI_BULK_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.BULK_OPERATION,
               INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {
   private static final Log log = LogFactory.getLog(OperationBulkRoute.class);
   private static final int BULK_SIZE = 500;

   @Override
   public void configure() {
       from("file:src/main/resources?fileName=movies.json&noop=true")
               .routeId("route-bulk-ingest")
               .unmarshal().json()
               .split(body())
               .aggregate(constant(true), new BulkAggregationStrategy())
               .completionSize(BULK_SIZE)
               .to(URI_BULK_OPERATION)
               .process(exchange -> {
                   var body = exchange.getIn().getBody(String.class);
                   log.info(String.format("Response: %s", body));
               })
               .end();
   }
}

The batch of documents will be sent to Elasticsearch's bulk operation endpoint. This approach ensures efficiency and speed when handling large volumes of data.
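
The BulkAggregationStrategy referenced in the route is not shown here. A minimal sketch of such a strategy, assuming it simply accumulates the split items into a list for the bulk endpoint to consume, might look like this:

public class BulkAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First item of the batch: start a new list with the incoming body.
        if (oldExchange == null) {
            List<Object> batch = new ArrayList<>();
            batch.add(newExchange.getIn().getBody());
            newExchange.getIn().setBody(batch);
            return newExchange;
        }
        // Subsequent items: append the incoming body to the accumulated list.
        @SuppressWarnings("unchecked")
        List<Object> batch = oldExchange.getIn().getBody(List.class);
        batch.add(newExchange.getIn().getBody());
        return oldExchange;
    }
}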

Data update

The next route updates documents. We indexed some movies in the previous step; now we will create routes that search for a document by its reference code and then update its rating field.

We set up a Camel context (DefaultCamelContext), register the Elasticsearch component, and add a custom route, IngestionRoute. The operation starts by sending the document code through the ProducerTemplate, which triggers the route at the direct:update-ingestion endpoint.

try (var context = new DefaultCamelContext()) {
    context.addComponent(ESComponent.getName(), ESComponent.getInstance());
    context.addRoutes(new IngestionRoute());
    context.start();
    ProducerTemplate producerTemplate = context.createProducerTemplate();
    producerTemplate.sendBody("direct:update-ingestion", documentCode);
    Thread.sleep(5000);
}

Next, we have the IngestionRoute, which is the entry point for this flow. The route performs several pipelined operations. First, a search is executed in Elasticsearch to locate the document by code (direct:search-by-id), where the SearchByCodeProcessor assembles the query based on the code. Then, the retrieved document is processed by the UpdateRatingProcessor, which converts the result into Movie objects, updates the movie's rating to a specific value, and prepares the updated document to be sent back to Elasticsearch.

public class IngestionRoute extends RouteBuilder {
    private static final Log log = LogFactory.getLog(IngestionRoute.class);

    @Override
    public void configure() throws Exception {

        from("direct:update-ingestion")
                .pipeline()
                .to("direct:search-by-id")
                .to(URI_SEARCH_OPERATION)
                .to("direct:update-rating")
                .to(URI_UPDATE_OPERATION)
                .process(exchange -> {
                    var body = exchange.getIn().getBody(String.class);
                    log.info(String.format("Response: %s", body));
                })
                .end();

        from("direct:search-by-id")
                .process(new SearchByCodeProcessor());

        from("direct:update-rating")
                .process(new UpdateRatingProcessor());
    }
}
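
The URI_SEARCH_OPERATION and URI_UPDATE_OPERATION constants used by the route are not shown in the article. Assuming they follow the same pattern as URI_BULK_OPERATION (the SEARCH_OPERATION and UPDATE_OPERATION constants in the author's IndexOperationConfig class are assumptions here), they might be defined as:

String URI_SEARCH_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.SEARCH_OPERATION,
               INDEX_NAME);

String URI_UPDATE_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.UPDATE_OPERATION,
               INDEX_NAME);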

The SearchByCodeProcessor was configured only to assemble the search query:

public class SearchByCodeProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        var code = exchange.getIn().getBody();

        String query = "{\n" +
                "  \"query\": {\n" +
                "   \"term\": {\n" +
                "     \"code\": {\n" +
                "       \"value\":" + code + "\n" +
                "     }\n" +
                "   }\n" +
                "  }\n" +
                "}";
        exchange.setProperty("document_code", code);
        exchange.getIn().setBody(query);
    }
}

The UpdateRatingProcessor processor is responsible for updating the rating field.

public class UpdateRatingProcessor implements Processor {

    private final ObjectMapper objectMapper;

    public UpdateRatingProcessor() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override
    public void process(Exchange exchange) throws Exception {

        HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);
        var code = Long.parseLong(exchange.getProperty("document_code").toString());

        if (response != null && response.hits() != null) {

            var documents = parseToMovies(response);

            var optionalMovie = documents.stream()
                    .filter(document -> code == (document.getSource().getCode())).findAny();

            optionalMovie.ifPresent(document -> {
                document.getSource().setRating(13.0);
                Map<String, Object> updateMap = new HashMap<>();
                updateMap.put("doc", document.getSource());
                exchange.getIn().setHeader("indexId", document.getId());
                exchange.getIn().setBody(updateMap);
            });
        }
    }
}
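
Neither the Movie class nor the parseToMovies helper is shown in the article. A rough sketch, assuming the hit sources carry a numeric code and a rating and are converted with Jackson (the MovieHit holder is hypothetical), might be:

// Hypothetical POJO matching the fields the processor relies on.
public class Movie {
    private long code;
    private double rating;
    public long getCode() { return code; }
    public void setCode(long code) { this.code = code; }
    public double getRating() { return rating; }
    public void setRating(double rating) { this.rating = rating; }
}

// Hypothetical holder pairing a document id with its deserialized source.
public record MovieHit(String id, Movie source) {
    public String getId() { return id; }
    public Movie getSource() { return source; }
}

// One possible implementation of parseToMovies: convert each hit's source
// into a Movie with Jackson, keeping the id for the update request.
private List<MovieHit> parseToMovies(HitsMetadata<?> response) {
    return response.hits().stream()
            .map(hit -> new MovieHit(hit.id(),
                    objectMapper.convertValue(hit.source(), Movie.class)))
            .toList();
}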

Data deletion

Finally, the route for deleting documents is configured. Here, we will delete a document using its ID. In Elasticsearch, deleting a document requires the document identifier and the index where the document is stored, followed by a Delete request. In Apache Camel, we will perform this operation by creating a new route, as shown below.

The route starts from the direct:op-delete endpoint, which serves as the entry point. When a document needs to be deleted, its identifier (_id) is received in the body of the message. The route then sets the indexId header with the value of this identifier using simple("${body}"), which extracts the _id from the body of the message.

String URI_DELETE_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.DELETE_OPERATION,
               INDEX_NAME);

public class OperationDeleteRoute extends RouteBuilder {
   private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);

   @Override
   public void configure() {
       from("direct:op-delete")
               .routeId("route-delete")
               .setHeader("indexId", simple("${body}"))
               .to(URI_DELETE_OPERATION)
               .process(exchange -> {
                   var body = exchange.getIn().getBody(String.class);
                   log.info(String.format("Response: %s", body));
               })
               .end();
   }
}

Finally, the message is directed to the endpoint specified by URI_DELETE_OPERATION, which connects to Elasticsearch to perform the document removal operation in the corresponding index.

Now that we have created the route, we can create a Camel context (DefaultCamelContext), which is configured to include the Elasticsearch component.

try (var context = new DefaultCamelContext()) {
   context.addComponent(ESComponent.getName(), ESComponent.getInstance());
   context.addRoutes(new OperationDeleteRoute());
   context.start();
   ProducerTemplate producerTemplate = context.createProducerTemplate();
   producerTemplate.sendBody("direct:op-delete", documentId);
}

Next, the delete route, defined by the OperationDeleteRoute class, is added to the context. With the context initialized, a ProducerTemplate is used to pass the identifier of the document that should be deleted to the direct:op-delete endpoint, which triggers the delete route.

Conclusion

The integration between Apache Camel and Elasticsearch allows for robust and efficient data ingestion, leveraging Camel's flexibility to define routes that can handle different data manipulation scenarios, such as indexing, updating, and deleting. With this setup, you can orchestrate and automate complex processes in a scalable manner, ensuring that your data is managed efficiently in Elasticsearch. This example demonstrated how these tools can be used together to create an efficient and adaptable solution for data ingestion.
