Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
104553: sql: support foreign key checks in udfs r=rharding6373 a=rharding6373

This change removes restrictions around and adds support for running postquery checks in routines, which allows postquery checks like foreign key constraint checks in UDFs.

Epic: CRDB-25388
Informs: cockroachdb#87289

Release note: None

107384: kvcoord: Fix metrics tracking in mux rangefeed r=miretskiy a=miretskiy

Fix an observability bug in mux rangefeed which would incorrectly count various rangefeed related metrics (total ranges, catchup ranges, etc).

Fixes cockroachdb#106152
Fixes cockroachdb#106252

Release note: None

107996: sql: delete TestSchemaChangeCompletion r=chengxiong-ruan a=chengxiong-ruan

Informs: cockroachdb#51796

The test is very buggy and not doing what it was supposed to test. Deleting since we already have TestLegacySchemaChangerWaitsForOtherSchemaChanges and TestConcurrentSchemaChangesDoNotDeadlock.

Release note: None

Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
  • Loading branch information
4 people committed Aug 2, 2023
4 parents 6eb06ac + 54c3e26 + 41049cb + 19a000d commit 05ad3c4
Show file tree
Hide file tree
Showing 21 changed files with 724 additions and 159 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/span",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
31 changes: 21 additions & 10 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,18 @@ type DistSenderMetrics struct {
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
DistSenderRangeFeedMetrics
}

// DistSenderRangeFeedMetrics is a set of rangefeed specific metrics.
type DistSenderRangeFeedMetrics struct {
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
Expand All @@ -334,11 +339,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(),
}
for i := range m.MethodCounts {
method := kvpb.Method(i).String()
Expand All @@ -357,6 +358,16 @@ func makeDistSenderMetrics() DistSenderMetrics {
return m
}

func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
return DistSenderRangeFeedMetrics{
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
}

// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
// DistSenderMetrics for batch requests that have been divided and are currently
// forwarding to a specific replica for the corresponding range. The metrics
Expand Down
57 changes: 32 additions & 25 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type rangefeedMuxer struct {
g ctxgroup.Group

ds *DistSender
metrics *DistSenderRangeFeedMetrics
cfg rangeFeedConfig
registry *rangeFeedRegistry
catchupSem *limit.ConcurrentRequestLimiter
Expand Down Expand Up @@ -84,9 +85,13 @@ func muxRangeFeed(
registry: rr,
ds: ds,
cfg: cfg,
metrics: &ds.metrics.DistSenderRangeFeedMetrics,
catchupSem: catchupSem,
eventCh: eventCh,
}
if cfg.knobs.metrics != nil {
m.metrics = cfg.knobs.metrics
}

divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g)
return errors.CombineErrors(m.g.Wait(), ctx.Err())
Expand Down Expand Up @@ -158,7 +163,7 @@ type activeMuxRangeFeed struct {
roachpb.ReplicaDescriptor
startAfter hlc.Timestamp

// cathchupRes is the catchup scan quota acquired upon the
// catchupRes is the catchup scan quota acquired upon the
// start of rangefeed.
// It is released when this stream receives first non-empty checkpoint
// (meaning: catchup scan completes).
Expand Down Expand Up @@ -211,7 +216,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(

// Register active mux range feed.
stream := &activeMuxRangeFeed{
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.ds.metrics.RangefeedRanges),
activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics.RangefeedRanges),
rSpan: rs,
startAfter: startAfter,
token: token,
Expand Down Expand Up @@ -241,7 +246,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

{
// Before starting single rangefeed, acquire catchup scan quota.
catchupRes, err := acquireCatchupScanQuota(ctx, m.ds, m.catchupSem)
catchupRes, err := acquireCatchupScanQuota(ctx, &m.ds.st.SV, m.catchupSem, m.metrics)
if err != nil {
return err
}
Expand Down Expand Up @@ -387,13 +392,19 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
recvErr = nil
}

toRestart := ms.close()

// make sure that the underlying error is not fatal. If it is, there is no
// reason to restart each rangefeed, so just bail out.
if _, err := handleRangefeedError(ctx, recvErr); err != nil {
// Regardless of an error, release any resources (i.e. metrics) still
// being held by active stream.
for _, s := range toRestart {
s.release()
}
return err
}

toRestart := ms.close()
if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
Expand Down Expand Up @@ -429,8 +440,14 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
continue
}

if m.cfg.knobs.onMuxRangefeedEvent != nil {
m.cfg.knobs.onMuxRangefeedEvent(event)
if m.cfg.knobs.onRangefeedEvent != nil {
skip, err := m.cfg.knobs.onRangefeedEvent(ctx, active.Span, &event.RangeFeedEvent)
if err != nil {
return err
}
if skip {
continue
}
}

switch t := event.GetValue().(type) {
Expand All @@ -451,7 +468,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
case *kvpb.RangeFeedError:
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
if active.catchupRes != nil {
m.ds.metrics.RangefeedErrorCatchup.Inc(1)
m.metrics.RangefeedErrorCatchup.Inc(1)
}
ms.deleteStream(event.StreamID)
// Restart rangefeed on another goroutine. Restart might be a bit
Expand All @@ -473,7 +490,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
}
}

// restarActiveRangeFeeds restarts one or more rangefeeds.
// restartActiveRangeFeeds restarts one or more rangefeeds.
func (m *rangefeedMuxer) restartActiveRangeFeeds(
ctx context.Context, reason error, toRestart []*activeMuxRangeFeed,
) error {
Expand All @@ -489,13 +506,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeeds(
func (m *rangefeedMuxer) restartActiveRangeFeed(
ctx context.Context, active *activeMuxRangeFeed, reason error,
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason)
}
m.metrics.RangefeedRestartRanges.Inc(1)
active.setLastError(reason)

// Release catchup scan reservation if any -- we will acquire another
Expand All @@ -518,6 +529,12 @@ func (m *rangefeedMuxer) restartActiveRangeFeed(
return err
}

if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s (r%d, replica %s) disconnected with last checkpoint %s ago: %v (errInfo %v)",
active.Span, active.StartAfter, active.RangeID, active.ReplicaDescriptor,
timeutil.Since(active.Resolved.GoTime()), reason, errInfo)
}

if errInfo.evict {
active.resetRouting(ctx, rangecache.EvictionToken{})
}
Expand Down Expand Up @@ -587,13 +604,3 @@ func (c *muxStream) close() []*activeMuxRangeFeed {

return toRestart
}

// a test only option to modify mux rangefeed event.
func withOnMuxEvent(fn func(event *kvpb.MuxRangeFeedEvent)) RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.knobs.onMuxRangefeedEvent = fn
})
}

// TestingWithOnMuxEvent allow external tests access to the withOnMuxEvent option.
var TestingWithOnMuxEvent = withOnMuxEvent
Loading

0 comments on commit 05ad3c4

Please sign in to comment.