Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous ready #403

Merged
merged 46 commits into from
Nov 26, 2020
Merged

Support asynchronous ready #403

merged 46 commits into from
Nov 26, 2020

Conversation

gengliqi
Copy link
Member

@gengliqi gengliqi commented Oct 15, 2020

Description

In the past, ready must be used synchronously. It means the application must persist some data in ready and then it can be allowed to get the next ready.
As etcd-io/etcd#12257 mentioned:

  1. it can cause high fsync frequency. In the practice of TiKV, we observed the fsync frequency can reach the limit of hardware very easily, which can cause unstable latency and hurt performance. Also high fsync frequency is also expensive in the cloud.
  2. sync size is unpredictable, which can waste IO when syncing small data size.

This PR aims to support asynchronous ready.

First of all, this PR changes the ready workflow and divides it into two types, i.e. sync and async.

The expected sync ready workflow is

1. get ready
2. handle ready
    1. send msg(need not to wait for persisting entries)
    2. apply committed entries(need not to wait for persisting entries) or snapshot
    3. handle read_state, persist hard state and entries
3. if application applies committed entries synchronously, call `advance`, otherwise call `advance_append`
(call `advance_apply_to` after applying). These two functions return LightReady
4. handle LightReady
    1. send msg
    2. apply committed entries
    3. update commit index to hardstate if any
5. call `advance_apply` if applying synchronously
5. get next ready, back to step 1

The expected async ready workflow is

1. get ready
2. handle ready
    1. send msg(need not to wait for persisting entries)
    2. apply committed_entries(need not to wait for persisting entries) or snapshot
    3. handle read_state, persist hardstate and entries(need not to wait for success)
3. record the number of the ready and call `advance_append_async`
    * if this ready has been persisted synchronously, call `advance_append` instead 
      of `advance_append_async` to get LightReady and then handle it
4. get next ready, back to step 1

If a ready has been persisted, call `on_persist_ready` and pass the number.

Actually, not just for the async ready, this PR can also decrease the latency of the sync ready.

According to the previous manual, the application must apply committed entries after it has persisted hard state and entries. 
Think of this case:

1. leader gets a new proposal
2. get ready
    1. persist the entries and send append msg to followers
3. leader gets some append response and this proposal can be committed
4. get ready
    1. persist the hard state(for commit index)
    2. apply committed entries

We can find out that leader should wait for two persistences before really applying. 
The latter one is just for persisting a commit index.
Now step 4 can be 
1. apply committed entries
2. persist the hard state(for commit index)
So the latency can be shorter than before.

Note that if this machine crashes before persisting the hard state, the applied index will be greater than the commit index in hard state. 
We will discuss it next.

