Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sequence Numbers to write operations #10708

Closed
58 of 64 tasks
bleskes opened this issue Apr 21, 2015 · 16 comments
Closed
58 of 64 tasks

Add Sequence Numbers to write operations #10708

bleskes opened this issue Apr 21, 2015 · 16 comments
Labels
:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. >feature Meta resiliency

Comments

@bleskes
Copy link
Contributor

bleskes commented Apr 21, 2015

Introduction

An Elasticsearch shard can receive indexing, update, and delete commands. Those changes are applied first on the primary shard, maintaining per doc semantics and are then replicated to all the replicas. All these operations happen concurrently. While we maintain ordering on a per doc basis, using versioning support there is no way to order them with respect to each other. Having such a per shard operation ordering will enable us to implement higher level features such as Changes API (follow changes to documents in a shard and index) and Reindexing API (take all data from a shard and reindex it into another, potentially mutating the data). Internally we could use this ordering to speed up shard recoveries, by identifying which specific operations need to be replayed to the recovering replica instead of falling back to a file based sync.

To get such ordering, each operation will be assigned a unique and ever increasing Sequence Number (in short, seq#). This sequence number will be assigned on the primary and replicated to all replicas. Seq# are to be indexed in Lucene to allow sorting, range filtering etc.

Warning, research ahead

What follows in this ticket is the current thinking about how to best implement this feature. It may change in subtle or major ways as the work continues. Is is important to implement this infrastructure in a way that is correct, resilient to failures, and without slowing down indexing speed. We feel confident with the approach described below, but we may have to backtrack or change the approach completely.

What is a Sequence

Applying an operation order on a primary is a simple question of incrementing a local counter for every operation. However, this is not sufficient to guarantee global uniqueness and monotonicity under error conditions where the primary shard can be isolated by a network partition. For those, the identity of the current primary needs to be baked into each operation. For example, late to arrive operations from an old primary can be detected and rejected.

In short, each operation is assigned two numbers:

  • a term - this number is incremented with every primary assignment and is determined by the cluster master. This is very similar to the notion of a term in Raft, a view-number in Viewstamped Replication or an epoch in Zab.
  • a seq# - this number is incremented by the primary with each operation it processes.

To achieve ordering, when comparing two operations , o1 & o2, we say that o1 < o2 if and only if s1.seq# < s2.seq# or (s1.seq# == s2.seq# and s1.term < s2.term). Equality and greater than are defined in a similar fashion.

For reasons explained later on, we maintain for each shard copy two special seq#:

  1. local checkpoint# - this is the highest seq# for which all lower seq# have been processed . Note that this is not the highest seq# the shard has processed due to the concurrent indexing, which means that some changes can be processed while previous more heavy ones can still be on going.
  2. global checkpoint# (or just checkpoint#) - the highest seq# for which the local shard can guarantee that all previous (included) seq# have been processed on all active shard copies (i.e., primary and replicas).

Those two numbers will be maintained in memory but also persisted in the metadata of every lucene commit.

Changes to indexing flow on primaries

Here is a sketch of the indexing code on primaries. Much of it is identical to the current logic. Changes or additions are marked in bold .

  1. Validate write consistency based on routing tables.
  2. Incoming indexing request is parsed first (rejected upon mapping/parsing failures)
  3. Under uid lock:
    1. Versioning is resolved to a fixed version to be indexed.
    2. Operation is assigned a seq# and a term
    3. Doc is indexed into Lucene.
    4. Doc is put into translog.
  4. Replication
    1. Failures in step 3 above are also replicated (eg due to failure of lucene tokenization)
    2. Send docs to all assigned replicas.
    3. Replicas respond with their current local checkpoint#.
    4. When all respond (or have failed), send answer to client.
  5. Checkpoint update:
    1. Update the global `checkpoint# to the highest seq# for which all active replicas have processed all lower seq# (inclusive). This is based on information received in 4.3 .
    2. If changed, send new global checkpoint# to replicas (can be folded into a heartbeat/next index req).

Changes to indexing flow on replicas

As above, this is sketch of the indexing code on replicas. Changes with the current logic are marked as bold.

  1. Validate request
    1. Seq#'s term is >= locally known primary term.
  2. Under uid lock:
    1. Index into Lucene if seq# > local copy and doesn't represent an error on primary.
    2. Add to local translog.
  3. Respond with the current local checkpoint#

Global Checkpoint# increment on replicas

The primary advances its global checkpoint# based on its knowledge of its local and replica's local checkpoint#. Periodically it shares its knowledge with the replicas

  1. Validate source:
    1. source's primary term is == locally known primary term.
  2. Validate correctness:
    1. Check that all sequence# below the new global checkpoint# were processed and local checkpoint# is of the same primary term. If not, fail shard.
  3. Set the shard’s copy of global checkpoint#, if it's lower than the incoming global checkpoint.

Note that the global checkpoint is a local knowledge of that is update under the mandate of the primary. It may be that the primary information is lagging compared to a replica. This can happen when a replica is promoted to a primary (but still has stale info).

First use case - faster replica recovery

Have an ordering of operations allows us to speed up the recovery process of an existing replica and synchronization with the primary. At the moment, we do file based sync which typically results in over-copying of data. Having a clearly marked checkpoint# allows us to limit the sync operation to just those documents that have changed subsequently. In many cases we expect to have no documents to sync at all. This improvement will be tracked in a separate issue.

Road map

Basic infra

Replica recovery (no rollback)

A best effort doc based replica recovery, based on local last commit. By best effort we refer to having no guarantees on the primary
translog state and the likelihood of doc based recovery to succeed and not requiring a file sync

  • Move local checkpoint to max seq# in commit when opening engine Tighten sequence numbers recovery #22212
    We currently have no guarantee that all ops above the local checkpoint baked into the commit will be replayed. That means that delete operations with a seq# > local checkpoint will not be replayed. To work around it (for now), we will move the local checkpoint artificially (at the potential expense of correctness) (@jasontedor)
  • Review correctness of POC and extract requirements for the primary side (@jasontedor) replaced with TLA+ work
  • Use seq# checkpoints for replica recovery (Introduce sequence-number-based recovery #22484 , @jasontedor)

Translog seq# based API

Currently translog keeps all operations that are not persisted into the last lucene commit. This doesn't imply that it can serve all operations from a given seq# and up. We want to move seq# based recovery where a lucene commit indicates what seq# a fully baked into it and the translog recovers from there.

Primary recovery (no rollback)

Primary promotion

Live replica/primary sync (no rollback)

Primary recovery with rollback

Needed to deal with discrepancies between translog and commit point that can result of failure during primary/replica sync

Replica recovery with rollback

Needed to throw away potential wrong doc versions that ended up in lucene. Those "wrong doc versions" may still be in the translog of the replica but since we ignore the translog on replica recovery they will be removed.

Live replica/primary sync with rollback

  • Allow a shard to rollback to a seq# from before last known checkpoint# based on NRT readers
  • Index all operations missing from the rollback point up to the global checkpoint from local translog

Seq# as versioning

Shrunk indices

Shrunk indices have mixed histories.

Adopt Me

  • Properly store seq# in lucene: we expect to use the seq# for sorting, during collision checking and for doing range searches. The primary term will only be used during collision checking when the seq# of the two document copies is identical. Mapping this need to lucene means that the seq# it self should be stored both as a numeric doc value and as numerical indexed field (BKD). The primary term should be stored as a doc value field and doesn't need an indexed variant. We also considered the alternative of encoding both term and seq# into a single numeric/binary field as it may save on a the disk lookup implied by two separate fields. Since we expect the primary term to be rarely retrieved, we opted for the simplicity of the two doc value fields solution. We also expect it to mean better compression. (@dakrone) Add internal _primary_term field to store shard's primary term #21480
  • Add primary term to DocWriteResponse (@jasontedor) Add primary term to doc write response #24171
  • how do we deal with the BWC aspects in the case that - a primary is running on a new node will one replica is on an old node and one replica is on a new one. In that case the primary will maintain seq# and checkpoints for itself and the replica on the new node. However if the primary fails it may be the case that the old replica is elected as primary. That means that the other replica will suddenly stop receiving sequence numbers. It is not clear if this really a problem and if so what the best approach to solve it. (@dakrone, Promote replica on the highest version node #25277)
  • Introduce new shard states to indicated an ongoing primary sync on promotion. See Live primary-replica resync (no rollback) #24841 (review) . We now have an alternative plan for this - see Introduce promoting index shard state #28004 (comment)
  • Delay primary relocation if primary has not yet synced with replicas . See Live primary-replica resync (no rollback) #24841 (review)

TBD

  • A primary that's allocated when in sync replicas aren't can advance the global checkpoint to a region that's unsafe - the primary doesn't about if it's local ops, which are above the global checkpoint actually exists on the other replicas (@ywelsch Introduce primary/replica mode for GlobalCheckPointTracker #25468).
  • File based recovery (both local and remote) can recreate deleted docs. If a delete is out of order with a previous index operation, we may replay the index operation on recovery, but not the delete. This has to do with the fact that we trim the translog/capture a starting point at an arbitrary generation, and replay all of it. Current solution - change the indexing plan on non-primary origin to never index to lucene below the local checkpoint (5.x is addressed in Engine - Do not store operations that are not index into lucene in the translog (5.x only) #25592)
  • How to deal with failures to sync the global check point? this is interesting as we will rely on the checkpoint to be eventually updated on all shard copies.
  • Throwing back local checkpoint to global checkpoint may leave us in a situation we don't have all ops in the translog to do primary/replica sync. This is because we don't have (yet) any guarantee that the translog has all ops above the global checkpoint. That shard will be problematic when promoted to primary (it doesn't have the data to do a sync), causing replica's local checkpoints to get stuck. This will change when we have a custom deletion policy but we may want to double check that and fail the primary if it can't sync it's replica.
  • When indexing stops, sync global checkpoint faster (now we wait on idle shard) (@jasontedor, Introduce global checkpoint background sync #26591)
  • Make local check point storage lazy intitialized to protect against memory usage during recovery (@jasontedor, Lazy initialize checkpoint tracker bit sets #27179)
  • Snapshot and restore may create a primary that violates all our checkpointing logic. We should quantify the scenarios this can happen and discuss appropriate solutions. A similar issue occurs with force allocate empty & stale primaries.
  • When indexing on a replica InternalEngine loads the primary term to resolve a potential conflict between two indexing operations to the same doc with the same seq#. Once we have proper rollbacks that should never be needed. Instead we should assert that the term is identical (and potentially that the doc it self is identical).
  • Make replica shard allocation code aware of sequence-numbers, so that it selects a node with an existing shard copy not purely based on the number of matching Lucene segments, but also based on the number of operations that it's missing from the primary. Similarly, do the same to cancel existing recoveries if a node with an existing shard copy comes back that has better matching stats. (@dnhatn, Sequence number based replica allocation #46959)

Completed Miscellaneous

  • Review feasibility of old indices (done and implemented in Add BWC layer to seq no infra and enable BWC tests #22185 ) (@bleskes)
  • Remove Shadow replicas (@dakrone Remove shadow replicas #23906)
  • If minimum of all local checkpoints is less than global checkpoint on the primary, do we fail the shard? No, this can happen when replicas pull back their local checkpoints to their version of the global checkpoint
  • Failed shards who's local checkpoint is lagging with more than 10000 (?) ops behind the primary . This is a temporary measure to allow merging into master without closing translog gaps during primary promotion on a live shard. Those will require the replicas to pick them up, which will take a replica/primary live sync
@shikhar
Copy link
Contributor

shikhar commented May 5, 2015

First use case - faster replica recovery

I'd argue the first use case is making replication semantics more sound :)

bleskes added a commit that referenced this issue Oct 21, 2015
Every shard group in Elasticsearch has a selected copy called a primary. When a primary shard fails a new primary would be selected from the existing replica copies. This PR introduces `primary terms` to track the number of times this has happened. This will allow us, as follow up work and among other things, to identify operations that come from old stale primaries. It is also the first step in road towards sequence numbers.

Relates to #10708
Closes #14062
@clintongormley clintongormley added :Sequence IDs Meta and removed :Core/Infra/Core Core issues without another label labels Nov 18, 2015
bleskes added a commit that referenced this issue Nov 19, 2015
Adds a counter to each write operation on a shard. This sequence numbers is indexed into lucene using doc values, for now (we will probably require indexing to support range searchers in the future).

On top of this, primary term semantics are enforced and shards will refuse write operation coming from an older primary.

Other notes:
- The add SequenceServiceNumber is just a skeleton and will be replaced with much heavier one, once we have all the building blocks (i.e., checkpoints).
- I completely ignored recovery - for this we will need checkpoints as well.
- A new based class is introduced for all single doc write operations. This is handy to unify common logic (like toXContent).
- For now, we don't use seq# as versioning. We could in the future.

Relates to #10708
Closes #14651
bleskes added a commit to bleskes/elasticsearch that referenced this issue Nov 22, 2015
…operations

The work for elastic#10708 requires tighter integration with the current shard routing of a shard. As such, we need to make sure it is set before the IndexService exposes the shard to external operations.
bleskes added a commit that referenced this issue Nov 23, 2015
…operations

The work for #10708 requires tighter integration with the current shard routing of a shard. As such, we need to make sure it is set before the IndexService exposes the shard to external operations.

Closes #14918
bleskes added a commit that referenced this issue Nov 23, 2015
…operations

The work for #10708 requires tighter integration with the current shard routing of a shard. As such, we need to make sure it is set before the IndexService exposes the shard to external operations.

Closes #14918
bleskes added a commit that referenced this issue Dec 15, 2015
This PR introduces the notion of a local checkpoint on the shard level. A local check point is defined as a the highest sequence number for which all previous operations (i.e. with a lower seq#) have been processed.

relates to #10708

Closes #15390

formatting
@rkonda
Copy link

rkonda commented Mar 8, 2016

It's not clear as to what would happen in the following split brain scenario (scenario-1):

  1. split occurs, forming two networks
  2. the network that didn't have a master, elects a master (call this network-2)
  3. the master will elect a new primary (in network-2)
  4. the primary in network-2 now has incremented term value (say 11). The primary in network-1 continues to have the same term value (10 in this example)
  5. The connection between the networks is re-established.

In this case we need a strategy for reconciling the differences in the indexes, if there were change operations in both the networks. Does a strategy like that exist today? So far it seems like this situation is preventable by using min_master_nodes. However in case min_master_nodes is not set appropriately, some default strategy should come into effect I would think.

An example strategy could be:

  1. Keep logs of write operations in both networks for a configurable amount of time. If the networks' connectivity is restored within this time period: (a) Drop all nodes in network-2 to read-only replica status (b) Attempt to reconcile the differences, and use network-1's state if the differences are not reconcilable. (c) Remove read-only status
    If the connectivity isn't restored within that time, when connection is restored, all indices in network-2 that have competing primaries in network-1 will lose their shards, and replicas are created from network-1.

Another interesting situation (scenario-2) to consider:

  1. Continuing with the scenario described above until (4) ...
  2. network-1 has another split, creating network-1 and network-1a. Network-1a gives term value of 11 to the new primary in that network.
  3. network-1 completely fails, and connectivity between network-1a and network-2 are restored. Now we may have a scenario where the subsequent change operations might not fail but still lead to different indexes in the replicas, with some operations failing some of the time, creating a messy situation.

This would happen if there is no reconciliation strategy in effect.

I do see that the sequence numbering method will keep shards that have connectivity to both the networks, in integral state, in the case of scenario-1. In the case of scenario-2, it is possible that the same shard gets operations with same term values from multiple primaries, and that again could create faulty index in that replica.

I am still trying to understand Elasticsearch's cluster behavior. It's possible that I might have made assumptions that aren't correct.

@bleskes
Copy link
Contributor Author

bleskes commented Mar 8, 2016

In this case we need a strategy for reconciling the differences in the indexes, if there were change operations in both the networks. Does a strategy like that exist today?

The current strategy, which seq# will keep enforcing but in easier/faster way, is that all replicas are "reset" to be an exact copy of the primary currently chosen by the master. As you noted, this falls apart when there are two residing masters in the cluster. Indeed, the only way to prevent this is by setting minimum master nodes - which is the number one most important setting to set in ES (tell it what the expected cluster size is)

If min master nodes is not set and a split brain occurs, resolution will come when one of the masters steps down (either by manual intervention or by detecting the other one). In that case all replicas will "reset" to the primary designated by the left over master.

Drop all nodes in network-2 to read-only replica status

This is similar to what ES does - nodes with no master will only serve read requests and block writes (by default, it can be configured to block reads).

it is possible that the same shard gets operations with same term values from multiple primaries, and that again could create faulty index in that replica.

If the term is the same from both primaries, the replica will accept them according to the current plan. The situation will be resolved when the network restores and the left over primary and replica sync but indeed there are potential troubles there. I have some ideas on how to fix this specific secondary failure (split brain is the true issue, after which all bets are off) but there are bigger fish to catch first :)

@rkonda
Copy link

rkonda commented Mar 8, 2016

Thank you very much for your clarification. I rather enjoy all these discussions and your comments.

The current strategy, which seq# will keep enforcing but in easier/faster way, is that all replicas are "reset" to be an exact copy of the primary currently chosen by the master.

The situation will be resolved when the network restores and the left over primary and replica sync

I would like to clearly understand the reset/sync scenarios. What triggers reset/sync?

I can think of a couple of "normal" operation scenarios

  1. I would think that whenever a node joins a network, the master would initiate a sync/reset.
  2. If a replica fails for a request, I suppose the primary should keep attempting a sync/reset, otherwise the replica might keep diverging, and at some point the master has to decommission that replica, otherwise the reads would be inconsistent.

In the case of split brain, with multi-network replicas (assuming min master nodes is set), primary-1 has been assuming that this replica R (on this third node, say N-3) has been failing (because of its allegiance to primary-2 ) but still is in the network. Hence it would attempt sync/reset. How does this protocol work? Should master-1 attempt to decommission R at some point, going by assumption (2)?

This problem will occur in a loop if R is decommissioned but another replica is installed on N-3 in its place, by the same protocol. There will be contention on N-3 for "reset"-ing replica shards by both the masters.

I suppose one way to resolve this is by letting a node choose a master if there are multiple masters. If we did this, then whenever a node loses its master, it would choose the other master, and there will be a sync/reset and all is well.

However if the node chooses its master, the other master will lose quorum, and hence cease to exist, which is a good resolution for this issue in my opinion.

@bleskes
Copy link
Contributor Author

bleskes commented Mar 8, 2016

The two issues you mention indeed trigger a primary/replica sync. I'm not sure I follow the rest, I would like to ask you to continue the discussion on discuss.elastic.co . We try to keep github for issues and work items. Thx!

@rkonda
Copy link

rkonda commented Mar 8, 2016

@makeyang
Copy link
Contributor

makeyang commented Apr 5, 2016

any plan to release this?
it seems after this release, u guys will make ES a AP system? will u provide config paramters to allow users to control ES to be a AP or CP system eventually?

@bleskes
Copy link
Contributor Author

bleskes commented Apr 5, 2016

@makeyang this will be released as soon as it is done. There's still a lot of work to do.

it seems after this release, u guys will make ES a AP system? will u provide config paramters to allow users to control ES to be a AP or CP system eventually?

ES is currently and will stay CP in the foreseeable future. If a node is partitioned away from the cluster it will serve read requests (configurable) but will block writes, in which case we drop availability. Of course in future there are many options but currently there are no concrete plans to make it any different.

bleskes added a commit that referenced this issue Jan 11, 2019
Add documentation to describe the new sequence number powered optimistic concurrency control

Relates #36148
Relates #10708
bleskes added a commit that referenced this issue Jan 11, 2019
Add documentation to describe the new sequence number powered optimistic concurrency control

Relates #36148
Relates #10708
bleskes added a commit that referenced this issue Jan 29, 2019
The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations.

Relates #36148 
Relates #10708
bleskes added a commit that referenced this issue Jan 29, 2019
…y control (#37857)

The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms.

Relates #37639 
Relates #36148 
Relates #10708
bleskes added a commit to bleskes/elasticsearch that referenced this issue Feb 1, 2019
…y control (elastic#37857)

The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms.

Relates elastic#37639
Relates elastic#36148
Relates elastic#10708
bleskes added a commit to bleskes/elasticsearch that referenced this issue Feb 1, 2019
…ic#37872)

The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations.

Relates elastic#36148
Relates elastic#10708
bleskes added a commit that referenced this issue Feb 1, 2019
…37872 (#38155)

* Move update and delete by query to use seq# for optimistic concurrency control (#37857)

The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms.

Relates #37639
Relates #36148
Relates #10708

* Add Seq# based optimistic concurrency control to UpdateRequest (#37872)

The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations.

Relates #36148
Relates #10708

* Move watcher to use seq# and primary term for concurrency control (#37977)

* Adapt minimum versions for seq# power operations

After backporting #37977, #37857 and #37872
bleskes added a commit that referenced this issue Feb 4, 2019
This commit moves the usage of internal versioning for CAS operations to use sequence numbers and primary terms

Relates to #36148
Relates to #10708
bleskes added a commit that referenced this issue Feb 4, 2019
This commit moves the usage of internal versioning for CAS operations to use sequence numbers and primary terms

Relates to #36148
Relates to #10708
bleskes added a commit that referenced this issue Feb 4, 2019
@bleskes
Copy link
Contributor Author

bleskes commented Feb 5, 2019

The work prescribed in this issue now completed and will be part of the coming 6.7 and 7.0 releases. There are still some small follow ups we want to do, but they do no need to be tracked as part of this issue. We now consider this completed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. >feature Meta resiliency
Projects
None yet
Development

No branches or pull requests

8 participants