Proposal: support fully control fsync frequency in raft #12257

Closed
BusyJay opened this issue Aug 25, 2020 · 8 comments · Fixed by etcd-io/raft#8

@BusyJay
Contributor

BusyJay commented Aug 25, 2020

Support fully control fsync frequency in raft

Summary

Let the application decide when to sync logs, and make the leader aware of the
index that is safe to commit.

Motivation

The current way of driving raft is a loop of the following steps:

1. get ready from raft
2. persist ready
3. advance ready in raft

Only one ready can be processed at a time, and the leader expects all pending logs in a ready to be synced to disk before it handles the next one.

The mechanism is simple but has several problems:

  1. It can cause a high fsync frequency. In TiKV, we observed that the fsync frequency can reach the hardware limit very easily, which causes unstable latency and hurts performance. A high fsync frequency is also expensive in the cloud.
  2. The sync size is unpredictable, which can waste IO when syncing small amounts of data.

So we need a way to control the fsync frequency without hurting correctness, while still getting maximum performance.

Detailed design

Since all nodes communicate through messages, the application can stash all messages and send them out only after logs are synced. In this way, the application is free to fetch and advance as many readies as it wants and to sync whenever it wants.
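
A minimal sketch of the stash-then-sync idea for a follower. The `batcher` type, the `message` struct, and the byte-threshold policy are hypothetical names chosen for illustration and are not part of any raft library; a real application would call fsync where the counter is incremented.

```go
package main

import "fmt"

// message is a stand-in for an outgoing raft message (hypothetical type).
type message struct {
	to    uint64
	index uint64
}

// batcher stashes outgoing messages until the application decides to sync.
type batcher struct {
	syncThreshold int       // sync once this many bytes are pending (assumed policy)
	pendingBytes  int       // bytes appended since the last sync
	stashed       []message // messages that must wait for the next sync
	syncs         int       // number of simulated fsync calls
}

// handleReady records one ready's worth of appended bytes and messages.
// It returns the messages that are now safe to send, or nil if the batch
// has not been synced yet.
func (b *batcher) handleReady(entryBytes int, msgs []message) []message {
	b.pendingBytes += entryBytes
	b.stashed = append(b.stashed, msgs...)
	if b.pendingBytes < b.syncThreshold {
		return nil
	}
	b.syncs++ // a real implementation would fsync the log here
	released := b.stashed
	b.stashed = nil
	b.pendingBytes = 0
	return released
}

func main() {
	b := &batcher{syncThreshold: 100}
	for i := 1; i <= 3; i++ {
		out := b.handleReady(40, []message{{to: 1, index: uint64(i)}})
		fmt.Printf("ready %d: released %d messages, syncs so far %d\n", i, len(out), b.syncs)
	}
}
```

In this sketch three readies are fetched and advanced, but only one sync happens, and no message leaves the node before the data it acknowledges is durable.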

It works great for followers. However, for leaders, both etcd and TiKV follow section 10.2.1 of the raft thesis and send out the leader's messages before syncing, so that the leader and followers write in parallel. If the leader batches up multiple readies, it can accidentally commit un-synced logs. For example, if the leader has logs 3 and 4 in the first ready, then after advancing and receiving the followers' ACKs, it can broadcast the commit to all followers in the second ready. This is because the leader assumes the logs of the first ready are already synced. To fix the problem, we need to stop updating the leader's own progress until its logs are synced to disk, so that when the leader calculates the commit index, only progress that is really synced is considered.

It's possible that a quorum of followers syncs the logs before the leader does. In that case, the leader can still safely consider the logs committed, but it should not mark them ready to apply until its own logs are synced; otherwise it can corrupt the state between raft and the application.

Note that batching changes to the hard state doesn't affect correctness. Changes to term and vote can only happen when the node is a follower, so messages will not be sent out before the changes are synced, which is the same as before. Changes to commit can happen when the node is the leader; however, the commit index is not required to be stored at all, according to section 3.8 of the raft thesis.

To conclude, we need to make the following changes to the raft library:

  1. Don't update the leader's progress when logs are appended.
  2. Add a method that lets the application inform raft about the synced index, which should update the leader's progress and trigger log commitment.

Everything else is left to applications, which have to choose different processing logic for different roles.
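
The two changes can be illustrated with a toy model of the leader's bookkeeping. This is only a sketch under assumptions: `leaderState` and all of its methods are hypothetical names, not the raft library's API, and real progress tracking carries much more state.

```go
package main

import (
	"fmt"
	"sort"
)

// leaderState is a toy model of the leader-side bookkeeping (hypothetical).
type leaderState struct {
	self      uint64            // ID of the leader itself
	match     map[uint64]uint64 // highest index known to be durable on each node
	lastIndex uint64            // highest index appended locally, possibly un-synced
}

func newLeader(self uint64, peers ...uint64) *leaderState {
	l := &leaderState{self: self, match: map[uint64]uint64{self: 0}}
	for _, p := range peers {
		l.match[p] = 0
	}
	return l
}

// appendLocal records new entries without touching the leader's own
// match index (change 1).
func (l *leaderState) appendLocal(n uint64) { l.lastIndex += n }

// onFollowerAck records a follower's acknowledgement of a durable index.
func (l *leaderState) onFollowerAck(id, index uint64) {
	if index > l.match[id] {
		l.match[id] = index
	}
}

// onSynced is how the application reports the leader's synced index (change 2).
func (l *leaderState) onSynced(index uint64) {
	if index > l.lastIndex {
		index = l.lastIndex
	}
	if index > l.match[l.self] {
		l.match[l.self] = index
	}
}

// commitIndex returns the highest index that is durable on a quorum.
func (l *leaderState) commitIndex() uint64 {
	idx := make([]uint64, 0, len(l.match))
	for _, m := range l.match {
		idx = append(idx, m)
	}
	sort.Slice(idx, func(i, j int) bool { return idx[i] > idx[j] })
	return idx[len(idx)/2] // the quorum-th largest durable index
}

// applicableIndex never runs ahead of what the leader itself has synced.
func (l *leaderState) applicableIndex() uint64 {
	c := l.commitIndex()
	if s := l.match[l.self]; s < c {
		return s
	}
	return c
}

func main() {
	l := newLeader(1, 2, 3)
	l.appendLocal(2)
	l.onFollowerAck(2, 2)
	fmt.Println(l.commitIndex(), l.applicableIndex()) // 0 0: one ACK plus an un-synced leader is no quorum
	l.onFollowerAck(3, 2)
	fmt.Println(l.commitIndex(), l.applicableIndex()) // 2 0: committed by followers, not yet applicable
	l.onSynced(2)
	fmt.Println(l.commitIndex(), l.applicableIndex()) // 2 2
}
```

The middle line of output corresponds to the case where a quorum of followers syncs before the leader: the entries count as committed, but applying them waits for the leader's own sync.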

Drawbacks

Batching is a little complicated for applications to implement.

Alternatives

Pause handling raft ready until the application is ready to sync data. This approach doesn't require any changes to the raft library, but it can cause write pulses to disk. Our benchmarks show that this approach doesn't perform as well as the proposed one.

@BusyJay
Contributor Author

BusyJay commented Sep 11, 2020

@xiang90 @gyuho any opinions?

@stale

stale bot commented Dec 10, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 21 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Dec 10, 2020
@BusyJay
Contributor Author

BusyJay commented Dec 21, 2020

FYI, TiKV implemented the proposal in tikv/raft-rs#403 and tikv/tikv#8855. Our experiments showed a 10% improvement in throughput and less jitter.

@stale

stale bot commented Mar 21, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 21 days if no further activity occurs. Thank you for your contributions.

@ahrtr
Member

ahrtr commented Nov 12, 2021

It looks like a useful feature, would like to get more feedback.

@gengliqi

gengliqi commented Jan 29, 2022

This optimization has landed in TiKV through tikv/tikv#10289.
The most important benefit is not just control over the fsync frequency but a shorter commit duration for raft logs.

FYI, I presented at KubeCon NA 2021 and talked about this optimization in detail.
PPT: https://docs.google.com/presentation/d/1Jsha70eEGUOWQOGR5VtDkplGMZYMBMmVh67At6o_4lk/edit?usp=sharing
Video: https://www.youtube.com/watch?v=3BefLqw-4Go

@stale

stale bot commented Apr 30, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 21 days if no further activity occurs. Thank you for your contributions.

@stale

stale bot commented Oct 15, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed after 21 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Oct 15, 2022
@ahrtr ahrtr added stage/tracked and removed stale labels Oct 15, 2022
nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Oct 26, 2022
Fixes etcd-io#12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the `Ready.Messages` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.

\## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.
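
One way to picture the snapshot ordering in the last bullet is a barrier sent through the apply queue. This is a sketch under assumptions, not the library's mechanism: `applyMsg`, `appendMsg`, `flushApply`, and the event log are hypothetical names, and the "storage" here is only a list of strings.

```go
package main

import (
	"fmt"
	"sync"
)

// eventLog records what the two threads did, in order (illustration only).
type eventLog struct {
	mu     sync.Mutex
	events []string
}

func (l *eventLog) add(e string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, e)
}

// applyMsg carries committed entries; a non-nil barrier is closed once the
// message, and therefore every earlier message, has been processed.
type applyMsg struct {
	entries []string
	barrier chan struct{}
}

// appendMsg carries a snapshot to install (entries omitted for brevity).
type appendMsg struct{ snapshot string }

func applyThread(in <-chan applyMsg, log *eventLog, wg *sync.WaitGroup) {
	defer wg.Done()
	for m := range in {
		for _, e := range m.entries {
			log.add("apply " + e)
		}
		if m.barrier != nil {
			close(m.barrier)
		}
	}
}

func appendThread(in <-chan appendMsg, log *eventLog, wg *sync.WaitGroup) {
	defer wg.Done()
	for m := range in {
		log.add("install snapshot " + m.snapshot)
	}
}

// flushApply blocks until the apply thread has drained everything queued so far.
func flushApply(toApply chan<- applyMsg) {
	done := make(chan struct{})
	toApply <- applyMsg{barrier: done}
	<-done
}

func run() []string {
	log := &eventLog{}
	toApply := make(chan applyMsg, 8)
	toAppend := make(chan appendMsg, 8)
	var wg sync.WaitGroup
	wg.Add(2)
	go applyThread(toApply, log, &wg)
	go appendThread(toAppend, log, &wg)

	toApply <- applyMsg{entries: []string{"e1", "e2"}}
	flushApply(toApply)                    // step 1: flush the apply thread
	toAppend <- appendMsg{snapshot: "s3"} // step 2: hand the snapshot to the append thread

	close(toApply)
	close(toAppend)
	wg.Wait()
	return log.events
}

func main() {
	for _, e := range run() {
		fmt.Println(e)
	}
}
```

Because the apply queue is FIFO, the closed barrier implies that `e1` and `e2` were applied before the snapshot is handed over, so the snapshot never races with older committed entries.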

\## Usage

When asynchronous storage writes are enabled, the responsibility of code using
the library is different from what is presented in raft/doc.go (which has been
updated to include a section about async storage writes). Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 in doc.go).
Instead, all local storage operations are also communicated through messages
present in the Ready.Messages slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order. Each local storage message carries a slice of response messages that
must be delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

```go
for {
	select {
	case <-s.Ticker:
		s.Node.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				// log appends go to the append thread
				toAppend <- m
			case raft.LocalApplyThread:
				// committed entries go to the apply thread
				toApply <- m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}
```

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

```go
// append thread: make the log write durable, then deliver the responses
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

// apply thread: apply committed entries, then deliver the responses
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()
```

\## Compatibility

The library remains backwards compatible with existing users and the change does
not introduce any breaking changes. Users that do not set `AsyncStorageWrites`
to true in the `Config` struct will not notice a difference with this change.
This is despite the fact that the existing "synchronous storage writes"
interface was adapted to share a majority of the same code. For instance,
`Node.Advance` has been adapted to transparently acknowledge an asynchronous log
append attempt and an asynchronous state machine application attempt, internally
using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to
a process and nodes using asynchronous storage writes appear to behave no
differently from the outside. Clusters are free to mix nodes running with and
without asynchronous storage writes.

\## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports pluggable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipeline using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.
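
For reference, the percentages and the batch-size ratio can be recomputed from the figures quoted above. The helper name `reductionPct` is only for illustration; the input numbers are the ones reported in this message.

```go
package main

import "fmt"

// reductionPct returns the relative reduction from before to after, in percent.
func reductionPct(before, after float64) float64 {
	return (before - after) / before * 100
}

func main() {
	// Average latencies at 16MB/s of write traffic, in milliseconds.
	const p1, p2, p3 = 13.2, 7.3, 5.24

	fmt.Printf("P2 -> P3 latency reduction: %.1f%%\n", reductionPct(p2, p3)) // 28.2%
	fmt.Printf("P1 -> P3 latency reduction: %.1f%%\n", reductionPct(p1, p3)) // 60.3%

	// Average append batch size under saturation.
	fmt.Printf("append batch size ratio: %.2fx\n", 1542.0/928.0) // 1.66x
}
```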

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Oct 27, 2022
Fixes etcd-io#12257.

nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Oct 27, 2022
Fixes etcd-io#12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.

\## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.

\## Usage

When asynchronous storage writes is enabled, the responsibility of code using
the library is different from what is presented in raft/doc.go (which has been
updated to include a section about async storage writes). Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 in doc.go).
Instead, all local storage operations are also communicated through messages
present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order. Each local storage message carries a slice of response messages that
must delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

```go
for {
	select {
	case <-s.Ticker:
		n.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <-m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}
```

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

```go
// append thread
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

// apply thread
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()
```
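
The `send(m.Responses)` calls above are left abstract. As a rough sketch of what they might do, the responses attached to a storage message can be walked in order, with those addressed to the local node stepped back into the state machine and the rest handed to the transport. Everything here is an assumption made for illustration: `Message` is a reduced stand-in for `raftpb.Message`, and `router`, `stepLocal`, and `sendRemote` are hypothetical names, not part of the library.

```go
package main

import "fmt"

// Message is a hypothetical, reduced stand-in for raftpb.Message.
type Message struct {
	To   uint64
	Type string // illustrative label only
}

// router delivers response messages after a storage write has completed.
type router struct {
	selfID     uint64
	stepLocal  func(Message) // assumed wrapper that steps the message into the local node
	sendRemote func(Message) // assumed wrapper around the network transport
}

// deliver forwards each response to its target, preserving order.
// It does not interpret the responses beyond reading the target ID.
func (r *router) deliver(responses []Message) {
	for _, resp := range responses {
		if resp.To == r.selfID {
			r.stepLocal(resp)
		} else {
			r.sendRemote(resp)
		}
	}
}

func main() {
	r := &router{
		selfID:     1,
		stepLocal:  func(m Message) { fmt.Println("local:", m.Type) },
		sendRemote: func(m Message) { fmt.Println("remote to", m.To, ":", m.Type) },
	}
	// Called only after the corresponding storage write is durable.
	r.deliver([]Message{
		{To: 1, Type: "MsgStorageAppendResp"},
		{To: 2, Type: "MsgAppResp"},
	})
}
```

The storage thread only needs the target ID to route a response, which matches the requirement that it deliver responses without understanding them.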

\## Compatibility

The library remains backwards compatible with existing users and the change does
not introduce any breaking changes. Users that do not set `AsyncStorageWrites`
to true in the `Config` struct will not notice a difference with this change.
This is despite the fact that the existing "synchronous storage writes"
interface was adapted to share a majority of the same code. For instance,
`Node.Advance` has been adapted to transparently acknowledge an asynchronous log
append attempt and an asynchronous state machine application attempt, internally
using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to
a process and nodes using asynchronous storage writes appear to behave no
differently from the outside. Clusters are free to mix nodes running with and
without asynchronous storage writes.

\## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports pluggable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipeline using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3-node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.
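
For reference, the 28% figure follows directly from the two averages quoted above. The second ratio, relative to P1, is derived here from the same quoted numbers and is not separately reported in the benchmark:

```latex
\frac{7.3 - 5.24}{7.3} \approx 0.28
\qquad
\frac{13.2 - 5.24}{13.2} \approx 0.60
```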

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Oct 27, 2022
Fixes etcd-io#12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the `Ready.Messages` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.

\## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.
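
The two-step snapshot handling in the last item could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `storageMsg`, `applyWorker`, and `dispatchAppend` are hypothetical names, and entries are reduced to strings.

```go
package main

import (
	"fmt"
	"sync"
)

// storageMsg is a hypothetical stand-in for a local storage message.
type storageMsg struct {
	entries     []string
	hasSnapshot bool
}

// applyWorker applies submitted messages in order on its own goroutine.
type applyWorker struct {
	in      chan storageMsg
	pending sync.WaitGroup
	applied []string // written by the worker; read only after flush returns
}

func newApplyWorker() *applyWorker {
	w := &applyWorker{in: make(chan storageMsg, 16)}
	go func() {
		for m := range w.in {
			w.applied = append(w.applied, m.entries...)
			w.pending.Done()
		}
	}()
	return w
}

// submit queues a message for application.
func (w *applyWorker) submit(m storageMsg) {
	w.pending.Add(1)
	w.in <- m
}

// flush blocks until every submitted message has been applied.
func (w *applyWorker) flush() { w.pending.Wait() }

// dispatchAppend forwards an append message to the append queue. When the
// message carries a snapshot, the apply worker is flushed first so that no
// earlier committed entries are still in flight when the snapshot lands.
func dispatchAppend(m storageMsg, apply *applyWorker, toAppend chan<- storageMsg) {
	if m.hasSnapshot {
		apply.flush()
	}
	toAppend <- m
}

func main() {
	apply := newApplyWorker()
	toAppend := make(chan storageMsg, 1)

	apply.submit(storageMsg{entries: []string{"e1", "e2"}})
	dispatchAppend(storageMsg{hasSnapshot: true}, apply, toAppend)

	fmt.Println("applied before snapshot:", len(apply.applied))
	fmt.Println("queued for append:", len(toAppend))
}
```

Because the library is described as withholding further committed entries until the snapshot has been applied, the flush only has to wait for work that was already handed to the apply thread.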

nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Nov 2, 2022
Fixes etcd-io#12257.

nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Nov 2, 2022
Fixes etcd-io#12257.

\## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports plugable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipelining using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <[email protected]>
nvanbenschoten added a commit to nvanbenschoten/etcd that referenced this issue Dec 12, 2022
Fixes etcd-io#12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When `AsyncStorageWrites` is enabled, the `Ready.Messages` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.
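
The delivery rule above can be sketched without depending on the raft package. In this minimal sketch, `Message` is a simplified stand-in for `raftpb.Message`, and `deliverResponses`, `stepLocal`, and `sendRemote` are hypothetical names introduced only for illustration; the `Kind` strings are labels, not the library's message type constants. The storage thread does not interpret a response, it only looks at the target.

```go
package main

import "fmt"

// Message is a simplified, hypothetical stand-in for raftpb.Message.
// Only the fields needed for this sketch are included.
type Message struct {
	To        uint64
	Kind      string
	Responses []Message
}

// deliverResponses routes the responses attached to a completed storage
// write. Responses addressed to the local node go to stepLocal; all others
// go to sendRemote. The caller must invoke it only after the write is done.
func deliverResponses(self uint64, responses []Message, stepLocal, sendRemote func(Message)) {
	for _, r := range responses {
		if r.To == self {
			stepLocal(r)
		} else {
			sendRemote(r)
		}
	}
}

func main() {
	const self = 1
	m := Message{
		Kind: "MsgStorageAppend",
		Responses: []Message{
			{To: 1, Kind: "local ack"},
			{To: 2, Kind: "MsgAppResp"},
		},
	}
	// The durable storage write for m would happen here, before delivery.
	deliverResponses(self, m.Responses,
		func(r Message) { fmt.Println("step locally:", r.Kind) },
		func(r Message) { fmt.Println("send to node", r.To, r.Kind) },
	)
}
```

In a real application, `stepLocal` would presumably hand the message back to the local raft node and `sendRemote` would use the existing network transport; both are assumptions of this sketch.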

## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.
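
The two-step snapshot handling in the last item can be sketched with plain channels. This is an illustration under stated assumptions, not the library's code: `Message`, `applyItem`, `recorder`, and `dispatchAppend` are hypothetical names, and the flush is modeled as a barrier item placed on the apply queue, which works because a single goroutine drains that queue in FIFO order.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a simplified, hypothetical stand-in for raftpb.Message.
type Message struct {
	Label       string
	HasSnapshot bool
}

// applyItem is either a message to apply or, when flush is non-nil, a barrier.
type applyItem struct {
	msg   Message
	flush chan struct{}
}

// recorder collects events from both storage threads.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) add(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

// dispatchAppend forwards m to the append thread. If m carries a snapshot,
// it first waits until the apply thread has drained all earlier work.
func dispatchAppend(m Message, toApply chan<- applyItem, toAppend chan<- Message) {
	if m.HasSnapshot {
		barrier := make(chan struct{})
		toApply <- applyItem{flush: barrier}
		<-barrier // step 1: the apply thread is flushed
	}
	toAppend <- m // step 2: hand the message to the append thread
}

func demo() []string {
	rec := &recorder{}
	toApply := make(chan applyItem, 8)
	toAppend := make(chan Message, 8)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // apply thread
		defer wg.Done()
		for it := range toApply {
			if it.flush != nil {
				close(it.flush)
				continue
			}
			rec.add("apply " + it.msg.Label)
		}
	}()
	go func() { // append thread
		defer wg.Done()
		for m := range toAppend {
			rec.add("append " + m.Label)
		}
	}()
	toApply <- applyItem{msg: Message{Label: "entries 1-2"}}
	toApply <- applyItem{msg: Message{Label: "entries 3-4"}}
	dispatchAppend(Message{Label: "snapshot", HasSnapshot: true}, toApply, toAppend)
	close(toApply)
	close(toAppend)
	wg.Wait()
	return rec.events
}

func main() {
	for _, e := range demo() {
		fmt.Println(e)
	}
}
```

Because the barrier is closed only after every earlier apply item has been processed, the snapshot reaches the append thread strictly after the pending applications have finished.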

## Usage

When asynchronous storage writes are enabled, the responsibility of code using
the library is different from what is presented in raft/doc.go (which has been
updated to include a section about async storage writes). Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 in doc.go).
Instead, all local storage operations are also communicated through messages
present in the Ready.Messages slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order. Each local storage message carries a slice of response messages that
must be delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

```go
for {
	select {
	case <-s.Ticker:
		s.Node.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <- m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}
```

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

```go
// append thread: persists log writes, then delivers the attached responses
go func() {
	for {
		select {
		case m := <-toAppend:
			// The write must be durable before any response is sent.
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()

// apply thread: applies committed entries to the local state machine
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}()
```
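
Because the append thread runs independently of the `Ready` loop, it can also coalesce queued messages into a single durable write. The sketch below shows one way this might look. It assumes that batching is acceptable as long as messages stay in order and responses are delivered only after the sync. `Message`, `store`, `drainBatch`, and `processBatch` are hypothetical names, not part of the library.

```go
package main

import "fmt"

// Message is a simplified, hypothetical stand-in for raftpb.Message.
type Message struct {
	Entries   []string
	Responses []string
}

// store is a hypothetical log store that counts sync calls.
type store struct {
	entries []string
	syncs   int
}

func (s *store) write(es []string) { s.entries = append(s.entries, es...) }
func (s *store) sync()             { s.syncs++ }

// drainBatch blocks for one message, then collects any messages that are
// already queued (up to max) without blocking and without reordering.
// It returns false once the channel is closed and empty.
func drainBatch(in <-chan Message, max int) ([]Message, bool) {
	first, ok := <-in
	if !ok {
		return nil, false
	}
	batch := []Message{first}
	for len(batch) < max {
		select {
		case m, more := <-in:
			if !more {
				return batch, true
			}
			batch = append(batch, m)
		default:
			return batch, true
		}
	}
	return batch, true
}

// processBatch writes every message in order, syncs once, and only then
// delivers the responses, again in order.
func processBatch(s *store, batch []Message, deliver func(string)) {
	for _, m := range batch {
		s.write(m.Entries)
	}
	s.sync() // one durable sync covers the whole batch
	for _, m := range batch {
		for _, r := range m.Responses {
			deliver(r)
		}
	}
}

func main() {
	in := make(chan Message, 4)
	in <- Message{Entries: []string{"e1"}, Responses: []string{"resp1"}}
	in <- Message{Entries: []string{"e2", "e3"}, Responses: []string{"resp2"}}
	close(in)

	s := &store{}
	for {
		batch, ok := drainBatch(in, 16)
		if !ok {
			break
		}
		processBatch(s, batch, func(r string) { fmt.Println("deliver", r) })
	}
	fmt.Println("entries:", len(s.entries), "syncs:", s.syncs)
}
```

The `max` bound keeps a single batch from growing without limit, which trades a little throughput for a more predictable sync size.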

## Compatibility

The library remains backwards compatible with existing users and the change does
not introduce any breaking changes. Users that do not set `AsyncStorageWrites`
to true in the `Config` struct will not notice a difference with this change.
This is despite the fact that the existing "synchronous storage writes"
interface was adapted to share a majority of the same code. For instance,
`Node.Advance` has been adapted to transparently acknowledge an asynchronous log
append attempt and an asynchronous state machine application attempt, internally
using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to
a process and nodes using asynchronous storage writes appear to behave no
differently from the outside. Clusters are free to mix nodes running with and
without asynchronous storage writes.

## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports pluggable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipeline using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3-node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.
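
The ratios behind these two observations can be rechecked from the figures quoted above. The helper `reductionPct` is a name introduced here for illustration; the inputs are the average latencies at 16MB/s and the append batch sizes given in the text.

```go
package main

import "fmt"

// reductionPct returns the percentage by which after is lower than before.
func reductionPct(before, after float64) float64 {
	return (before - after) / before * 100
}

func main() {
	// Average latencies in ms at 16MB/s of write traffic, as quoted in the text.
	const p1, p2, p3 = 13.2, 7.3, 5.24

	fmt.Printf("P3 vs P2 latency reduction: %.1f%%\n", reductionPct(p2, p3))
	fmt.Printf("P3 vs P1 latency reduction: %.1f%%\n", reductionPct(p1, p3))

	// Average append batch sizes under saturation, as quoted in the text.
	fmt.Printf("append batch size growth: %.2fx\n", 1542.0/928.0)
}
```

The P3 vs P2 figure comes out at about 28%, matching the number quoted for the optimized pipeline; against the basic pipeline the reduction is about 60%, and the append batch size grows by roughly 1.66x.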

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <[email protected]>