TokuMX Replication
This document describes the user experience and behavior of replication in TokuMX.
First, a quick glossary of how some terms map between MongoDB and MySQL (going from MongoDB -> MySQL):
- collection = table
- primary = master
- secondary = slave
- opLog = binary log for master
At a high level, vanilla MongoDB replication works as follows:
- Replication setups, called replica sets, have one primary instance and multiple secondary instances.
- All writes are made to the primary instance, and replicated asynchronously to the secondary instances.
- The secondary instances are read-only. To modify a secondary instance, one must take the secondary out of the replica set.
- If the primary goes down for some reason, one of the secondaries is promoted to be the new primary. In other words, there is automatic failover. Any data that was on the primary but did not make it to the secondaries is lost.
First, some notes on the data:
- On the primary, the replication data is stored in a collection that is called the opLog. This is the first big difference between MySQL and MongoDB. In MySQL, the master stores replication data in flat files called the binary log. In MongoDB, this information is stored in another dictionary. For TokuMX, that means writing to the opLog can be done in the same transaction that does the actual work, so keeping the opLog consistent with the state of collections is trivial. In MySQL, two-phase commit is required.
- The replication is basically row-based replication. Individual inserts, updates, and deletes are replicated. If an update statement updates 100 rows, then 100 individual entries are placed in the opLog. For updates and deletes, I don't think they log the entire row, just the _id field and the necessary deltas. (A rough sketch of such entries follows this list.)
- Because statements are not transactional, statements write to the opLog as they modify collections. If a crash or error occurs, then nothing is rolled back. What MongoDB ensures is that the state of the opLog reflects the data in the collections.
- The locking that protects access to the opLog is a database-level lock. In vanilla MongoDB, all collections are protected by a database-level lock.
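To make the row-based point concrete, here is a rough sketch (as Python dicts) of what the opLog entries for one update statement that matches two documents might look like. The field names (ts, h, op, ns, o2, o) follow MongoDB's oplog layout from memory and should be treated as illustrative, not authoritative.

```python
# Hypothetical illustration: one update statement touching two documents
# produces two separate opLog entries, each keyed by _id. Field names are
# approximate (ts = timestamp, h = entry hash, op = operation type,
# ns = namespace, o2 = match criteria, o = the delta to apply).
oplog_entries = [
    {"ts": (1370000000, 1), "h": 0x1A2B, "op": "u", "ns": "test.users",
     "o2": {"_id": 1}, "o": {"$set": {"score": 10}}},
    {"ts": (1370000000, 2), "h": 0x3C4D, "op": "u", "ns": "test.users",
     "o2": {"_id": 2}, "o": {"$set": {"score": 10}}},
]

for entry in oplog_entries:
    print(entry["op"], entry["ns"], entry["o2"], "->", entry["o"])
```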
When data is inserted into a primary, the following happens:
- For each row modification (be it insert, delete, or update), the collection is updated
- The opLog is updated to reflect that change.
As far as the system goes, that data is now in MongoDB. It is not yet durable and may disappear on a crash, but the data exists and will not be automatically rolled back.
As data is inserted, threads are made aware that there is now new data to be sent to secondaries. This is the "long-polling tailable cursor" magic.
Secondaries in MongoDB do not have the equivalent of a relay log. As data comes in, they place it in an in-memory queue. As data is stored in the queue, secondaries use multiple threads to apply the data to the collections and to the oplog in parallel. The parallelism is on a per-collection or per-database basis. They can do this because they do not have multi-statement transactions that touch multiple collections. Leif doesn't know how this per-collection parallelism works with the per-database locking they have. It may be done on a per-database basis. Zardosht is not sure anymore.
As data is applied, notifications are sent to the primary to say the data has been stored. When users run getLastError() with a write concern asking to ensure that data has been replicated to secondaries, this is the assurance they have.
Here are some design properties that they have:
- MongoDB's oplog is idempotent. Idempotency has been very tricky for them to get right, but they rely on it. They use idempotency to deal with being non-transactional. (A small illustration follows this list.)
- Upon coming up from a crash, (I think) they use idempotency to fill gaps in their opLog. If they have gaps in their opLog, they find a safe point known to have no gaps before it, and start replicating from there. If some data gets replicated twice, that is ok, because they rely on idempotency.
- Because they are not transactional, MongoDB does not get screwed by a large update statement the way that MySQL does with row-based replication. Yes, much data is written to the opLog, but because they write it as they do the work, there is not a large stall at the end of the transaction to replicate all of the data the way there is with MySQL.
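As a small illustration of why idempotency matters, here is a hedged Python sketch: replaying an entry that sets a field to a value is harmless, while replaying an entry that increments a value is not. (The idea that idempotency is achieved by logging full resulting values rather than operations is an assumption for illustration, not a description of MongoDB's exact implementation.)

```python
# Sketch: applying a "set field to value" entry twice leaves the document in
# the same state, so replaying an overlapping range of the opLog is harmless.
# Applying an "increment" entry twice does not, so such entries would break
# the idempotency that the recovery and initial-sync algorithms rely on.
def apply_set(doc, field, value):
    doc[field] = value

def apply_inc(doc, field, delta):
    doc[field] = doc.get(field, 0) + delta

doc_a = {"_id": 1, "score": 5}
apply_set(doc_a, "score", 10)
apply_set(doc_a, "score", 10)      # replay: still {"score": 10}, idempotent

doc_b = {"_id": 1, "score": 5}
apply_inc(doc_b, "score", 5)
apply_inc(doc_b, "score", 5)       # replay: now {"score": 15}, NOT idempotent

print(doc_a, doc_b)
```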
At a high level, here is how a secondary is created:
- the position in the opLog is recorded
- They iterate over the opLog and all collections, copying them.
- When the copying to the secondary is complete, they replicate from the primary starting at the recorded position
Here is where the existing replication algorithm depends on the idempotency of applying opLog data to secondaries. Because MongoDB is not transactional, the state of the copied collections is not a snapshot from the time the position in the opLog was recorded. The state of each collection is undefined. However, if playing their opLog is idempotent, then they can start at the recorded position, catch up with the primary, and be assured that their secondary is in sync with their primary.
The requirement of idempotency has caused them issues and has been difficult to get right.
I am not sure what happens when a secondary fails, but I imagine it is brought back up and is caught up with the primary. Because the secondary is guaranteed to be behind the primary, this seems straightforward.
Where things get interesting is if the primary goes down. In this case, a secondary must step up and become primary.
For the purposes of this section, consider two kinds of secondaries:
- those that can become a primary in the event of a failover (as defined by the user)
- and those that cannot become a primary.
When the primary goes down, all secondaries have data up to some position in the opLog. Note that this does not mean all data has been applied, just that the data resides in the opLog. Here is what is done:
- the secondary that is the furthest along is chosen. (If there is a tie, it is broken based on user settings, but that is not very interesting.)
- If this secondary is NOT eligible to become the new primary, then some eligible secondary connects to this secondary, is brought up to date by this secondary via replication, and that eligible secondary becomes the new primary.
The new primary finishes applying all the data in its opLog to its collections, and then work can resume.
Now what happens to the old failed primary? Note that the secondary that became the new primary may have lost some data that the primary had. So the old primary may not be able to simply rejoin as a secondary, because some of its data may not be on the new primary. Here is what MongoDB does, according to an email from a 10gen employee, Chad Tindel:
There are many subtleties in this process, but for a high-level overview:
- Suppose that A has decided to initiate a rollback. First, it finds the common point at which its oplog and B's oplog diverge (using oplog entry hashes, "h"). Call this point t0. A now needs to roll back all of the operations in A's oplog later than t0. It iterates through these oplog entries to identify the complete set of documents that would be affected by the rollback. Then, it queries B for this complete set of documents in their current state (suppose the time now is t1), and saves them in their appropriate collections. It then applies B's oplog entries as normal from t0 to t1. Once it has completed applying these, it considers itself as having a consistent view of the data.
Note that this design relies on idempotency. MongoDB claims that the primary coming up and having invalid data like this is a rare scenario.
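Here is a minimal sketch of the common-point search described in the quote, assuming each opLog entry carries a timestamp and the hash "h" mentioned above; the helper names and entry layout are hypothetical.

```python
# Hypothetical sketch of A deciding what to roll back: find the last opLog
# entry A and B agree on (t0), then collect the documents touched by A's
# entries after t0 so their current versions can be refetched from B.
def find_common_point(a_oplog, b_oplog):
    b_hashes = {(e["ts"], e["h"]) for e in b_oplog}
    for entry in reversed(a_oplog):                 # walk A's opLog backwards
        if (entry["ts"], entry["h"]) in b_hashes:
            return entry["ts"]                      # this is t0
    return None

def docs_to_refetch(a_oplog, t0):
    # every (namespace, document id) touched by A after the divergence point
    return {(e["ns"], e["doc_id"]) for e in a_oplog if e["ts"] > t0}

# Example: A and B agree up to ts=3; A has extra entries at ts=4 and ts=5.
a = [{"ts": t, "h": t * 11, "ns": "test.users", "doc_id": t} for t in (1, 2, 3, 4, 5)]
b = [{"ts": t, "h": t * 11, "ns": "test.users", "doc_id": t} for t in (1, 2, 3)]
t0 = find_common_point(a, b)
print(t0, docs_to_refetch(a, t0))
```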
There are four big challenges that we need to address:
- running replication on the primary with our own opLog. This involves writing data to the opLog that reflects the state of collections and sending it over to the secondary.
- creating a secondary from a primary and getting the secondary up and running
- handling crashes, both on the primary and on the secondary. If the primary crashes, we must handle automatic failover to the secondary
- having the secondary run in parallel. This item may not be immediately necessary given how awesome fractal trees are, but because MySQL 5.6 and MongoDB already have this, our not having it may look bad.
Here are our high level design goals:
- to be as parallel as possible, with as little mutex contention as possible
- to be as transactional as possible. If we are to provide TokuMX users transactional semantics of any kind, then the replication system ought to honor it.
Let's start digging into each of these items.
For the purposes of this section, let's disregard how we get a secondary initially synced up with a primary, and just assume that it is possible.
Arguably the biggest difference between TokuMX and MongoDB is that TokuMX is transactional. Individual statements are atomic. The opLog must reflect that. So, if a statement does 100 updates successfully, then all 100 should show up in the opLog, and if the statement fails, then none of them should end up in the opLog. Here is what we do (a sketch of this flow follows the list):
- as a transaction does work, all operations are logged in a buffer local to the transaction. If the buffer gets too big, then its contents will spill into another dictionary, call it the localOpRef (for local operations reference) dictionary, so we don't overuse memory.
- When the transaction is ready to commit, the transaction gets a global transaction ID (GTID) from a GTID Manager. GTID's are handed out in increasing order. This GTID will identify this transaction on all machines in the replica set, now and forever.
- The key in the oplog dictionary is prefixed by the GTID.
- The transaction proceeds to write to the opLog all operations performed within that transaction, now associated with the assigned GTID. The writing is done as follows:
- If the transaction's buffer spilled into the localOpRef dictionary, then the remaining opLog information is written to the localOpRef dictionary, and a reference to the localOpRef is stored in the opLog. This is done so that not a lot of time is wasted reading information from the localOpRef and copying it to the opLog.
- If the transaction's buffer did not spill, then the opLog information is written directly to the opLog.
- Note that all operations for a transaction are logically contiguous in the opLog.
- The transaction commits, thereby committing the data in the oplog and in the system.
- After commit, the transaction notifies the GTID manager that this GTID has committed
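Here is a rough, toy sketch of that commit-time flow. Every name here (GTIDManager, the in-memory oplog and localOpRef dicts, the spill threshold) is a hypothetical stand-in, not the real TokuMX API; the point is only the ordering of steps.

```python
# Toy, in-memory sketch of the primary-side write path.
import itertools
import threading

class GTIDManager:
    """Hands out GTIDs in increasing order and tracks which have committed."""
    def __init__(self):
        self._lock = threading.Lock()
        self._next = itertools.count(1)
        self._outstanding = set()        # handed out but not yet committed

    def next_gtid(self):
        with self._lock:
            gtid = next(self._next)
            self._outstanding.add(gtid)
            return gtid

    def note_committed(self, gtid):
        with self._lock:
            self._outstanding.discard(gtid)

oplog = {}                # GTID -> inline operations, or a reference to localOpRef
local_op_ref = {}         # GTID -> spilled operations for large transactions

def commit_transaction(buffered_ops, spilled, mgr):
    gtid = mgr.next_gtid()
    if spilled:
        local_op_ref[gtid] = buffered_ops      # large txn: ops live in localOpRef
        oplog[gtid] = {"ref": "localOpRef"}    # opLog entry just points at them
    else:
        oplog[gtid] = {"ops": buffered_ops}    # small txn: ops written inline
    # ...the enclosing transaction commits here, making the opLog entry and the
    # collection changes durable together...
    mgr.note_committed(gtid)                   # lets the min-uncommitted bound advance

mgr = GTIDManager()
commit_transaction([("insert", {"_id": 1})], spilled=False, mgr=mgr)
print(oplog)
```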
To replicate data to secondaries, the primary follows these simple rules:
- only committed data is replicated
- all data is replicated to secondaries in increasing GTID order. We never "go backwards" in the opLog to replicate data. As an example, suppose GTID 'A' < GTID 'B' < GTID 'C'. If A and C have committed, but B has not, then only A may be replicated. C cannot be replicated, because B has yet to commit and be replicated. Once B has committed and replicated, then C may be replicated.
Here is how we do this. At all times, the GTID manager maintains the minimum GTID that has yet to commit. Secondaries may replicate up to but not including this GTID. Whenever the minimum GTID yet to commit changes, secondaries are somehow signaled to replicate more data. So, if a large transaction is the minimum GTID yet to commit for a long time, replication lag may occur.
Note that this rule is slightly tweaked below when discussing crash safety and automatic failover, but the basic idea is the same.
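As a sketch of the replication bound, here is one way (again with hypothetical names) the GTID manager could compute the minimum GTID that has yet to commit, which secondaries may replicate up to but not including.

```python
# Toy sketch of the "minimum uncommitted GTID" bound described above.
def min_uncommitted(outstanding_gtids, next_gtid_to_hand_out):
    # if nothing is outstanding, everything below the next GTID has committed
    return min(outstanding_gtids, default=next_gtid_to_hand_out)

def replicable_gtids(committed_oplog, bound):
    # only committed entries strictly below the bound may be sent, in GTID order
    return [g for g in sorted(committed_oplog) if g < bound]

# Example: GTIDs 1 and 3 have committed, 2 is still outstanding,
# so only 1 may be replicated until 2 commits.
committed_oplog = {1: "...", 3: "..."}
print(replicable_gtids(committed_oplog, min_uncommitted({2}, 4)))   # [1]
```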
Let's discuss the benefits and drawbacks of this system. The big benefit is that transactions can write to the oplog in parallel. The only serialized piece (at the TokuMX layer) is the GTID generation. The drawback is that large transactions may suck. They cause replication lag, because a large transaction causes a lot of data to be transferred after commit, and because currently the commit of a TokuDB transaction that does a lot of work takes a long time. Because replication is row-based, large transactions cause lots of bandwidth and disk usage. In the future, if necessary, we can shift a bunch of the work done at the TokuDB layer onto background threads to reduce latency. MongoDB does not have the lag issue, because they are not transactional. They write to and replicate data as it is written to the opLog, not waiting for any transaction or statement to complete. In one way, large transactions here behave better than with MySQL binary logging: on the primary, a large transaction will not stall other transactions by blocking access to the opLog, whereas in MySQL, a large transaction blocks access to the binary log.
Because applying our opLog will not be idempotent, we cannot rely on the same algorithm that MongoDB uses. So what are our options? Let's start first with an enterprise feature: using a hot backup of a primary (or possibly another secondary) to create a secondary. For simplicity, let's assume the machine we back up is the primary.
Suppose the primary opLog has GTIDs A, B, and C, and we take a hot backup of the primary. Suppose the backup has A and C committed, but not B. When the backup is started, its opLog will contain A and C, but have no record of B. For this backup to be a valid secondary, we need a way to tell the primary to start replicating at a point that ensures we pick up B. We cannot tell the primary to start replicating at C, because then B is missed. So we need to tell the primary to start replicating from a point where it is known the backup has all GTIDs prior to that point applied. The point does not need to be optimal, as the backup can filter out and not apply GTIDs that have already been applied (e.g. A or C in the case above).
Here is how we select that point. On a background thread on the primary, once every short period of time (1 second, half a second, whatever), the primary writes the minimum uncommitted GTID to a TokuDB dictionary, call it the replInfo dictionary (or perhaps collection). The replInfo dictionary is guaranteed to be picked up by the backup. The backup then uses this dictionary to determine the point which replication should start. This point in time will be a little behind optimal, but if the period is short enough, it should be at most seconds behind. If we are taking a hot backup of a secondary, the same algorithm applies, as the secondary will also have a minimum uncommitted GTID. Just note that this is the minimum uncommitted GTID applied to the opLog, NOT applied to collections (on secondaries, data is replicated and committed to the opLog first, then later is applied to collections).
With this data, making a hot backup be a secondary seems simple. Just take the hot backup, plug it in as a secondary, and start replication from this recorded point.
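Here is a toy sketch of how a freshly restored backup could use the replInfo value to start replication and skip GTIDs it already has; the names and data shapes are hypothetical, and plain integers stand in for GTIDs.

```python
# Toy sketch: the backup reads the periodically-updated minimum uncommitted
# GTID from replInfo, asks the primary to stream from that point, and filters
# out entries it already applied (e.g. A and C in the example above).
def start_secondary_from_backup(repl_info, backup_oplog, primary_stream):
    start = repl_info["min_uncommitted_gtid"]    # conservative, may be a bit behind
    already_have = set(backup_oplog)
    for gtid, ops in primary_stream(start):      # streamed in increasing GTID order
        if gtid in already_have:
            continue                             # duplicate from the backup, skip it
        backup_oplog[gtid] = ops                 # otherwise apply as usual

# Example: backup has 1 (A) and 3 (C) but not 2 (B); replInfo recorded 2.
backup_oplog = {1: "A", 3: "C"}
def primary_stream(start):
    full = {1: "A", 2: "B", 3: "C", 4: "D"}
    return [(g, full[g]) for g in sorted(full) if g >= start]

start_secondary_from_backup({"min_uncommitted_gtid": 2}, backup_oplog, primary_stream)
print(sorted(backup_oplog))      # [1, 2, 3, 4]
```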
Now suppose we are not using hot backup. Here is an alternative algorithm to creating a secondary:
- Make a snapshot transaction on the primary.
- grab lock tree locks on metadata dictionaries to ensure collections cannot be modified, as adding/dropping collections/indexes may cause issues. TokuDB fileops is not MVCC. If there is a race condition between these first two steps, I am sure something can be done to address it. We won't speculate on that here.
- Copy the opLog, replInfo dictionary, and all collections over to the secondary
- use the replInfo dictionary to determine where to start replication, as this snapshot will have the same issues that the hot backup has.
For this section, let's presume that secondaries MAY do work in parallel, but we will not discuss how. The goal for this section is to discuss the protocol of receiving data from the primary while ensuring crash safety. Note that failover is not an issue here yet. Failover is the act of recovering from a primary going down.
Here is how a secondary works (a sketch follows this list):
- one thread gets a GTID and data from the primary and transactionally writes it to the opLog.
- when the transaction commits, the data is now considered to be on the secondary, and any secondary write concern that the primary is waiting on with respect to that GTID is satisfied
- another thread notices added GTIDs and spawns threads to apply them to collections. By some algorithm, assume some GTIDs may be applied to collections in parallel. This implies that GTIDs may be committed to collections out of order.
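A minimal sketch of the two stages on a secondary, with hypothetical names: one producer commits incoming GTIDs to the opLog in order, and applier threads drain them to collections, possibly out of order and in parallel.

```python
# Toy sketch of the secondary pipeline. Stage 1 commits replicated entries to
# the opLog in GTID order (satisfying write concern); stage 2 applies them to
# collections, potentially out of order across threads.
import queue
import threading

incoming = queue.Queue()            # GTIDs whose opLog entries have committed
oplog = {}
applied_to_collections = set()

def receive_from_primary(stream):
    for gtid, ops in stream:        # arrives in increasing GTID order
        oplog[gtid] = ops           # transactionally committed to the opLog
        incoming.put(gtid)          # now eligible for write-concern acks and apply

def applier():
    while True:
        gtid = incoming.get()
        if gtid is None:            # sentinel: shut down
            break
        # apply oplog[gtid] to collections; commits may interleave across threads
        applied_to_collections.add(gtid)
        incoming.task_done()

threads = [threading.Thread(target=applier) for _ in range(2)]
for t in threads:
    t.start()
receive_from_primary([(1, "A"), (2, "B"), (3, "C")])
incoming.join()
for _ in threads:
    incoming.put(None)
for t in threads:
    t.join()
print(sorted(applied_to_collections))   # [1, 2, 3]
```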
Because clients can only do read queries on secondaries, there are several optimizations we can perform on slaves for writes:
- we can bypass the lock tree
- we can skip uniqueness checks, because the primary will have already verified uniqueness
- no opLog operation should require a query like updates do in MySQL replication, because the opLog should contain all necessary data to apply the operation without a query.
In summary, applying writes on slaves should be very fast.
Now let's discuss what happens when a secondary crashes.
Because GTIDs are added to the secondary's opLog in order, the secondary knows the end of the opLog is the position where the primary must start replicating. Hence, we know the minimum uncommitted GTID is the end of the opLog, and that there are no gaps in the opLog that must be filled by the primary.
Because the application of GTIDs to collections may happen out of order, there is not a defined location in the opLog where entries before that position have been applied to collections and entries after that position have not been applied. Here is a familiar example. The secondary's opLog has GTIDs A, B, and C, where A and C have been applied, but B has not. Upon recovering from a crash, the secondary must find a way to apply B, but not C.
Here is what we do:
- On all machines, primary and secondaries, each GTID will have a boolean byte, call it the "Applied" variable, stored in the opLog to state whether that GTID has been applied to collections or not.
- On a primary, when adding data to the opLog for a GTID, the Applied value is set to true as part of the transaction doing the work.
- On a secondary, the transaction adding the replication data for a GTID will set the value to false as part of that transaction
- When a secondary has a transaction apply the GTID to the collections, that transaction will also change the GTID's applied value in the opLog from false to true.
- On a background thread, once every short period of time (e.g. 1 second), the replInfo dictionary will be updated with the minimum unapplied GTID, which is maintained in memory.
- Upon recovering from a crash, we read a conservative value for the minimum unapplied GTID. Starting from that value, we read forward in the opLog, and for each GTID, if its Applied value is false, we apply it; if it is true, we skip it.
This is how we get the secondary back up and running after a crash.
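As an illustration of that recovery scan, here is a toy sketch with hypothetical names: start from the conservatively recorded minimum unapplied GTID and apply only entries whose Applied flag is still false.

```python
# Toy sketch of secondary crash recovery. Each opLog entry carries the
# "Applied" boolean described above; replInfo holds a conservative (slightly
# stale) minimum unapplied GTID recorded by the background thread.
def recover_secondary(oplog, repl_info, apply_to_collections):
    start = repl_info["min_unapplied_gtid"]
    for gtid in sorted(g for g in oplog if g >= start):
        entry = oplog[gtid]
        if not entry["applied"]:
            apply_to_collections(gtid, entry["ops"])   # e.g. B in the example
            entry["applied"] = True                    # flipped as part of the apply txn
        # entries already marked applied (e.g. A, C) are simply skipped

# Example: A(1) and C(3) were applied before the crash, B(2) was not.
oplog = {1: {"ops": "A", "applied": True},
         2: {"ops": "B", "applied": False},
         3: {"ops": "C", "applied": True}}
recover_secondary(oplog, {"min_unapplied_gtid": 1},
                  lambda gtid, ops: print("applying", gtid, ops))
```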
Suppose the primary crashes. There are two possible options, both of which I think are valid for the user to want:
- The primary goes through crash recovery and comes back as the primary
- Automatic failover happens and an existing secondary becomes the new primary
Let's discuss the first option, which is simpler. If the user wants to wait for the primary to undergo recovery and come back as the primary, then we must ensure that the recovered primary is still ahead of all secondaries. There cannot be a secondary that contains data that the primary failed to recover, otherwise we are inconsistent. To ensure this, we must make the conditions under which a GTID may be replicated stricter than we had above. Above, we stated that a GTID may be replicated from the primary to a secondary if we are sure that all prior GTIDs have committed and have been replicated. We now need to add another condition: that the recovery log has been fsynced to disk to ensure that should the primary crash, this GTID will be recovered. This is not hard.
Here is how we ensure all replicated GTIDs have been synced to disk and will survive a crash. We tweak the algorithm mentioned in "replication on the primary". Instead of allowing all GTIDs up to the minimum uncommitted GTID to be replicated, we allow all GTIDs up to the minimum uncommitted GTID before the last call to log_flush to be replicated. That is, if we are flushing logs on a period, then before we do so, we record the minimum uncommitted GTID, and after the call to log_flush, this recorded value is the new eligible maximum for replicated GTIDs.
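A sketch of that tightened bound, with hypothetical names: a periodic thread records the minimum uncommitted GTID, calls log_flush, and only then publishes the recorded value as the new replication ceiling.

```python
# Toy sketch of bounding replication by the recovery log flush. Only GTIDs
# below a value recorded *before* the last log_flush may be replicated, so a
# crashed-and-recovered primary still contains everything it ever sent out.
import time

def flush_loop(get_min_uncommitted_gtid, log_flush, publish_replication_bound,
               period_seconds=1.0, iterations=3):
    for _ in range(iterations):
        candidate = get_min_uncommitted_gtid()   # snapshot before flushing
        log_flush()                              # fsync the recovery log
        publish_replication_bound(candidate)     # now safe to replicate below this
        time.sleep(period_seconds)

# Example wiring with trivial stand-ins:
flush_loop(get_min_uncommitted_gtid=lambda: 42,
           log_flush=lambda: None,
           publish_replication_bound=lambda g: print("replication bound ->", g),
           period_seconds=0.01)
```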
Now let's discuss the second case.
We have two types of secondaries:
- Running Secondary: This machine was successfully running as a secondary and there are no gaps of missing GTIDs in its opLog.
- Synchronizing Secondary: This machine was in the process of syncing with the primary, because it was newly created, and may have gaps of missing GTIDs in its opLog.
For simplicity of an initial design, let's say that if a primary goes down, then any synchronizing secondary is unrecoverable and cannot be integrated into the replica set. Those machines are lost and must be rebuilt (or resynced) from scratch. So, now we are dealing with a simpler problem: given a number of running secondaries, how do we elect a new primary? We elect the secondary that has the largest committed GTID. That secondary is the furthest ahead, so it becomes the new primary. If there is a tie, then the tie is broken based on user settings, which are uninteresting here. If the secondary that is furthest ahead is deemed ineligible by the user to become the new primary (the reason does not matter, the user says so), then some eligible secondary is connected to this ineligible secondary and is caught up to match it. The eligible, caught-up secondary then becomes the new primary.
Once a new primary has been selected, that primary must bring its collections fully up to date with its opLog. Then, the new primary may accept writes.
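A sketch of that selection rule, with a hypothetical data layout: pick the running secondary with the largest committed GTID; if it is not eligible, catch an eligible secondary up to it first and promote that one.

```python
# Toy sketch of choosing a new primary from the running secondaries. Each
# candidate is (name, last_committed_gtid, eligible). Tie-breaking and the
# actual catch-up replication are elided.
def choose_new_primary(candidates):
    furthest = max(candidates, key=lambda c: c[1])       # largest committed GTID
    if furthest[2]:
        return furthest[0]
    # furthest is ineligible: sync an eligible secondary up to it, then promote it
    eligible = max((c for c in candidates if c[2]), key=lambda c: c[1])
    print(f"{eligible[0]} syncs from {furthest[0]} up to GTID {furthest[1]}")
    return eligible[0]

print(choose_new_primary([("s1", 120, True), ("s2", 130, False), ("s3", 110, True)]))
```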
Now let's address another question: what happens with the old primary that crashed? How can it be re-integrated into the replica set as a secondary?
The answer depends on the state of the data in the old primary after recovering from a crash. When a primary fails over to a secondary, some data that was committed on the primary may have never made it to the secondary that was promoted. If none of that data persists on the old primary after recovery, then the old primary can seamlessly step in as a secondary. However, if any of that data is on the old primary, then the primary must rollback that data before it can step in as a secondary, to put itself in sync with the new primary.
If we can identify a spot in the old primary's opLog to roll back to, then with point in time recovery, we play the opLog backwards, deleting elements from the opLog while reversing the operations it has stored, until we get to a point in the opLog where the old primary can be integrated as a secondary. Although, given the fact that MongoDB has some rollback, maybe this is not an enterprise feature.
Another challenge we have is being able to identify the point in the old primary's opLog we need to roll back to, or to verify whether rollback is necessary.
Here is how we address this challenge. Define the GTID to be two 8 byte integers, (primarySeqNumber, GTSeqNumber). The primarySeqNumber changes ONLY when the primary changes. This includes restarts of the primary, or switching to another machine via failover. The GTSeqNumber increases with each transaction. So, if we see GTIDs of (10, 100), (10, 101), (10, 102), (11, 0), (11, 1), (11, 2) ... where 10 and 11 are the only values for primarySeqNumber, we know there was a failover or restart between (10, 102) and (11, 0). The idea is that no GTID in the system will ever be assigned twice. We also store a hash in each oplog entry that is a function of the previous operation and the contents of this operation.
Taking an old primary, we can examine the GTIDs at the end of its opLog, and scan backwards until we find one that shows up in both the old primary and the new primary. Once we find the greatest GTID that shows up in both the old primary and the new primary, we have found the point in time we must roll back to. Once we roll back to this point in time, we can integrate the old primary as a slave.
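A sketch of the GTID structure and the backward scan for the rollback point, with hypothetical names; Python tuples compare lexicographically, which matches the intended (primarySeqNumber, GTSeqNumber) ordering.

```python
# Toy sketch: a GTID is (primarySeqNumber, GTSeqNumber). To find the rollback
# point, scan the old primary's opLog backwards for the greatest GTID that
# also appears in the new primary's opLog.
def find_rollback_point(old_primary_gtids, new_primary_gtids):
    shared = set(new_primary_gtids)
    for gtid in sorted(old_primary_gtids, reverse=True):
        if gtid in shared:
            return gtid          # everything after this must be rolled back
    return None

old = [(10, 100), (10, 101), (10, 102)]          # old primary had an extra entry
new = [(10, 100), (10, 101), (11, 0), (11, 1)]   # failover bumped primarySeqNumber to 11
print(find_rollback_point(old, new))             # (10, 101)
```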
MySQL 5.6 introduced parallel slave replication. MongoDB also has threads running replication in parallel on secondaries. We need to do something. As of now, this is an open problem. Here are some interesting links where MariaDB's Kristian Nielsen analyzes some of the issues:
- https://lists.launchpad.net/maria-developers/msg04837.html <-- argument for making the master give information on how parallel an item can be
- https://mariadb.atlassian.net/browse/MDEV-26 <-- MariaDB GTID design
I (Zardosht) will do more research and fill this piece with more information. As of now, this piece is not designed.
With the design above, here is a proposal for an enterprise feature: point in time recovery. With point in time recovery, via a tool we write, a user can specify a location in the opLog to revert to. The reverting can either delete opLog entries while going backwards, or can add entries to the opLog that are the inverse of previous operations. This feature does not exist in MySQL without the existence of a backup. In MySQL, one can only take a backup and recover to a point in time going forward from the backup. This proposed feature does not require a backup.
For such a feature to work, we have the following restrictions:
- any operation that is stored in the opLog must be able to be applied and reversed. If an operation is not reversible (e.g., logging a delete with just its primary key and not the full row), then point in time recovery will not work. (A small sketch follows this list.)
- no deleting of files may show up in the oplog, because we don't have the ability to reverse a deleted file.
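A sketch of what "reversible" could mean in practice, with a hypothetical entry layout: each entry carries enough information, such as the full pre-image of a deleted or updated row, that its inverse can be constructed.

```python
# Toy sketch of reversing opLog entries for point in time recovery. This only
# works if each entry carries the data needed to undo it: a delete must log
# the full old row, an update must log the pre-image, and so on. A "drop
# file" entry has no inverse, which is why it cannot appear in the opLog.
def invert(entry):
    op = entry["op"]
    if op == "insert":
        return {"op": "delete", "old": entry["new"]}
    if op == "delete":
        return {"op": "insert", "new": entry["old"]}       # needs the full old row
    if op == "update":
        return {"op": "update", "old": entry["new"], "new": entry["old"]}
    raise ValueError(f"operation {op!r} is not reversible")

print(invert({"op": "delete", "old": {"_id": 7, "score": 3}}))
```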
An open question is whether tools to examine the opLog can/should be enterprise.
- when inserting into opLog on primary, we don't need the lock tree, we can use DB_PRELOCKED_WRITE
- if opLog overhead is high, John did some work to make serial insertions fast on a branch somewhere where we automatically pin the leaf node and don't descend down the tree, perhaps we can resurrect it if necessary
- Will there be any user experience issues given how different the oplog is? Do users implement poor-man's triggers with the opLog today? What else do they do with the opLog?
- We need a way to trim the opLog, something simple, does not need to be fancy
- No idea how to potentially do something like MySQL multi-master replication or master-master replication
- opLog is a capped collection, so they automatically keep it a certain size
- fileops is tricky with point in time recovery and with creating a slave
- Random idea that complicates some things possibly:
- On a secondary, GTIDs don't need to be added to the opLog in order, they can be added in parallel? Probably not much to gain here because the bulk of the work ought to be in applying it to collections.
- Possible optimization: find a way to do operations on a secondary in bulk. Don't apply just one GTID to the opLog per transaction, apply a bunch. Don't apply just one GTID to collections, apply a bunch. Make the relative cost of creating and destroying transactions cheaper.