Building real-time dashboard applications with Apache Flink, Elasticsearch, and Kibana
Gaining actionable insights from continuously produced data in real-time is a common requirement for many businesses today. A widespread use case for real-time data processing is dashboarding. A typical architecture to support such a use case is based on a data stream processor, a data store with low latency read/write access, and a visualization framework.
In this blog post, we demonstrate how to build a real-time
dashboard solution for stream data analytics using Apache Flink, Elasticsearch,
and Kibana. The following figure depicts our system architecture.
In our architecture, Apache Flink executes stream analysis jobs
that ingest a data stream, apply transformations to analyze, transform, and
model the data in motion, and write their results to an Elasticsearch index.
Kibana connects to the index and queries it for data to visualize. All
components of our architecture are open source systems under the Apache License
2.0. We show how to implement a Flink DataStream program that analyzes a stream
of taxi ride events and writes its results to Elasticsearch and give
instructions on how to connect and configure Kibana to visualize the analyzed
data in real-time.
Why use Apache Flink for stream processing?
Before we dive into the details of implementing our demo application, we discuss some of the features that make Apache Flink an outstanding stream processor. Apache Flink 0.10, which was recently released, comes with a competitive set of stream processing features, some of which are unique in the open source domain. The most important ones are:
- Support for event time and out of order streams: In reality, streams of events rarely arrive in the order that they are produced, especially streams from distributed systems and devices. Until now, it was up to the application programmer to correct this “time drift”, or simply ignore it and accept inaccurate results, as streaming systems (at least in the open source world) had no support for event time (i.e., processing events by the time they happened in the real world). Flink 0.10 is the first open source engine that supports out of order streams and which is able to consistently process events according to their timestamps.
- Expressive and easy-to-use APIs in Scala and Java: Flink's DataStream API ports many operators which are well known from batch processing APIs such as map, reduce, and join to the streaming world. In addition, it provides stream-specific operations such as window, split, and connect. First-class support for user-defined functions eases the implementation of custom application behavior. The DataStream API is available in Scala and Java.
- Support for sessions and unaligned windows: Most streaming systems have some concept of windowing, i.e., a grouping of events based on some function of time. Unfortunately, in many systems these windows are hard-coded and connected with the system’s internal checkpointing mechanism. Flink is the first open source streaming engine that completely decouples windowing from fault tolerance, allowing for richer forms of windows, such as sessions.
- Consistency, fault tolerance, and high availability: Flink guarantees consistent state updates in the presence of failures (often called “exactly-once processing”), and consistent data movement between selected sources and sinks (e.g., consistent data movement between Kafka and HDFS). Flink also supports worker and master failover, eliminating any single point of failure.
- Low latency and high throughput: We have clocked Flink at 1.5 million events per second per core, and have also observed latencies in the 25 millisecond range for jobs that include network data shuffling. Using a tuning knob, Flink users can navigate the latency-throughput trade-off, making the system suitable for both high-throughput data ingestion and transformations, as well as ultra-low-latency (millisecond range) applications.
- Connectors and integration points: Flink integrates with a wide variety of open source systems for data input and output (e.g., HDFS, Kafka, Elasticsearch, HBase, and others), deployment (e.g., YARN), as well as acting as an execution engine for other frameworks (e.g., Cascading, Google Cloud Dataflow). The Flink project itself comes bundled with a Hadoop MapReduce compatibility layer, a Storm compatibility layer, as well as libraries for machine learning and graph processing.
- Developer productivity and operational simplicity: Flink runs in a variety of environments. Local execution within an IDE significantly eases development and debugging of Flink applications. In distributed setups, Flink runs at massive scale-out. The YARN mode allows users to bring up Flink clusters in a matter of seconds. Flink serves monitoring metrics of jobs and the system as a whole via a well-defined REST interface. A built-in web dashboard displays these metrics and makes monitoring of Flink very convenient.
The combination of these features
makes Apache Flink a unique choice for many stream processing applications.
Building a demo application with Flink, Elasticsearch, and Kibana
Our demo ingests a stream of taxi ride events and identifies places that are popular within a certain period of time, i.e., every five minutes we compute the number of passengers that arrived by taxi at each location within the last 15 minutes. This kind of computation is known as a sliding window operation. We share a Scala implementation of this application (among others) on GitHub. You can easily run the application from your IDE by cloning the repository and importing the code. The repository's README file provides more detailed instructions.
Analyze the taxi ride event stream with Apache Flink
For the demo application, we generate a stream of taxi ride events from a public dataset of the New York City Taxi and Limousine Commission (TLC). The data set consists of records about taxi trips in New York City from 2009 to 2015. We took some of this data and converted it into a data set of taxi ride events by splitting each trip record into a ride start and a ride end event. The events have the following schema:
    rideId: Long
    time: DateTime // start or end time
    isStart: Boolean // true = ride start, false = ride end
    location: GeoPoint // lon/lat of pick-up or drop-off location
    passengerCnt: Short
    travelDist: Float // -1 on start events
We implemented a custom SourceFunction to serve a DataStream[TaxiRide] from the ride event data set. In order to generate the stream as realistically as possible, events are emitted by their timestamps. Two events that occurred ten minutes apart in reality are ingested by Flink ten minutes apart. A speed-up factor can be specified to “fast-forward” the stream, i.e., with a speed-up factor of 2.0, these events are served five minutes apart. Moreover, the source function adds a configurable random delay to each event to simulate real-world jitter. Given this stream of taxi ride events, our task is to compute every five minutes the number of passengers that arrived by taxi within the last 15 minutes at locations in New York City.
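The effect of the speed-up factor can be illustrated with a few lines of plain Scala. This is only a minimal sketch of the arithmetic, not the code of the demo's source function; the object and function names are hypothetical.

```scala
object ReplaySketch {
  // Wall-clock gap (in ms) between serving two events whose timestamps
  // are eventGapMillis apart, for a given speed-up factor.
  // Hypothetical helper, not part of the demo's TaxiRideSource.
  def servingGapMillis(eventGapMillis: Long, speedUp: Float): Long =
    (eventGapMillis / speedUp).toLong

  def main(args: Array[String]): Unit = {
    val tenMinutes = 10 * 60 * 1000L
    println(servingGapMillis(tenMinutes, 1.0f))   // replay at original speed
    println(servingGapMillis(tenMinutes, 2.0f))   // served five minutes apart
    println(servingGapMillis(tenMinutes, 600.0f)) // served one second apart
  }
}
```

The sketch leaves out the random serving delay, which would be added on top of the computed gap.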
As a first step, we obtain a StreamExecutionEnvironment and set the TimeCharacteristic to EventTime. Event time mode guarantees consistent results even in case of historic data or data which is delivered out-of-order.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Next, we define the data source that generates a DataStream[TaxiRide] with at most 60 seconds serving delay (events are out of order by max. 1 minute) and a speed-up factor of 600 (10 minutes are served in 1 second).
    // Define the data source
    val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
      "./data/nycTaxiData.gz", 60, 600.0f))
Since we are only interested in locations that people travel to (and not where they come from) and because the original data is a little bit messy (locations are not always correctly specified), we apply a few filters to first cleanse the data.
    val cleansedRides = rides
      // filter for ride end events
      .filter( !_.isStart )
      // filter for events in NYC
      .filter( r => NycGeoUtils.isInNYC(r.location) )
The location of a taxi ride event is defined as a pair of continuous longitude/latitude values. We need to map them into a finite set of regions in order to be able to aggregate events by location. We do this by defining a grid of approx. 100x100 meter cells on the area of New York City. We use a utility function to map event locations to cell ids and extract the passenger count as follows:
    // map location coordinates to cell Id, timestamp, and passenger count
    val cellIds: DataStream[(Int, Long, Short)] = cleansedRides
      .map { r =>
        ( NycGeoUtils.mapToGridCell(r.location), r.time.getMillis, r.passengerCnt )
      }
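To make the grid idea more concrete, the following plain Scala sketch shows one way such a mapping could work. It is not the demo's NycGeoUtils implementation: the bounding box, the cell sizes in degrees, and all names are illustrative assumptions.

```scala
object GridSketch {
  // Illustrative bounding box and cell size in degrees (assumptions,
  // not taken from the demo's NycGeoUtils).
  val LonWest = -74.05
  val LonEast = -73.70
  val LatNorth = 41.0
  val LatSouth = 40.5
  val DeltaLon = 0.0014
  val DeltaLat = 0.00125
  val CellsX = 250 // number of columns: (LonEast - LonWest) / DeltaLon

  // True if the point lies inside the bounding box.
  def isInArea(lon: Double, lat: Double): Boolean =
    lon >= LonWest && lon < LonEast && lat > LatSouth && lat <= LatNorth

  // Maps a point to a cell id; cells are numbered row by row,
  // starting in the north-west corner.
  def mapToGridCell(lon: Double, lat: Double): Int = {
    val x = math.floor((lon - LonWest) / DeltaLon).toInt
    val y = math.floor((LatNorth - lat) / DeltaLat).toInt
    x + y * CellsX
  }

  // Returns the (lon, lat) center of a cell.
  def cellCenter(cellId: Int): (Double, Double) = {
    val x = cellId % CellsX
    val y = cellId / CellsX
    (LonWest + (x + 0.5) * DeltaLon, LatNorth - (y + 0.5) * DeltaLat)
  }
}
```

Mapping a cell's center back to a cell id yields the same id, which is the property the final step of the demo relies on when it translates cell ids back into coordinates.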
After these preparation steps, we have the data that we would like to aggregate. Since we want to compute the passenger count for each location (cell id), we start by keying (partitioning by key) the stream by cell id (_._1). Subsequently, we define a sliding time window and run a WindowFunction by calling apply():
    val passengerCnts: DataStream[(Int, Long, Int)] = cellIds
      // key stream by cell Id
      .keyBy(_._1)
      // define sliding window on keyed stream
      .timeWindow(Time.minutes(15), Time.minutes(5))
      // count events in window
      .apply { (
        cell: Int,
        window: TimeWindow,
        events: Iterable[(Int, Long, Short)],
        out: Collector[(Int, Long, Int)]) =>
          out.collect( ( cell, window.getEnd, events.map( _._3 ).sum ) )
      }

The timeWindow() operation groups stream events into finite sets of records on which a window or aggregation function can be applied. For our application, we call apply() to process the windows using a WindowFunction. The WindowFunction receives four parameters: the key of the window (here, the cell id), a Window object that contains details such as the start and end time of the window, an Iterable over all elements in the window, and a Collector to collect the records emitted by the WindowFunction. We want to count the number of passengers that arrive within the window’s time bounds. Therefore, we have to emit a single record that contains the grid cell id, the end time of the window, and the sum of the passenger counts, which is computed by extracting the individual passenger counts from the iterable (events.map( _._3 )) and summing them (.sum).
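The semantics of a sliding window can also be reproduced on a plain Scala collection. The sketch below is only an illustration under simplifying assumptions (non-negative timestamps, all events available up front) and does not use Flink; the names are hypothetical. With a window size of 15 minutes and a slide of 5 minutes, every event belongs to three overlapping windows.

```scala
object SlidingWindowSketch {
  // Start timestamps of all sliding windows [start, start + size)
  // that contain the timestamp ts (assumes ts >= 0).
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Sums passenger counts per (cell id, window end) over
  // (cell id, timestamp, passenger count) records.
  def slidingSums(
      events: Seq[(Int, Long, Short)],
      size: Long,
      slide: Long): Map[(Int, Long), Int] =
    events
      .flatMap { case (cell, ts, cnt) =>
        windowStarts(ts, size, slide).map(start => ((cell, start + size), cnt.toInt))
      }
      .groupBy(_._1)
      .map { case (key, values) => key -> values.map(_._2).sum }
}
```

For example, an event with a timestamp at minute 17 is assigned to the windows that end at minutes 20, 25, and 30.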
Finally, we translate the cell id back into a GeoPoint (referring to the center of the cell) and print the result stream to the standard output. The final env.execute() call takes care of submitting the program for execution.
    val cntByLocation: DataStream[(Int, Long, GeoPoint, Int)] = passengerCnts
      // map cell Id back to GeoPoint
      .map( r => (r._1, r._2, NycGeoUtils.getGridCellCenter(r._1), r._3 ) )

    cntByLocation
      // print to console
      .print()

    env.execute("Total passenger count per location")
If you followed the instructions to import the demo code into your IDE, you can run the SlidingArrivalCount.scala program by executing its main() method. You will see Flink’s log messages and the computed results being printed to the standard output.
You might wonder why the program produces results much faster than once every five minutes per location. This is due to the event time processing mode. Since all time-based operations (such as windows) are based on the timestamps of the events, the program becomes independent of the speed at which the data is served. This also means that you can process historic data which is read at full speed from some data store and data which is continuously produced with exactly the same program.
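The following plain Scala sketch illustrates why results that are derived from event timestamps do not depend on the order or speed in which events are served. It is a simplification that uses non-overlapping windows and ignores how Flink tracks event-time progress and handles late data; the names are hypothetical.

```scala
object EventTimeSketch {
  // Counts events per non-overlapping event-time window of the given size,
  // keyed by the end timestamp of the window (assumes timestamps >= 0).
  def countsByWindowEnd(timestamps: Seq[Long], size: Long): Map[Long, Int] =
    timestamps
      .groupBy(ts => ts - (ts % size) + size)
      .map { case (end, tss) => end -> tss.size }

  def main(args: Array[String]): Unit = {
    val inOrder = Seq(1000L, 2000L, 61000L, 62000L, 125000L)
    val outOfOrder = inOrder.reverse
    // Both calls return the same counts because only timestamps matter.
    println(countsByWindowEnd(inOrder, 60000L) == countsByWindowEnd(outOfOrder, 60000L))
  }
}
```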
Our streaming program will run for a few minutes until the
packaged data set is completely processed but you can terminate it at any time.
As a next step, we show how to write the result stream into an Elasticsearch
index.
Prepare Elasticsearch
- Download Elasticsearch 1.7.3 as a .tar (or .zip) archive.
- Extract the archive file:
    tar xvfz elasticsearch-1.7.3.tar.gz
- Enter the extracted directory and start Elasticsearch:
    cd elasticsearch-1.7.3
    ./bin/elasticsearch
- Create an index called “nyc-idx”:
    curl -XPUT "http://localhost:9200/nyc-idx"
- Create an index mapping called “popular-locations”:
    curl -XPUT "http://localhost:9200/nyc-idx/_mapping/popular-locations" -d'
    {
      "popular-locations" : {
        "properties" : {
          "cnt": {"type": "integer"},
          "location": {"type": "geo_point"},
          "time": {"type": "date"}
        }
      }
    }'
The SlidingArrivalCount.scala program is prepared to write data to the Elasticsearch index you just created but requires a few parameters to be set at the beginning of the main() function. Please set the parameters as follows:
    val writeToElasticsearch = true
    val elasticsearchHost = // look up the IP address in the Elasticsearch logs
    val elasticsearchPort = 9300
Now, everything is set up to fill our index with data. When you run the program by executing the main() method again, the program will write the resulting stream to the standard output as before but also insert the records into the nyc-idx Elasticsearch index.
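As a rough illustration of what one inserted record might look like, the following plain Scala sketch builds the fields of a document that fits the “popular-locations” mapping. The field names come from the mapping; the helper name and the exact value encoding are assumptions, and the demo's actual sink code may differ. Elasticsearch accepts a geo_point as a "lat,lon" string and a date as milliseconds since the epoch.

```scala
object EsDocumentSketch {
  // Builds the fields of one document for the "popular-locations" mapping.
  // Hypothetical helper, not the demo's Elasticsearch sink.
  def toDocument(time: Long, lon: Double, lat: Double, cnt: Int): Map[String, Any] =
    Map(
      "time" -> time,             // milliseconds since the epoch
      "location" -> s"$lat,$lon", // geo_point as a "lat,lon" string
      "cnt" -> cnt
    )
}
```

Note that the string form of a geo_point puts the latitude first, whereas the GeoPoint in the event schema is described as lon/lat.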
If you later want to clear the nyc-idx index, you can simply drop the mapping by running
curl -XDELETE 'http://localhost:9200/nyc-idx/popular-locations'
and create the mapping again with the previous command.
Visualizing the results with Kibana
In order to visualize the data that is inserted into Elasticsearch, we install Kibana 4.1.3 which is compatible with Elasticsearch 1.7.3. The setup is basically the same as for Elasticsearch.
1. Download Kibana 4.1.3 for your environment.
2. Extract the archive file.
3. Enter the extracted folder and start Kibana by running the start script:
    ./bin/kibana
4. Open http://localhost:5601 in your browser to access Kibana.
Next we need to configure an index pattern. Enter the index
name “nyc-idx” and click on “Create”. Do not uncheck the “Index contains
time-based events” option. Now, Kibana knows about our index and we can start to
visualize our data.
First click on the “Discover” button at the top of the page.
You will find that Kibana tells you “No results found”.
This is because Kibana restricts time-based events by default
to the last 15 minutes. Since our taxi ride data stream starts on January 1st,
2013, we need to adapt the time range that is considered by Kibana. This is
done by clicking on the label “Last 15 Minutes” in the top right corner and
entering an absolute time range starting at 2013-01-01 and ending at 2013-01-06.
Now that Kibana knows where our data is and which time range is valid,
we can continue to visualize the data. For example, we can visualize the
arrival counts on a map. Click on the “Visualize” button at the top of the
page, select “Tile map”, and click on “From a new search”.
See the following screenshot for the tile map configuration (left-hand side).
Another interesting visualization is to plot the number of
arriving passengers over time. Click on “Visualize” at the top, select
“Vertical bar chart”, and select “From a new search”. Again, have a look at the
following screenshot for an example of how to configure the chart.
Kibana offers many more chart types and visualization options
which are out of the scope of this post. You can easily play around with this
setup, explore Kibana’s features, and implement your own Flink DataStream
programs to analyze taxi rides in New York City.
We’re done and hope you had some fun
In this blog post we demonstrated how to build a real-time
dashboard application with Apache Flink, Elasticsearch, and Kibana. By
supporting event-time processing, Apache Flink is able to produce meaningful
and consistent results even for historic data or in environments where events
arrive out-of-order. The expressive DataStream API with flexible window
semantics results in significantly less custom application logic compared to
other open source stream processing solutions. Finally, connecting Flink with
Elasticsearch and visualizing the real-time data with Kibana is just a matter of
a few minutes. We hope you enjoyed running our demo application and had fun
playing around with the code.
Fabian Hueske is a PMC member of Apache Flink. He has been contributing to Flink since its earliest days, when it started as a research project as part of his PhD studies at TU Berlin. Fabian did internships with IBM Research, SAP Research, and Microsoft Research and is a co-founder of data Artisans, a Berlin-based start-up devoted to fostering Apache Flink. He is interested in distributed data processing and query optimization.