Application Scaling with Elasticsearch @ StockTwits
In this article, I want to share how we at StockTwits overhauled our message sharing system, taking us from frequent downtime and general slowness to lightning-fast requests and very happy users, all while leaving room to keep scaling as traffic grows.
Our Use Case: Mo Cashtags, Mo Problems
StockTwits is the largest social network dedicated to the finance community, with 1.5 million monthly active visitors. One of our primary features is the ability to view chronological streams of message posts based on specific filters. More specifically, we need the ability to query our collection of posts and get only posts that have a “cashtag” mentioned in them ($AAPL, $GOOG, etc.). We also need the ability to query for posts from users that a specific user is following. This needs to be done quickly and at scale, regardless of how much our traffic grows.
An $AAPL message stream on StockTwits.
The initial architecture for this use case involved heavy database queries on MySQL and a lot of complex caching with Redis. Imagine you are a user and every stream you want to look at is an individual set in Redis. If you have 100,000 followers, that means every time you post a message, that message would have to get inserted in 100,000 individual sets in Redis so your followers can see your posts.
This worked in the beginning, but was no longer scalable as our user base continued to grow. As we added more streams, the complexity snowballed: more code, more maintenance, more points of failure. We were recently forced to rethink how our entire system was architected and realized that it was a perfect use case for Elasticsearch.
The Solution: Two Indexes, One Query
Imagine we have two indexes: one called “messages,” which has a body and user_id field on it, and another called “friendships,” which has a following field that is an array of user ids that a particular user is following.
Message:
body: The text of the message
user_id: The id of the user who shared the message
Friendships:
following: Array of user ids that the user is following
Since each document in Elasticsearch has an _id value, we can assign each message document the id value we have in our MySQL database for easier reference in the future. For each friendship document, we use the user_id value from our database as the _id value for the document.
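To make this concrete, here is a rough sketch of what indexing these documents could look like (the host, ids, and field values are hypothetical placeholders):

PUT http://yourhost:9200/messages/message/98765
{
  "body": "Adding to my $AAPL position ahead of earnings",
  "user_id": 123
}

PUT http://yourhost:9200/friendships/friendship/123
{
  "following": [456, 789, 1011]
}

The last segment of each URL becomes the document's _id, so the message keeps its MySQL id and the friendship document is keyed by the user's id.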
This is pretty straightforward, but how do you query these indexes to get a stream of messages that a single user is following?
Let’s say @johnsmith is logged in. His user_id is 123 and he is requesting his stream of messages from the users he follows. All we have to do is query the messages index with a GET request to the URL http://yourhost:9200/messages/message/_search with the following JSON query attached:
{ "query": { "bool": { "filter": [{ "terms": { "user_id": { "index": "friendships", "type": "friendship", "path": "following", "id": 123 }, } }] } }, "size": 15, "sort": [{ "id": { "order": "desc" }}] }
Let’s take a deeper look at how this query works. We are asking our messages index to give us a filtered set of messages in which the user_id field must be equal to any ids that user 123 is following, similar to an IN query in SQL. It knows the set of following user ids for user 123 because we have specifically told it to route through the friendships index and use the following field for the set of following user ids. The size field limits the result set Elasticsearch gives back, and the sort field says we want the messages sorted by id in descending order, newest first.
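Conceptually, the terms lookup expands into a plain terms filter built from whatever ids happen to be in user 123's following array at query time (the ids below are just for illustration):

{
  "query": {
    "bool": {
      "filter": [
        { "terms": { "user_id": [456, 789, 1011] } }
      ]
    }
  }
}

The difference is that with the lookup we never have to fetch that list ourselves and send it along with every request.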
This is pretty straightforward when looking at it from the outside, but there is some caching magic Elasticsearch is doing under the hood here. Before Elasticsearch 2.0, it was up to the developer to provide caching logic for these queries.
For example, if we wanted to cache the list of following ids that this query uses, we used to have to provide a _cache_key field with the name of the key and expire that cache manually whenever the list changed. The problem was that at some point we would be over-caching and taking a performance hit while thinking our efforts were netting a performance gain. Since there is no easy way for the developer to know when they have crossed that point, Elasticsearch has taken that burden off of the developer by making sure that only reused filters are cached. This is why it’s important to stay up-to-date on Elasticsearch versions and utilize deprecation logging, as efficiency upgrades are happening rapidly.
As you can see, the query DSL is powerful, and most filter queries will be much simpler than this one. It’s mostly a matter of structuring the query and setting up the indexes accordingly.
Make it Faster: Enter Aliases
The above approach worked well for us, but we noticed our queries getting slower as our index grew, regardless of how many nodes we scaled out to. Even though this wasn’t causing any problems yet, at 80,000 posts per day and growing we knew it eventually would.
In addition, we later found out that this approach has a major capacity issue, as it is never a good idea to let a single index grow indefinitely. The solution was to break up the message index into monthly chunks and use the alias approach for querying a set of indexes. For example, we alias the most recent three indexes as “messages_last_3_months” and then query on that alias to get a result set from the last three months.
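As a rough sketch, rolling that alias forward each month might look something like this (the monthly index names are illustrative):

POST http://yourhost:9200/_aliases
{
  "actions": [
    { "remove": { "index": "messages_2016_02", "alias": "messages_last_3_months" } },
    { "add": { "index": "messages_2016_05", "alias": "messages_last_3_months" } }
  ]
}

The stream query from earlier stays exactly the same; it simply goes to http://yourhost:9200/messages_last_3_months/message/_search instead of one ever-growing index.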
It is important to note that the more indexes you query at once, the more shards have to be queried, increasing the overall search load for that query. This may or may not have implications depending on your data and traffic size, but it needs to be monitored closely. In our case, we were able to keep the number of indexes queried concurrently to a minimum, and it didn’t prove to be an issue. [Editor's note: This will be improved in Elasticsearch 5.0! Read more here.]
Partitioning the message index did come with a bit of work, as we had to lay out the logic for when to query one, three, six, or all indexes, but the net gain was tremendous: sub-millisecond queries. Given that we have about 51 million indexed documents totaling roughly 60GB, it blew us away when we first witnessed this kind of performance in production at our scale. We have 5 nodes, each running on a machine with 16 cores, 64GB of memory, and 160GB of disk. Our CPU usage usually hovers around 4-8%, so 16 cores is a bit of overkill here. A seven-day overview from Marvel is shown below. You’ll notice we don’t get high traffic during the weekends.
As you can see from a sample of our server data, we are running stable and healthy. Our indication that we may need to scale is when our JVM heap usage starts hitting at or above 75%, which looks like it might happen soon as our growth continues.
Summary: Less Code, More Nodes, More Features
Aside from our query performance gain, the most notable win here is the amount of code we were able to scrap. Our architecture is greatly simplified compared to our previous design, and the learning curve of this design is less steep. The time between a user posting a message and being able to see it in any stream is the time it takes to get indexed plus the time it takes to search, which, as we can see from our cluster statistics, is negligible.
When it comes time to scale again, we simply add another node to our cluster. To top it off, we’ve now set ourselves up for full-text search. With a simple query to our Elasticsearch setup, our application has outstanding full-text search capabilities.
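As a rough illustration (the search phrase here is hypothetical), a simple match query against the body field is all it takes to search recent posts:

GET http://yourhost:9200/messages_last_3_months/message/_search
{
  "query": {
    "match": { "body": "apple earnings" }
  },
  "size": 15
}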
Not only have we solved our main scaling issue, but we have also gained features that Elasticsearch provides out of the box, such as fast filter queries, custom aggregations, full-text search, and autocomplete.
Eric Alford (@ericalford) is the Director of Engineering at StockTwits. Initially trained in information assurance, his roots are in web and mobile application development. He enjoys long walks on the internet and poking things with sticks.