diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index a38ed1d9628a..e226b3ef90c0 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -306,7 +306,13 @@ func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMe if metadata.FromManualSplit && !metadata.Span.Key.Equal(metadata.ParentStartKey) { // Only send new manual split keys (i.e. a child rangefeed start key that // differs from the parent start key) + if s.addMu != nil { + // Split points can be sent concurrently during the initial scan. + s.addMu.Lock() + defer s.addMu.Unlock() + } s.seb.addSplitPoint(metadata.Span.Key) + s.setErr(s.maybeFlushBatch(ctx)) } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index acc6783b68f2..e6c0e0ed556e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -237,7 +237,9 @@ func (m *rangefeedMuxer) startSingleRangeFeed( if m.cfg.withMetadata { // Send metadata after the stream successfully registers to avoid sending // metadata about a rangefeed that never starts. - sendMetadata(m.eventCh, span, parentRangefeedMetadata) + if err := sendMetadata(ctx, m.eventCh, span, parentRangefeedMetadata); err != nil { + return err + } } return nil diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 3cf65f4ba133..a4f7704b2f1f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -285,7 +285,9 @@ func (ds *DistSender) RangeFeedSpans( }) if cfg.withMetadata { - sendMetadata(eventCh, span, sri.parentRangeFeedMetadata) + if err := sendMetadata(ctx, eventCh, span, sri.parentRangeFeedMetadata); err != nil { + return err + } } case <-ctx.Done(): @@ -520,9 +522,13 @@ type parentRangeFeedMetadata struct { } func sendMetadata( - eventCh chan<- RangeFeedMessage, span roachpb.Span, parentMetadata parentRangeFeedMetadata, -) { - eventCh <- RangeFeedMessage{ + ctx context.Context, + eventCh chan<- RangeFeedMessage, + span roachpb.Span, + parentMetadata parentRangeFeedMetadata, +) error { + select { + case eventCh <- RangeFeedMessage{ RangeFeedEvent: &kvpb.RangeFeedEvent{ Metadata: &kvpb.RangeFeedMetadata{ Span: span, @@ -531,6 +537,10 @@ func sendMetadata( }, }, RegisteredSpan: span, + }: + return nil + case <-ctx.Done(): + return ctx.Err() } }