Geosharded Recommendations Part 3: Consistency

Geosharding Team

By:

  • Devin Thomson: Lead, Backend Engineer
  • Xiaohu Li: Manager, Backend Engineering
  • Daniel Geng: Backend Engineer
  • Frank Ren: Director, Backend Engineering

Intro

In the previous posts (here and here), we covered the sharding mechanism and the architecture of a scalable, geosharded search cluster. In this final installment, we describe the data consistency problems seen at scale and how to solve them.

Consistency

When dealing with a distributed system with several datastores, the question of consistency must be addressed. In our use case, we have a mapping datastore that maps a document id to a geoshard, plus the geosharded indexes themselves.

What can happen if you don’t design for consistency? Failed writes and/or stale data. We’ll address the following solutions to consistency issues:

  • Ensure guaranteed write ordering.
  • Ensure strongly consistent reads from all datastores.

Guaranteed Ordering

In a geosharded index design, documents can move from index to index. In the Tinder world, the simplest example would be a user taking advantage of the “Passport” feature, where they place themselves somewhere else on Earth and immediately swipe on local users. The document must correspondingly be moved to that geoshard so that the local users can find the Passporting user and matches can be created. It’s quite common that multiple writes for the same document occur within milliseconds of each other. Without guaranteed ordering, the following can occur:

  1. User Passports to another geoshard B.

  2. User Passports back to the original geoshard A less than 200ms later (this is possible).

  3. Message sent via action 2 is pulled out of the queue before action 1.

     a. The doc is not in geoshard B, so the move process fails.

  4. Message sent via action 1 is pulled out of the queue.

     a. The document is moved to geoshard B.
    

It’s clear that this is a very bad state. The user has indicated they want to move back to their original location, but the document is in the other location.

Kafka provides a scalable solution to this problem. A topic can be given multiple partitions to allow parallelism, with keys consistently hashed to specific partitions. Documents with the same key will always be sent to the same partition, and consumers can acquire locks on the partitions they are consuming to avoid any contention. We determined that moving to Kafka was necessary to remove this variable.
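
As a concrete sketch of this approach (not the actual Tinder implementation), the kafka-python snippet below keys every move event by document id; the topic name, message shape, and serialization are assumptions. Because the key determines the partition, both Passport moves for the same user land on the same partition and are consumed in order.

```python
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_move(doc_id: str, from_shard: str, to_shard: str) -> None:
    """Publish a document-move event keyed by document id. All events with
    the same key hash to the same partition and are consumed in order."""
    producer.send(
        "geoshard-moves",  # hypothetical topic name
        key=doc_id,
        value={"doc_id": doc_id, "from": from_shard, "to": to_shard},
    )

# Two rapid Passport actions for the same user share the key "user-123",
# so they land on the same partition in the order they were sent.
publish_move("user-123", "shard-A", "shard-B")
publish_move("user-123", "shard-B", "shard-A")
producer.flush()
```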

A note on other options - many queueing technologies offer only “best-effort” ordering, which does not satisfy our requirements, or provide a FIFO queue implementation that is only capable of very low throughput. This is not an issue in Kafka, but depending on the traffic pattern another technology may be suitable.

Datastore Consistency

Elasticsearch is classified as a near real-time search engine. What this means in practice is that writes are queued into an in-memory buffer (and a transaction log for error recovery) before being “refreshed” to a segment in the filesystem cache and becoming searchable. The segment will eventually be “flushed” to disk and stored permanently, but this is not required for it to be searchable. See the Elasticsearch documentation for details.

[Figures 1 and 2: a write's path from the in-memory buffer to a searchable segment in the filesystem cache]

In Figure 1, the write has been added to the in-memory buffer but is not yet searchable.
In Figure 2, the in-memory buffer has been refreshed into a new segment in the filesystem cache, which is now searchable.
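
To make the near real-time behavior concrete, here is a small sketch with the official Elasticsearch Python client; the index name, document id, and fields are illustrative assumptions.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# The write lands in the in-memory buffer (and translog); by default it only
# becomes searchable after the next refresh (roughly every second).
es.index(index="geoshard-b", id="user-123", document={"geo": "somewhere"})

# A search issued immediately afterwards may not see the document yet...
es.search(index="geoshard-b", query={"term": {"_id": "user-123"}})

# ...but a real-time Get will: it refreshes the index in place if the
# document has a pending write that has not yet been refreshed.
doc = es.get(index="geoshard-b", id="user-123")["_source"]
```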

We can once more refer to the Passport example for what can go wrong:

  1. User Passports to another geoshard B.

  2. User Passports back to geoshard A less than 200ms later.

  3. Message sent via action 1 is processed from the queue first.

    a. Geoshard mapping is updated in the mapping datastore successfully.
    b. Document is moved “successfully” to geoshard B.
       i. The write has been pushed to the in-memory buffer, but has yet to be refreshed.

  4. Message sent via action 2 is processed from the queue next.

    a. The mapping datastore query for the current geoshard returns geoshard B, and is updated back to geoshard A.
    b. The write fails to move the document back to geoshard A because the move to geoshard B has not yet been refreshed and is therefore not searchable.

The state is inconsistent between the mapping datastore and search index, and the document will remain in geoshard B.

The solution to this is to use a workflow that guarantees strong consistency within the search index. The most natural API for moving a document from index to index is the Reindex API, but it relies on the same near real-time search behavior and is therefore unacceptable. Elasticsearch does, however, provide the Get API, which by default will refresh the index when fetching a document that has a pending write that has yet to be refreshed.

The updated flow:

  1. User Passports to another geoshard B.

  2. User Passports back to geoshard A less than 200ms later.

  3. Message sent via action 1 is processed from the queue first.

     a. The mapping datastore query for the current geoshard returns geoshard A, and is updated to geoshard B.
     b. The document is retrieved from geoshard A using the Get API (forcing a refresh if not already done).
     c. The document is indexed into geoshard B.
     d. The document is removed from geoshard A.

  4. Message sent via action 2 is processed from the queue next.

     a. The mapping datastore query for the current geoshard returns geoshard B, and is updated to geoshard A.
     b. The document is retrieved from geoshard B using the Get API (forcing a refresh if not already done).
     c. The document is indexed into geoshard A.
     d. The document is removed from geoshard B.

Using a Get API that refreshes the index if there are pending writes for the document being fetched eliminates the consistency issue. The slight increase in application code to perform a Get + Index rather than a single Reindex is well worth the trouble it avoids.
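
A rough sketch of this Get + Index flow is below, using the official Elasticsearch Python client; the in-memory mapping dict stands in for the mapping datastore, and the index names are assumptions.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
mapping = {}  # stand-in for the mapping datastore: doc_id -> geoshard index name

def move_document(doc_id: str, target_shard: str) -> None:
    # 1. Read the current mapping and point it at the target geoshard.
    #    In production, this read must be strongly consistent.
    current_shard = mapping[doc_id]
    mapping[doc_id] = target_shard
    if current_shard == target_shard:
        return

    # 2. Fetch from the current geoshard with the real-time Get API, which
    #    refreshes in place if the document has a pending, unrefreshed write.
    source = es.get(index=current_shard, id=doc_id)["_source"]

    # 3. Index into the target geoshard, then remove it from the old one.
    es.index(index=target_shard, id=doc_id, document=source)
    es.delete(index=current_shard, id=doc_id)
```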

A final note - the mapping datastore may also have an eventually consistent data model. If that is the case, the same consideration applies (ensure strongly consistent reads); otherwise the mapping may point to a different geoshard than the one the document is actually in, resulting in failed future writes.
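
As an illustration only (the post does not name the mapping datastore): if the mapping lived in DynamoDB, for example, a strongly consistent read can be requested per call so the lookup never returns a stale geoshard from a replica.

```python
import boto3

# Hypothetical table: partition key "doc_id", attribute "geoshard".
table = boto3.resource("dynamodb").Table("geoshard-mapping")

def get_shard(doc_id: str) -> str:
    # ConsistentRead=True returns the latest committed value instead of a
    # possibly stale value from an eventually consistent replica.
    response = table.get_item(Key={"doc_id": doc_id}, ConsistentRead=True)
    return response["Item"]["geoshard"]
```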

Expect Failure

Even with the best possible design, issues will happen. Perhaps something upstream failed processing midway, causing a document to never be indexed or moved properly. Perhaps the process that performs the write operations to the search index crashes midway due to some hardware problem. In any event, it’s critical to be prepared for the worst. Outlined below are some strategies to mitigate failures.

Retrying

To ensure successful writes during an unexpected period of high latency or failure, it’s necessary to have some sort of retry logic in place. This should always be applied using an exponential backoff algorithm with jitter (see this blog post for details). Tuning the retry logic depends on the application - for example, if writes happen within a request initiated from a client application, latency may be a major concern.
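
A minimal sketch of capped exponential backoff with full jitter is shown below; the parameter values are arbitrary, and a production version would catch narrower exception types.

```python
import random
import time

def retry_with_backoff(operation, max_attempts=5, base=0.1, cap=5.0):
    """Run `operation`, retrying failures with capped exponential backoff plus
    full jitter so simultaneous retries from many workers spread out."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Sleep a random duration in [0, min(cap, base * 2**attempt)).
            time.sleep(random.uniform(0, min(cap, base * 2 ** attempt)))
```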

If writes happen asynchronously in a worker reading from a Kafka topic, as mentioned before, write latency is less of a concern. Kafka (and most streaming solutions) offers checkpointing, so that in the event of a process crash the application can resume processing from a reasonable starting point. Note that this is not possible for a synchronous request: the client application will have to retry, potentially blocking the client application flow.
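
A sketch of what this checkpointing can look like with kafka-python, assuming manual offset commits; the topic, consumer group, and handler are hypothetical. Committing only after a message has been fully processed means a restarted worker resumes from the last committed offset rather than losing or skipping work.

```python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "geoshard-moves",                     # hypothetical topic name
    bootstrap_servers=["localhost:9092"],
    group_id="geoshard-movers",           # hypothetical consumer group
    enable_auto_commit=False,             # commit offsets only after processing
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    handle_move(message.value)   # hypothetical handler (with its own retries)
    consumer.commit()            # checkpoint: a restarted worker resumes here
```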

Refeeding

As mentioned above, in some cases something can fail upstream and cause the data to become inconsistent between the search datastore and other datastores. To mitigate this, the application can refeed the search datastore from the “source of truth” datastore. One strategy is to refeed in the same process that writes to the search datastore, for example when a document is expected to be present but is not. Another is to periodically refeed using a background job that brings the search datastore back in sync. Analyze the cost of whichever approach you take: refeeding too often may put undue load on your system, while refeeding too infrequently may lead to unacceptable levels of inconsistency.
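
A sketch of the first strategy, refeeding on demand when an expected document is missing from the search index; the source-of-truth helper is a hypothetical placeholder.

```python
from elasticsearch import Elasticsearch, NotFoundError

es = Elasticsearch("http://localhost:9200")

def get_or_refeed(doc_id: str, shard: str) -> dict:
    try:
        return es.get(index=shard, id=doc_id)["_source"]
    except NotFoundError:
        # The search index is missing a document it should have: rebuild it
        # from the source-of-truth datastore and repair the index in place.
        source = load_from_source_of_truth(doc_id)   # hypothetical helper
        es.index(index=shard, id=doc_id, document=source)
        return source
```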

Final Takeaways

  • For a location-based service that faces load challenges, consider geosharding.
  • S2 is a good library for geosharding, and the Hilbert curve is great for preserving locality.
  • Consider how to measure load (a load score) before sharding, so the shards can be balanced.
  • Performance testing is a critical piece of rolling out new infrastructure.
  • Consider leveraging randomness to solve hard problems in engineering practice.
  • Ensure data consistency through guaranteed write ordering and strongly consistent reads.
  • Expect failures and design accordingly.