There are some subtle implementation details:

  1. The commit index can be smaller than applied index which is impossible in previous implementation. Although commit index persistence has one correctness dependency for conf change(Append Invariant raft: improve the availability related to member change etcd-io/etcd#7625 (comment)), the behavior here(i.e. make apply earlier) does not break it.
  2. The leader forwards matched of its progress when a log is proposed. In previous implementation, it's correct because we need to persist entries and then send msg to others. But now it's not correct so the matched of leader's progress can be forward after calling on_persist_ready.
  3. All of the committed entries must be persisted so persisted is maintained and it's used for fetching the committed entries(as the upper bound).
  4. Remove the ready_since and has_ready_since and use commit_since_index instead. It's also used for fetching the committed entries(as the lower bound).
  5. Since matched logic is changed and persisted is added, some tests are failed. The method I chose is to add a persist function in Interface and call it in Network::send and some other places.

Remaining Work

  • Update two examples
  • Add more tests for async ready
  • Add doc for async ready(I will do this in next PR)

src/raft.rs Outdated
let raft_state = store.initial_state()?;
let mut raft_state = store.initial_state()?;
// If commit index is smaller than applied index, forward it.
raft_state.hard_state.commit = cmp::max(raft_state.hard_state.commit, c.applied);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's appropriate to add the assumption. It can break consistency silently if users don't write implementation correctly. Users should guarantee only applying committed logs. Hence, they should ensure the invariant applied <= commit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the hard state may be persisted after applying some entries, it can not guarantee that commit in hard state must greater than applied index. There is an example case in the description above.
If users really write something wrong, it's hard to help users find all these bugs because raft-rs must depend on some behaviors outside to guarantee correctness.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the hard state may be persisted after applying some entries

If users try to do that, then they should be responsible to give a correct commit index. For example, recording the dependent commit index and commit term with the apply index.

...raft-rs must depend on some behaviors outside to guarantee correctness.

I don't think raft-rs depends on apply index > commit index.

Copy link
Member Author

@gengliqi gengliqi Oct 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users try to do that, then they should be responsible to give a correct commit index. For example, recording the dependent commit index and commit term with the apply index.

For async ready, commit index may be forwarded after calling on_persist_last_ready. Then the result has committed entries but no new hard state. It's redundant to put hard state in PersistLastReadyResult because the only change is commit variable.

I don't think raft-rs depends on apply index > commit index.

Yes. raft-rs does not depend on applied index > commit index.
But it depends on the fact that users must apply the committed entries, which must be satisfied.
If so, it's correct to forward the commit index to applied index during initialization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...It's redundant to put hard state in PersistLastReadyResult because the only change is commit variable.

It's not. Unless you move the commit out of hard state, otherwise it should be updated.

...it depends on the fact that users must apply the committed entries, which must be satisfied...

Indeed, so we have to check commit index >= apply index.

...If so, it's correct to forward the commit index to applied index during initialization.

The problem is only users know if it's so. You can't just assume everyone implement it in this way.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can check commit-index >= apply-index in raft-rs while change the commit-index of hard-state in initial_state, which is implemented in TiKV.

src/raft_log.rs Outdated Show resolved Hide resolved
Signed-off-by: gengliqi <[email protected]>
src/raw_node.rs Outdated
///
/// Since Ready must be persisted in order, calling this function implicitly means
/// all readys collected before have been persisted.
pub fn on_persist_last_ready(&mut self) -> PersistLastReadyResult {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, the async ready convert the process into a pipeline manner with two steps. If there is always getting a new ready(means advancing last ready number), then it hardly can reach the point where the last ready is persisted.

Can we just return PersistLastReadyResult for every on_persist_ready() and remove on_persist_last_ready?

Copy link
Member Author

@gengliqi gengliqi Oct 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that these two functions are used for different purposes.
The on_persist_ready aims at batching the message and committed entries. Users can get a Ready after calling several on_persist_ready and step.
The on_persist_last_ready is a fast path for users. If they have already persisted the last Ready, they can get the PersistLastReadyResult without getting Ready again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistLastReadyResult seem like the same Ready do, can we make on_persist_last_ready to be replaced with:

self.on_persist_ready(self.max_number);
self.ready()

Copy link
Member Author

@gengliqi gengliqi Nov 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, but I think using PersistLastReadyResult instead of ready is better because I want to tell users that only committed entries and messages are needed to be handled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to tell users that only committed entries and messages are needed to be handled.

I not sure is it better because it forces the user to write some new logic to handle PersistLastReadyResult instead of reusing the exiting logic of handling Ready

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @Connor1996 and @NingLin-P that this extra method make logic complicated and not helpful as it seems.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine this case:

  1. The application calls several ready and advance_append_sync
  2. After calling the last ready's advance_append_sync, it finds it's time to call fsync
  3. If there is no on_persist_last_ready, it should call on_persist_ready and then call ready to get the next ready and the only meaningful things in Ready are committed_entries and messages.

In TiKV, we have lots of logic during handling Ready which is very wasteful to get a ready again and do the same logic like normal ready does just for applying committed entries and sending messages.

Signed-off-by: gengliqi <[email protected]>
@gengliqi gengliqi force-pushed the async-ready-v2 branch 3 times, most recently from 87a03d0 to 8caf845 Compare October 21, 2020 06:21
Signed-off-by: gengliqi <[email protected]>
src/raft_log.rs Outdated
let committed = self.committed;
if committed + 1 > offset {
match self.slice(offset, committed + 1, None) {
let high = cmp::min(persisted_idx, self.committed) + 1;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could persisted_idx be smaller than self.committed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If it is a follower, it may receive some committed entries and they are not persisted in local.

Signed-off-by: gengliqi <[email protected]>
/// messages that are ready to be applied or be sent to other peers.
#[derive(Default, Debug, PartialEq)]
pub struct PersistLastReadyResult {
commit_index: Option<u64>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it could be None?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see the fn commit_index annotation above.

/// It will be None state if there is no update.
/// It is not required to save it to stable storage.
#[inline]
pub fn commit_index(&self) -> Option<u64> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think that we shall check whether there are committed entries by Option

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, it is not used to check whether there are committed entries, it is used to inform the application the commit index is changed.
Actually, it implicitly means that this peer is a leader if the commit index is changed after persisting the entries.

Signed-off-by: gengliqi <[email protected]>
src/raft_log.rs Outdated
}
}
}
fn test_restore_() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remove it. The new test is test_restore_snap at the end of this file.

let committed = self.committed;
if committed + 1 > offset {
match self.slice(offset, committed + 1, None) {
let high = cmp::min(self.committed, self.persisted) + 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case self.committed > self.persisted is not covered in this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I merge the test_has_next_ents and test_next_ents and add more cases

Signed-off-by: gengliqi <[email protected]>
Signed-off-by: gengliqi <[email protected]>
self.commit_ready(rd);
}

/// Advance apply to the index of the last committed entries given before.
#[inline]
pub fn advance_apply(&mut self) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just keeping the original interface and implementations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, what's another function's name which uses commit_since_index for sync applying?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is just keep one function that accepts explicit index is enough. Application always knows what index has been applied.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function which uses commit_since_index is convenient for users. For the previous implementation, users don't need to record the applied index if the logs are applied synchronously. As our examples, this function is used after handling the LightReady.

src/raw_node.rs Outdated
light_rd
}

/// Advance append the ready value synchronously.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Advance append the ready value synchronously.
/// Advances the ready without applying committed entries. `advance_apply` should be used later to update applying progress.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appending synchronously does not mean applying synchronously.
Maybe the "advance_apply should be used later to update applying progress." can be removed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment doesn't mention whether to apply synchronously, it just suggests the advance is split into two step, and apply is excluded from the first step.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

src/raw_node.rs Outdated Show resolved Hide resolved
src/raw_node.rs Outdated Show resolved Hide resolved
src/raw_node.rs Outdated Show resolved Hide resolved
src/raw_node.rs Outdated Show resolved Hide resolved
pub fn advance(&mut self, rd: Ready) -> LightReady {
let applied = self.commit_since_index;
let light_rd = self.advance_append(rd);
self.advance_apply_to(applied);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.advance_apply_to(applied);
self.advance_apply();

Looks like it is the same

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not the same because the commit_since_index maybe updated in advance_append.

Comment on lines +401 to +405
if raft.state == StateRole::Leader && !raft.msgs.is_empty() {
// Leader can send messages immediately to make replication concurrently.
// For more details, check raft thesis 10.2.1.
rd.messages.push(mem::take(&mut raft.msgs));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move these lines to L480 in RawNode::ready to make the logic clear(near by how non-Leader peer's msgs is handling )? !raft.msgs.is_empty() should only be true there since we not allow calling step between RawNode::ready and RawNode::commit_ready

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can also be true after persisting some logs, which can trigger log commitment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jay is right. You can see the implementation of Raft::on_persist_entries for details.


if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
rd.snapshot = snapshot.clone();
assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
assert!(self.commit_since_index < rd.snapshot.get_metadata().index);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If request snapshot is used, commit_since_index can be larger than snapshot index.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this design is a little bit strange? 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design is for online recovery. Supposing some regions are corrupted while others in the same node are not, using request snapshot allows TiKV to fix those corrupted regions without bringing unavailability to others.

src/raw_node.rs Show resolved Hide resolved
src/raw_node.rs Show resolved Hide resolved
Comment on lines +401 to +405
if raft.state == StateRole::Leader && !raft.msgs.is_empty() {
// Leader can send messages immediately to make replication concurrently.
// For more details, check raft thesis 10.2.1.
rd.messages.push(mem::take(&mut raft.msgs));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can also be true after persisting some logs, which can trigger log commitment.


if let Some(snapshot) = &raft.raft_log.unstable_snapshot() {
rd.snapshot = snapshot.clone();
assert!(self.commit_since_index <= rd.snapshot.get_metadata().index);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If request snapshot is used, commit_since_index can be larger than snapshot index.

src/raw_node.rs Outdated Show resolved Hide resolved
Signed-off-by: gengliqi <[email protected]>
Signed-off-by: gengliqi <[email protected]>
Signed-off-by: gengliqi <[email protected]>
Copy link

@Little-Wallace Little-Wallace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants