diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index acc6783b68f2..e6c0e0ed556e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -237,7 +237,9 @@ func (m *rangefeedMuxer) startSingleRangeFeed( if m.cfg.withMetadata { // Send metadata after the stream successfully registers to avoid sending // metadata about a rangefeed that never starts. - sendMetadata(m.eventCh, span, parentRangefeedMetadata) + if err := sendMetadata(ctx, m.eventCh, span, parentRangefeedMetadata); err != nil { + return err + } } return nil diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 5c6004e5106b..1580ec0d4f57 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -285,7 +285,9 @@ func (ds *DistSender) RangeFeedSpans( }) if cfg.withMetadata { - sendMetadata(eventCh, span, sri.parentRangeFeedMetadata) + if err := sendMetadata(ctx, eventCh, span, sri.parentRangeFeedMetadata); err != nil { + return err + } } case <-ctx.Done(): @@ -520,9 +522,13 @@ type parentRangeFeedMetadata struct { } func sendMetadata( - eventCh chan<- RangeFeedMessage, span roachpb.Span, parentMetadata parentRangeFeedMetadata, -) { - eventCh <- RangeFeedMessage{ + ctx context.Context, + eventCh chan<- RangeFeedMessage, + span roachpb.Span, + parentMetadata parentRangeFeedMetadata, +) error { + select { + case eventCh <- RangeFeedMessage{ RangeFeedEvent: &kvpb.RangeFeedEvent{ Metadata: &kvpb.RangeFeedMetadata{ Span: span, @@ -531,6 +537,10 @@ func sendMetadata( }, }, RegisteredSpan: span, + }: + return nil + case <-ctx.Done(): + return ctx.Err() } }