# How we upgraded an old, 3PB large, Elasticsearch cluster without downtime. Part 2 - Two consistent clusters

This is part 2 in our series on how we upgraded our Elasticsearch cluster without downtime and minimal user impact.

As described in Part 1, our requirements were to both provide a smooth transition between two different versions of our system, while still keeping the opportunities for a rollback open.

With that in mind it was obvious from the beginning that we would have to run two Elasticsearch clusters in parallel and then manage a seamless transition between them. This blog post will describe how we solved the indexing consistency and data migration parts of that problem.

## Everything changes

The requirements on our system are quite unusual compared to a typical log ingesting Elasticsearch cluster, because in our system no data is immutable. Any document in any index can potentially receive an update at any point in time. This means that we have to take very good care to not lose any updates and always apply them in the correct order.

We receive 5000 new documents and over 10000 updates per second in our cluster, which in turn contains over 400 billion documents and 3.5PB of data including replicas.

Managing all of this data is hard, even when you are only operating one cluster. Luckily, Elasticsearch provides built-in optimistic locking functionality that we have used to great extent. Before the upgrade project started we did not have any major problems with consistency, at least that we knew of at that time.

## Update use cases

Documents in our system receive updates for a number of different reasons. Three common examples are due to new engagement events (i.e a like or a comment on a social post), or due to a customer specific annotations on a document (i.e a tag or comment attached to the document) or because of compliance reasons (we have to hide and/or delete content based on certain rules from our content providers.

Figure 1. Typical index volume patterns over ~3 weeks

To accommodate all of that our system supports four basic operations on documents and fields. All operations are applied on a given document id and ids are guaranteed to be unique across the whole cluster.

• Upsert

• Add a new document or completely replace an existing one if it already exists

• Note that deletes are also modeled as Upserts. A delete is simply implemented as an Upsert that writes an “empty” tombstone document replacing the existing one.

• Add a new document, if it’s not already present. Otherwise keep the existing one and drop this new one silently.
• SetField

• Replace the value of one field in a document, keep the other fields intact. If the document doesn’t exist then throw an error.
• AppendToField / RemoveFromField

• For array fields we also support a field level append/remove functionality. This is a convenience functionality that is easier to use than the SetField operation if you are working with sets of values. In all other aspects the semantics of these operations are similar to SetField.

Before the upgrade all of these operations were applied by fetching the existing document from Elasticsearch, modifying it, then writing the modified document back to Elasticsearch. The synchronization and versioning of these operations was handled inside Elasticsearch. As long as we followed the rules of optimistic locking we were more or less safe against data loss and/or other unpredictable behavior. But as expected, all of this breaks down as soon as we introduce a second cluster.

Figure 2. An overview of the services implementing the four operations

## Adding another cluster

By introducing a second cluster to the architecture, suddenly a large category of new problems emerged. If we fail an update, or even worse, lose track of one, it can have an immediately visible impact to our customers.

The goal we had for the migration was to be able to route search requests back and forth between different clusters without our customers noticing. This meant that we had to make sure that all the received document operations were applied in both clusters, in the same effective order. If any operation got lost in one cluster it would cause very confusing user experiences where a user added tag would just “disappear” when the users requests were routed to that cluster. With our volume, an error condition that has a small chance of happening, say once every millionth time, will still occur every hour somewhere in the system.

We also have to solve the common enough situation where two operations get applied on the same document at almost the same time. For example an Upsert closely followed by a SetField. In that scenario we actually risk losing data entirely if we are not careful. If we applied those two operations in different orders in the two clusters they will end up with different results.

Figure 3. The updates are arriving in an incorrect order, leading to the update with ver. 1 getting lost in the end result

We needed to make sure that eventually both clusters converged to the same end result of any sequence of operations, even if they were applied in different order for the different clusters.

We knew that we were in for a challenge.

## Consistency architecture

One of the first realizations we made was that we first needed to limit the number of components that had direct access to Elasticsearch. The current state would make the migration to the new cluster too complex, with too much coordination required to get everything switched over at the same time. We settled on two such components, one for high volume batch streaming data and one with a REST api.

The most important change we had to do was to move the versioning out of Elasticsearch, since it would be very hard to make sure that each individual update was always applied in the same order in both clusters. To do this we also decided to stop support all the operations described above in the components that talks to Elasticsearch, but instead just implementing two “low level” operations namely

• GetDocument

• Fetches a document and its version number by documentId

• The version returned here needs to be provided to UpsertIfLatest
• UpsertIfLatest

• Performs an upsert on a document if the version is determined to be the latest

• If the version is not later than the existing throw an error and inform the caller

For example the SetField high level operation can then be implemented like this:

fun SetField(String field, String newValue, String docId) {
var success = false
while (!success) {
val document = GetDocument(docId)
document[field] = newValue
document.version = GetNextVersion(document.version)
success = UpsertIfLatest(docId, document)
}
}


This means that we had to introduce a Document Service that used the Storage components to implement the high level APIs (AddIfNotExists/ SetField/ AppendToField/ RemoveFromField). The Document Service creates versioned upserts that get duplicated to Storage components that talk to separate clusters. The Storage components can make the decision if an incoming upsert is the latest version or not independently per cluster.

An additional complication was that the two different versions of Elasticsearch had different implementations of their internal versioning and optimistic locking. So we had to invent a third, unified sequence number based causal consistency versioning scheme on top of the other two. The sequence numbers that we managed “outside” of the clusters were stored in a number of version fields on all documents.

With that said, we will not dive into further details here, but all in all it took a team of 4 people over 1 year to carefully and gradually re-architect our entire indexing and update pipeline to end up in a situation that eventually looked like this:

Figure 4. An overview of the services implementing the four operations in the consistency architecture

## Consistency architecture benefits

All the work to support two clusters was not just a “necessary evil” caused by the upgrade. It turned out to give value a lot sooner. Some of the extra benefits we gained were

• Improved automatic tests suites and our monitoring on consistency

• Improved the separation of concern across a number of components that previously were a bit too entangled

• Found and fixed a number of edge case bugs we had been having in the old cluster architecture.

• We support a “two cluster architecture” which can be reused in the future.

• This can be used for future upgrades of the cluster.

• We could also use it to set up another cluster in another part of the world for resiliency, data locality or latency reasons.

## Data migration between clusters

In parallel to the consistent architecture design and implementation, another decision we had to make was how to copy the data from the old Elasticsearch cluster into the new one.

We considered many different options, from implementing new components that could read Elasticsearch snapshots, and write them to the new cluster, to using the index from remote functionality. In the end, we decided to export all of the data (1PB) using the scan and scroll API and then store compressed batches of JSON files on S3. Then we let these JSON documents go through our normal ingestion pipeline from the beginning to end.

This allowed us to not only replicate the data, but also improve it at the same time. We could remove custom code that accounted for older data, by adding fields that had been introduced since they were indexed. This resulted in a more consistent, slimmer and more comprehensible information model for the new cluster which was a great improvement by itself.

We estimated that we could run the exporting, reprocessing and reindexing pipeline at ~50,000 documents/second. This meant that the whole backfill (400 billion documents) would be ongoing for at least 3 months assuming we could keep it going without interruptions.

Since the data copying would take such a long time we also decided that we wanted to architect the rest of the system so that we could take advantage of “partial” data that got available in the new system gradually. This would avoid a big bang switch at the very end. This decision influenced our plans and design further on, and turned out to be very valuable in the end, but more on that in a later blog post.

Figure 5 An overview of the backfill speed over a period of two months

The peak indexing throughput during the migration was 130,000 documents/second. But as can be seen in Figure 5 above, we were not able to maintain that throughput for very long for various reasons. And when we started to serve a large portion of end user queries in the new cluster, we also had to slow down indexing in order to not impact search latency more than necessary.

We can also mention that our estimate on the average throughput turned out to be very accurate. The whole rollout took ~110 days from starting indexing until the new cluster had all the data.

This concludes the second part in our blog post series on how we upgraded our Elasticsearch cluster. Stay tuned for the next post that will be published some time next week.

To keep up-to-date, please follow us on Twitter or Instagram.