Skip to content

Commit

Permalink
Merge pull request #123676 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1.0-rc-123519

release-24.1.0-rc: rangefeed: fix sendMetadata deadlock
  • Loading branch information
celiala authored May 7, 2024
2 parents 418905b + bb9a627 commit 6205244
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,13 @@ func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMe
if metadata.FromManualSplit && !metadata.Span.Key.Equal(metadata.ParentStartKey) {
// Only send new manual split keys (i.e. a child rangefeed start key that
// differs from the parent start key)
if s.addMu != nil {
// Split points can be sent concurrently during the initial scan.
s.addMu.Lock()
defer s.addMu.Unlock()
}
s.seb.addSplitPoint(metadata.Span.Key)
s.setErr(s.maybeFlushBatch(ctx))
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
if m.cfg.withMetadata {
// Send metadata after the stream successfully registers to avoid sending
// metadata about a rangefeed that never starts.
sendMetadata(m.eventCh, span, parentRangefeedMetadata)
if err := sendMetadata(ctx, m.eventCh, span, parentRangefeedMetadata); err != nil {
return err
}
}

return nil
Expand Down
18 changes: 14 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ func (ds *DistSender) RangeFeedSpans(
})

if cfg.withMetadata {
sendMetadata(eventCh, span, sri.parentRangeFeedMetadata)
if err := sendMetadata(ctx, eventCh, span, sri.parentRangeFeedMetadata); err != nil {
return err
}
}

case <-ctx.Done():
Expand Down Expand Up @@ -520,9 +522,13 @@ type parentRangeFeedMetadata struct {
}

func sendMetadata(
eventCh chan<- RangeFeedMessage, span roachpb.Span, parentMetadata parentRangeFeedMetadata,
) {
eventCh <- RangeFeedMessage{
ctx context.Context,
eventCh chan<- RangeFeedMessage,
span roachpb.Span,
parentMetadata parentRangeFeedMetadata,
) error {
select {
case eventCh <- RangeFeedMessage{
RangeFeedEvent: &kvpb.RangeFeedEvent{
Metadata: &kvpb.RangeFeedMetadata{
Span: span,
Expand All @@ -531,6 +537,10 @@ func sendMetadata(
},
},
RegisteredSpan: span,
}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down

0 comments on commit 6205244

Please sign in to comment.