
kv: Streaming KV protocol #8360

Open
bdarnell opened this issue Aug 7, 2016 · 6 comments
Labels
A-kv-client Relating to the KV client and the KV interface. C-investigation Further steps needed to qualify. C-label will change. T-kv KV Team X-nostale Marks an issue/pr that should be ignored by the stale bot

Comments

@bdarnell
Contributor

bdarnell commented Aug 7, 2016

To fix #7933, we are removing an abstraction-violating optimization that allowed us to detect when a command had been proposed to raft (and was therefore likely but not guaranteed to execute before any subsequently-proposed command). We could restore this optimization if we moved the kv Batch request to a streaming model, and reported the status of the request as it proceeded through the system.

Reporting the status of a batch could also allow us to reduce time-related retries in DistSender, since an accepted batch indicates that we have sent the request to a node that is up and believes it is able to handle the request. Streaming ScanResponses could also reduce the memory impact of large scans.

Jira issue: CRDB-6162

@petermattis petermattis added this to the Later milestone Feb 22, 2017
@bdarnell bdarnell added C-investigation Further steps needed to qualify. C-label will change. A-kv-client Relating to the KV client and the KV interface. labels Apr 26, 2018
@nvanbenschoten
Member

I gave this a shot using two different approaches to test out how we might want to go about this in the future. Both approaches relied on server-side streaming in gRPC using the following service description:

service Internal {
  rpc Batch (BatchRequest) returns (stream BatchResponse) {}
}

This made the server-side changes pretty straightforward because each request was still run on its own goroutine with its own stack. This would not have been the case with client-side or bidirectional streaming, which would require an extra routing layer at the node/server/replica level.
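
For reference, a rough sketch of what a server-side handler could look like under this service definition (the Internal_BatchServer stream type is what gRPC's Go codegen produces for a server-streaming RPC; the evaluate and waitForApplication helpers are hypothetical placeholders, not code from the prototype):

// Hypothetical handler shape: the generated stream lets the server call
// Send multiple times, e.g. once after evaluation and once after application.
func (n *Node) Batch(req *roachpb.BatchRequest, stream Internal_BatchServer) error {
	evalResp, err := n.evaluate(stream.Context(), req)
	if err != nil {
		return err
	}
	if err := stream.Send(evalResp); err != nil {
		return err
	}
	finalResp, err := n.waitForApplication(stream.Context(), req)
	if err != nil {
		return err
	}
	return stream.Send(finalResp)
}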

The changes to the client and specifically its Sender interface were more invasive.

Iterator model

My first approach followed gRPC's lead and turned the Sender interface into an iterator-like model by returning ResponseStream interfaces from each implementor. The ResponseStream interface looked like:

type ResponseStream interface {
	Recv() (*BatchResponse, error)
}

and Sender now looked like:

type Sender interface {
	Send(context.Context, roachpb.BatchRequest) ResponseStream
}

Notably, gRPC's own Internal_BatchClient implemented this interface directly. The nice part about this model is that it didn't require any extra goroutines, as control flow was completely limited to recursive calls to Recv. The big downside was that it required a large amount of code to be moved around into iterator/stream structs that maintained state that previously sat on a Sender's stack. These streams also became very difficult to merge. This is to be expected with an approach like this, but nevertheless, it made the code much more difficult to understand and a lot harder to write. If I wasn't a believer in generators and continuations before, I am now.
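
For a sense of what consumption looks like in this model, a caller would drain a ResponseStream roughly as follows (the io.EOF termination convention is borrowed from gRPC and is an assumption here, as is the handleResponse placeholder):

// Sketch of a caller draining a ResponseStream until it is exhausted.
stream := sender.Send(ctx, ba)
for {
	br, err := stream.Recv()
	if err == io.EOF {
		break // no more partial responses
	}
	if err != nil {
		return err
	}
	handleResponse(br) // process each partial BatchResponse as it arrives
}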

Goroutine model

My second approach was to use goroutines and channels in a more Go-like style. However, I still wanted to maintain a proper call stack, so I only used channels for responses and retained a Send method on the Sender interface. The Sender interface was adapted to look like

type Sender interface {
	Send(context.Context, roachpb.BatchRequest) <-chan *roachpb.BatchResponse
}

The nice part about this approach was that it required far less invasive code changes, except at the boundaries of Senders. It also had an advantage over the iterator model because streams could be merged using select statements, although arbitrary numbers of streams could only be merged using reflect.Select (which has pretty miserable performance). However, this came at the cost of a very large number of goroutines and channels, as there's no way to introduce filtering or mapping into a channel pipeline without an extra goroutine and channel. This could have been addressed by using "worker" goroutines at each stage of the pipeline that operate across requests, but that requires extra bookkeeping and sacrifices a call stack as a result.
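
To illustrate why each stage in such a pipeline costs a goroutine and a channel, merging two response streams might look like the sketch below (a hypothetical helper, not code from the prototype); merging an arbitrary number of streams this way is what pushes you toward reflect.Select:

// mergeResponses fans two response channels into one. Each merge, filter,
// or map stage needs its own goroutine and output channel.
func mergeResponses(a, b <-chan *roachpb.BatchResponse) <-chan *roachpb.BatchResponse {
	out := make(chan *roachpb.BatchResponse)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case br, ok := <-a:
				if !ok {
					a = nil // closed; stop selecting on this channel
					continue
				}
				out <- br
			case br, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- br
			}
		}
	}()
	return out
}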

DistSender+BatchRequest Impedance Mismatch

The biggest issue I ran into with either approach was in DistSender. The challenge wasn't actually a consequence of the approaches themselves, but rather a consequence of an impedance mismatch between DistSender's current contract and the idea of a streaming KV interface. DistSender contains a lot of logic to ensure that BatchRequests can be fanned out to multiple Replicas and then glued back together into a single BatchResponse. This is all done to hide the fact that batches often need to distribute individual requests across ranges, so that each BatchRequest still results in a single corresponding BatchResponse. This entire process is at odds with the idea of streaming KV responses, which fundamentally assumes that partial responses exist not only from different requests in a batch but even within a single request itself.

With this in mind, it wasn't clear how DistSender would function in an architecture like this. For instance, picture a world where every request has two responses: an evaluation result and a proposal result. Should DistSender wait for all evaluation results to return before gluing them all together and returning them all at once? Should it stream each individual response from a range upon receiving it, even if the range wasn't processing the entire batch at once? In that case, who should be in charge of stitching the entire BatchResponse together? Should BatchResponses even exist? Do we need them in a pipelined world?

This is the extent of my current progress pulling on this thread. I don't have loyalty to any one approach and would love to get input from others on ways to proceed. Also, for context about why I'm interested in this, it's primarily related to transactional pipelining (this comment is probably most relevant). With some amount of response streaming, the current transactional pipelining proposal would not need to rely exclusively on QueryIntent requests to determine whether a proposal succeeded or not. Instead, it could keep a response stream open for evaluated-but-not-yet-replicated writes and record when they have succeeded.
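
As a very rough sketch of that last point (all names here are hypothetical and not part of the pipelining proposal itself), a transaction coordinator could consume such a stream like this:

// The first response confirms evaluation; a later response confirms
// replication, at which point the write no longer needs a QueryIntent
// to prove that its proposal succeeded.
stream := sender.Send(ctx, writeBatch)
if _, err := stream.Recv(); err != nil {
	return err // evaluation failed
}
inFlightWrites.add(writeBatch) // evaluated but not yet replicated
go func() {
	if _, err := stream.Recv(); err == nil {
		inFlightWrites.markReplicated(writeBatch)
	}
}()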

@a-robinson
Contributor

Also, for context about why I'm interested in this, it's primarily related to transactional pipelining

It'd also do wonders for performance given that gRPC scales significantly better for streaming interfaces than unary requests.

@nvanbenschoten
Member

gRPC scales significantly better for streaming interfaces than unary requests

Yes, absolutely, although for these tests I was only using server-side streaming. My understanding is that the majority of the benefit of gRPC streaming is that it doesn't need to set up a new connection for each incoming request. With server-side streaming only (i.e. not bidirectional), I think we still fall back to a unique gRPC connection per request. That said, in the context of transactional pipelining, a single request/connection for a pair of (evaluation result, proposal result) responses is still a significant improvement over a request for each of the two responses.

@bdarnell
Contributor Author

It'd also do wonders for performance given that gRPC scales significantly better for streaming interfaces than unary requests.

I don't think that applies here. The raft transport benefits from using a long-lived stream because a single RPC call is equivalent to (open stream, send message, get response, close stream). I would not expect a stream per KV RPC to perform any better than the current model. (Moving to long-lived streams could give performance improvements, but then we'd have to do a lot of the bookkeeping that gRPC does to match up requests and responses ourselves.)

@andreimatei
Contributor

I think this issue deserves to continue to exist because it has good notes. I believe the benefits of having multiple responses per RPC have only increased since txn pipelining.

@andreimatei andreimatei reopened this Apr 23, 2021
@jlinder jlinder added the T-kv KV Team label Jun 16, 2021
@github-actions

We have marked this issue as stale because it has been inactive for
18 months. If this issue is still relevant, removing the stale label
or adding a comment will keep it active. Otherwise, we'll close it in
10 days to keep the issue queue tidy. Thank you for your contribution
to CockroachDB!

@github-actions github-actions bot closed this as not planned Oct 10, 2023
@github-project-automation github-project-automation bot moved this to Closed in KV Aug 28, 2024
@yuzefovich yuzefovich reopened this Nov 19, 2024
@yuzefovich yuzefovich added X-nostale Marks an issue/pr that should be ignored by the stale bot and removed X-stale no-issue-activity labels Nov 19, 2024