-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kvserver/rangefeed: add unbuffered registration #136310
Conversation
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
8294e88
to
b55d3f3
Compare
7351476
to
ba4096f
Compare
Not sure what everyone's preference are given the recent test failures in rangefeed. I'll let the review begin but hold off on merging until master stabilizes. |
09a04e6
to
6d5a0bb
Compare
So far the test failures have been pretty minor and easy to debug. So I think this plan makes sense. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave this a first pass, mostly just reading through the comments.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @tbg and @wenyihu6)
-- commits
line 16 at r1:
I don't think we want to advertise this cluster setting yet.
-- commits
line 21 at r1:
Minor, but I think the standard format is Co-authored-by: Steven Danna <[email protected]>
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 26 at r1 (raw file):
// to the BufferedStream, depending on the BufferedStream to be non-blocking. As // a result, it does not require a long-lived goroutine to volley event when // reading from raft.
It looks to me like we might have lost some updates to this comment that I pushed up earlier. That's OK. I have them locally though, let me know if you think this is more clear or less clear. I'll add other comment updates as well, but please consider all of them suggestions, not requirements.
+// unbufferedRegistration is similar to bufferedRegistration but does not
+// internaly buffer events deliverd via publish(). Rather, when caught up, it
+// forwards events directly to the BufferedStream. It assumes that
+// BufferedStream's SendBuffered method is non-blocking. As a result, it does
+// not require a long-lived goroutine to move event between an internal buffer
+// and the sender.
+//
+// Note that UnbufferedRegistration needs to ensure that events sent to
+// BufferedStream from the processor during a running catchup scan are sent to
+// the client in order. To achieve this, events from catch-up scans are sent
+// using UnbufferedSend. While the catch-up scan is ongoing, live updates
+// delivered via publish() are temporarily buffered in catchUpBuf to hold them
+// until catch-up scan is complete. After the catch-up scan is done, we know
+// that catch-up scan events have been sent to grpc stream successfully.
+// Catch-up buffer is then drained by our hopefully short-lived output loop
+// goroutine. Once the catchup buffer is fully drained publish begins sending to
+// the buffered sender directly. unbufferedRegistration is responsible for
+// correctly handling memory reservations for data stored in the catchUpBuf.
+//
+// All errors are delivered to this registration via Disonnect() which is
+// non-blocking. Disconnect sends an error to the stream and invokes any
+// necessary cleanup.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 59 at r1 (raw file):
// True if this catchUpBuf has overflowed, live raft events are dropped. // This will cause the registration to disconnect with an error once // catch-up scan is done and catchUpBuf is drained and published. Once set
once the catch-up scan is done
.
And maybe Once catchUpOverflowed is true, it will always be true
.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 65 at r1 (raw file):
// It is set to nil if catchUpBuf has been drained (either catch-up scan // succeeded or failed). If no catch-up iter is provided, catchUpBuf will be // nil since the initialization.
Similarly, here is my alternative comment if you are interested.
+ // catchUpBuf hold events published to this registration while the catch up
+ // scan is running. It is set to nil once it has been drained (because
+ // either catch-up scan succeeded or failed). If no catch-up iter is
+ // provided, catchUpBuf will be nil from the point of initialization.
+ catchUpBuf chan *sharedEvent
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 69 at r1 (raw file):
// Fine to repeatedly cancel context. Management of the catch-up // runOutputLoop context goroutine.
+ // catchUpScanCancelFn is called to tear down the goroutine responsible for
+ // the catch-up scan. Can be called more than once.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 73 at r1 (raw file):
// disconnection indicates that the registration is marked for // disconnection and disconnection is taking place.
Perhaps mention that once disconnected is true, events sent via publish are ignored.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 76 at r1 (raw file):
disconnected bool // catchUpIter is created by replcia under raftMu lock when registration is
nit: replica
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 83 at r1 (raw file):
// Used for testing only. Indicates that all events in catchUpBuf have been // sent to BufferedStream. caughtUp bool
I recently made a change to this in the buffered registration, not sure if we need the same change here because we don't have the "blocking send" behavior that the buffered registration has.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 119 at r1 (raw file):
br.mu.caughtUp = true if br.mu.catchUpIter != nil { // Send to underlying stream directly if catch-up scan is not needed.
+ // A nil catchUpIter indicates we don't need a catch-up scan. We avoid
+ // initializing catchUpBuf in this case, which will result in publish sending
+ // all events to the underying stream immediately.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 127 at r1 (raw file):
// publish sends a single event either to catch up buffer or to BufferedStream // directly depending on whether catch up scan has done. publish is responsible // for using and releasing alloc.
+// publish sends a single event to this registration. It is called by the
+// processor if the event overlaps the span this registration is interested in.
+// Events are either stored in catchUpBuf or sent to BufferedStream directly,
+// depending on whether catch up scan has done. publish is responsible for using
+// and releasing alloc.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 171 at r1 (raw file):
// multiple times, but subsequent errors would be discarded. It may be invoked // to shut down rangefeed from the rangefeed level or from the node level // StreamManager.
+// Disconnect is called to shut down the registration. It is safe to run
+// multiple times, but subsequent errors are discarded. It is invoked by both
+// the processor in response to errors from the replica and by the StreamManager
+// in response to shutdowns.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 212 at r1 (raw file):
// The contract is that it will empty catch-up buffer and set it to nil when // this goroutine ends. Once set to nil, no more events should be put in // catch-up buffer.
+// runOutputLoop runs a catch up scan if required and then moves any events
+// in catchUpBuf from the buffer into the sender.
+//
+// It is expected to be relatively short-lived.
+//
+// Once the catchUpBuf is drained, it will be set to nil, indicating to publish
+// that it is now safe to deliver events directly to the sender.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 225 at r1 (raw file):
if err := ubr.maybeRunCatchUpScan(ctx); err != nil { // Important to disconnect before draining.
This comment may want to explain why it is important.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 245 at r1 (raw file):
// Used for testing only. Wait for this registration to completely drain its // catchUpBuf. func (ubr *unbufferedRegistration) waitForCaughtUp(ctx context.Context) error {
Since this is only used for testing, I would move it to the bottom of the file so it doesn't get in the way when reading through the code.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 320 at r1 (raw file):
defer ubr.mu.Unlock() // TODO(wenyihu6): Check if we can just discard without holding the lock first // ? We shouldn't be reading from the buffer at the same time
I recall discussing this and thought we had answered this question for ourselves. If not, what do we need to do to answer this question?
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 367 at r1 (raw file):
}() // In the future, we might want a separate queue for catch up scans.
Let's turn this into an item in our follow up list and remove the comment.
pkg/kv/kvserver/rangefeed/sender_helper_test.go
line 139 at r1 (raw file):
default: panic("unknown event type") }
Not necessarily for this PR, but I wonder if a DebugString() or some other method on the event type would be helpful here.
Code quote:
for _, ev := range eventList {
switch {
case ev.Val != nil:
fmt.Fprintf(&str, "\t\tvalue")
case ev.Checkpoint != nil:
fmt.Fprintf(&str, "\t\tcheckpoint")
case ev.SST != nil:
fmt.Fprintf(&str, "\t\tsst")
case ev.DeleteRange != nil:
fmt.Fprintf(&str, "\t\tdelete")
case ev.Metadata != nil:
fmt.Fprintf(&str, "\t\tmetadata")
case ev.Error != nil:
fmt.Fprintf(&str, "\t\terror")
default:
panic("unknown event type")
}
pkg/kv/kvserver/rangefeed/scheduled_processor.go
line 386 at r1 (raw file):
if ubr, ok := r.(*unbufferedRegistration); ok { ubr.drainCatchUpBuffer(ctx) }
Rather than this type switch and calling this directly, could we instead put the code that is currently in drainCatchUpBuffer
in drainAllocations
?
b509fa9
to
952d91f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @tbg)
Previously, stevendanna (Steven Danna) wrote…
I don't think we want to advertise this cluster setting yet.
Removed.
Previously, stevendanna (Steven Danna) wrote…
Minor, but I think the standard format is
Co-authored-by: Steven Danna <[email protected]>
Done.
pkg/kv/kvserver/rangefeed/scheduled_processor.go
line 386 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Rather than this type switch and calling this directly, could we instead put the code that is currently in
drainCatchUpBuffer
indrainAllocations
?
Good idea, done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 26 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
It looks to me like we might have lost some updates to this comment that I pushed up earlier. That's OK. I have them locally though, let me know if you think this is more clear or less clear. I'll add other comment updates as well, but please consider all of them suggestions, not requirements.
+// unbufferedRegistration is similar to bufferedRegistration but does not +// internaly buffer events deliverd via publish(). Rather, when caught up, it +// forwards events directly to the BufferedStream. It assumes that +// BufferedStream's SendBuffered method is non-blocking. As a result, it does +// not require a long-lived goroutine to move event between an internal buffer +// and the sender. +// +// Note that UnbufferedRegistration needs to ensure that events sent to +// BufferedStream from the processor during a running catchup scan are sent to +// the client in order. To achieve this, events from catch-up scans are sent +// using UnbufferedSend. While the catch-up scan is ongoing, live updates +// delivered via publish() are temporarily buffered in catchUpBuf to hold them +// until catch-up scan is complete. After the catch-up scan is done, we know +// that catch-up scan events have been sent to grpc stream successfully. +// Catch-up buffer is then drained by our hopefully short-lived output loop +// goroutine. Once the catchup buffer is fully drained publish begins sending to +// the buffered sender directly. unbufferedRegistration is responsible for +// correctly handling memory reservations for data stored in the catchUpBuf. +// +// All errors are delivered to this registration via Disonnect() which is +// non-blocking. Disconnect sends an error to the stream and invokes any +// necessary cleanup.
Done. Apologies, I might have missed your push when pulling and rebasing. I've addressed your comments here. Let me know if there's anything else from your branch you'd like to merge, and feel free to force-push to this branch if needed.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 59 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
once the catch-up scan is done
.And maybe
Once catchUpOverflowed is true, it will always be true
.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 65 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Similarly, here is my alternative comment if you are interested.
+ // catchUpBuf hold events published to this registration while the catch up + // scan is running. It is set to nil once it has been drained (because + // either catch-up scan succeeded or failed). If no catch-up iter is + // provided, catchUpBuf will be nil from the point of initialization. + catchUpBuf chan *sharedEvent
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 69 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
+ // catchUpScanCancelFn is called to tear down the goroutine responsible for + // the catch-up scan. Can be called more than once.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 73 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Perhaps mention that once disconnected is true, events sent via publish are ignored.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 76 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
nit: replica
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 119 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
+ // A nil catchUpIter indicates we don't need a catch-up scan. We avoid + // initializing catchUpBuf in this case, which will result in publish sending + // all events to the underying stream immediately.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 127 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
+// publish sends a single event to this registration. It is called by the +// processor if the event overlaps the span this registration is interested in. +// Events are either stored in catchUpBuf or sent to BufferedStream directly, +// depending on whether catch up scan has done. publish is responsible for using +// and releasing alloc.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 171 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
+// Disconnect is called to shut down the registration. It is safe to run +// multiple times, but subsequent errors are discarded. It is invoked by both +// the processor in response to errors from the replica and by the StreamManager +// in response to shutdowns.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 212 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
+// runOutputLoop runs a catch up scan if required and then moves any events
+// in catchUpBuf from the buffer into the sender.
+//
+// It is expected to be relatively short-lived.
+//
+// Once the catchUpBuf is drained, it will be set to nil, indicating to publish
+// that it is now safe to deliver events directly to the sender.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 225 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
This comment may want to explain why it is important.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 245 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Since this is only used for testing, I would move it to the bottom of the file so it doesn't get in the way when reading through the code.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 320 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I recall discussing this and thought we had answered this question for ourselves. If not, what do we need to do to answer this question?
Right, I think I was concerned about the potential issue of calling publishCatchUpBuffer
and drainCatchUpBuffer
concurrently, where one is draining while the other is draining and publishing. If drainCatchUpBuffer doesn't hold a lock, two goroutines could read from the same channel and we might be missing events. This seems impossible given our current implementation given they are only called in the same goroutine. So I'm inclined to allow draining the buffer without holding the lock first and drain again & setting it to nil after holding the lock, reducing contention. That said, we can revisit this improvement after this PR, allowing us more time to think it through and add additional tests. wdyt?
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 367 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Let's turn this into an item in our follow up list and remove the comment.
Done, added in #135332.
pkg/kv/kvserver/rangefeed/sender_helper_test.go
line 139 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Not necessarily for this PR, but I wonder if a DebugString() or some other method on the event type would be helpful here.
Done, added in #135332.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @tbg)
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 83 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I recently made a change to this in the buffered registration, not sure if we need the same change here because we don't have the "blocking send" behavior that the buffered registration has.
Good point, added to #135332 given this is testing only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please make sure @stevendanna is happy too. This came out nicely!
Reviewed 9 of 12 files at r1, 3 of 3 files at r2, all commit messages.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @stevendanna and @wenyihu6)
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 73 at r1 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
Done.
disconnection
should be disconnected
here in the first occurrence.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 81 at r2 (raw file):
// catchUpIter is created by replica under raftMu lock when registration is // created. It is detached by output loop for processing and closed when
"detached" implies "set to nil here"? Mind explicitly saying this.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 119 at r2 (raw file):
stream: stream, } br.mu.Locker = &syncutil.Mutex{}
why not embed a syncutil.Mutex
in the struct and be done with it? That's how this works in every other place I've seen.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 131 at r2 (raw file):
} // publish sends a single event to this registration. It is called by the
Just wanted to say that I appreciate all the impl details in the comments. In an ideal world we never break abstractions etc but here things are just so much more approachable with this information included. Thank you for that.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 134 at r2 (raw file):
// processor if the event overlaps the span this registration is interested in. // Events are either stored in catchUpBuf or sent to BufferedStream directly, // depending on whether catch-up scan has done. publish is responsible for using
is done
Code quote:
has done
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 227 at r2 (raw file):
func (ubr *unbufferedRegistration) runOutputLoop(ctx context.Context, forStacks roachpb.RangeID) { ubr.mu.Lock() defer ubr.drainAllocations(ctx)
this is a no-op on success, right?
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 316 at r2 (raw file):
defer ubr.mu.Unlock() if err := publish(); err != nil {
Comment here would be helpful - we did the hopefully large publish outside the lock, so now we have to clear again under lock.
pkg/kv/kvserver/rangefeed/scheduled_processor.go
line 383 at r2 (raw file):
// buffer. If it fails to start, we should drain it here. // TODO(wenyihu6 during code review): Double check if runOutputLoop is // guaranteed to be invoked if err == nil here. Seems true.
Yes, the only reason tasks get rejected is because the stopper is, well, rejecting to start them.
cb21a6f
to
6ff6167
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @stevendanna and @tbg)
pkg/kv/kvserver/rangefeed/scheduled_processor.go
line 383 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Yes, the only reason tasks get rejected is because the stopper is, well, rejecting to start them.
Thanks, removed the comment.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 73 at r1 (raw file):
Previously, tbg (Tobias Grieger) wrote…
disconnection
should bedisconnected
here in the first occurrence.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 81 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
"detached" implies "set to nil here"? Mind explicitly saying this.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 119 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
why not embed a
syncutil.Mutex
in the struct and be done with it? That's how this works in every other place I've seen.
I was also confused about this - I did this way because bufferedRegistration
uses syncutil.Locker. After discussing it, we decided to leave them consistent. I don't see any specific reasons why buffered reg was done this way. I just pushed a commit that updates both types of registrations to use syncutil.Mutex directly. I can split the first commit into a separate PR.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 134 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
is done
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 227 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
this is a no-op on success, right?
Yes, added a comment here to clarify.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 316 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Comment here would be helpful - we did the hopefully large publish outside the lock, so now we have to clear again under lock.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 12 files at r7.
Reviewable status:complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @stevendanna and @wenyihu6)
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 119 at r2 (raw file):
Previously, wenyihu6 (Wenyi Hu) wrote…
I've updated buffered & unbuffered reg to use
sync.Mutex
. I'm not super happy with how often we have to disabledeferunlockcheck
. We could refactor the code to address this, or revert to usingsyncutil.Lock
which seems to bypass this linter check. Wdyt?
I'd leave as is. The lints are unfortunate, but that's a limitation of the linting that we can just live with.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 389 at r8 (raw file):
for re := retry.StartWithCtx(ctx, opts); re.Next(); { ubr.mu.Lock() // nolint:deferunlockcheck
I'm surprised you're seeing the lint fire here by the way. How is this different, from, say, this:
/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go#L659-L661
r.mu.Lock()
r.mu.cancelFns[reqKind][ba] = cancel
r.mu.Unlock()
If you also don't know the answer, perhaps ping #test-eng and see if there's something we're missing.
The only difference I see is that the above mutex is an RWMutex
:
/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go#L462-L463
mu struct {
syncutil.Mutex
which likely avoids the lint, which is not by design.
I think if this limitation in the lint were fixed, it'd be failing all over the place, and we would decide that the linter, not the code, is wrong.
I'll ping #test-eng with this, but think you should just write idiomatic code and suppress the linter where necessary, which you have done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 12 files at r8, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @stevendanna and @wenyihu6)
Pinged about this lint here: #108599 (comment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @tbg and @wenyihu6)
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 389 at r8 (raw file):
Previously, tbg (Tobias Grieger) wrote…
I'm surprised you're seeing the lint fire here by the way. How is this different, from, say, this:
/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go#L659-L661
r.mu.Lock() r.mu.cancelFns[reqKind][ba] = cancel r.mu.Unlock()If you also don't know the answer, perhaps ping #test-eng and see if there's something we're missing.
The only difference I see is that the above mutex is anRWMutex
:/pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go#L462-L463
mu struct { syncutil.Mutexwhich likely avoids the lint, which is not by design.
I think if this limitation in the lint were fixed, it'd be failing all over the place, and we would decide that the linter, not the code, is wrong.
I'll ping #test-eng with this, but think you should just write idiomatic code and suppress the linter where necessary, which you have done.
The difference is that len()
is a function and the linter disallows function calls inside the locked region. I opened a PR to fix this once: #111236 but no one seemed that keen on it at the time.
0a62df0
to
29db3c3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @stevendanna and @tbg)
pkg/kv/kvserver/rangefeed/buffered_registration.go
line 215 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Do these comments work at the function level so that we could ignore this check for the whole function?
Good idea, done.
pkg/kv/kvserver/rangefeed/scheduled_processor.go
line 383 at r8 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Looking at the code for RunAsyncTaskEx, this appears to be true.
Done.
pkg/kv/kvserver/rangefeed/unbuffered_registration.go
line 21 at r8 (raw file):
Previously, stevendanna (Steven Danna) wrote…
s/do/does/
Done.
Previously, stevendanna (Steven Danna) wrote…
Added to #135332. |
fyi I split the first commit into a separate PR to simplify the review here - #137007 |
73ee233
to
239b295
Compare
@stevendanna how are the rangefeed tests on master looking? are we in a good spot to merge this PR? Happy to take a look at the failing rangefeed tests if it would help. |
Only open test failure that I'm aware of is actually more of a leader leases problem. So I think we are good to merge this as long as CI is green. |
This patch adds unbufferedRegistration. UnbufferedRegistration is like BufferedRegistration but uses BufferedSender to buffer live raft updates instead of a using fixed size channel and having a dedicated per-registration goroutine to volley events to underlying gRPC stream. Instead, there is only one BufferedSender for each incoming node.MuxRangefeed gRPC call. BufferedSender is responsible for buffering and sending its updates to the underlying gRPC stream in a dedicated goroutine O(node). Resolved: cockroachdb#110432 Release note: none Co-authored-by: Steven Danna <[email protected]>
Thanks for the series of the review! bors r+ feels especially satisfying on this one 🥳 bors r=tbg,stevendanna |
This patch adds unbufferedRegistration.
UnbufferedRegistration is like BufferedRegistration but uses BufferedSender to
buffer live raft updates instead of a using fixed size channel and having a
dedicated per-registration goroutine to volley events to underlying gRPC
stream. Instead, there is only one BufferedSender for each incoming
node.MuxRangefeed gRPC call. BufferedSender is responsible for buffering and
sending its updates to the underlying gRPC stream in a dedicated goroutine
O(node).
Resolved: #110432
Release note: none
Co-authored-by: Steven Danna [email protected]