diff --git a/pkg/settings/values.go b/pkg/settings/values.go index 3664f0914c15..3a953a7a4642 100644 --- a/pkg/settings/values.go +++ b/pkg/settings/values.go @@ -21,7 +21,7 @@ import ( // MaxSettings is the maximum number of settings that the system supports. // Exported for tests. -const MaxSettings = 511 +const MaxSettings = 1023 // Values is a container that stores values for all registered settings. // Each setting is assigned a unique slot (up to MaxSettings). diff --git a/pkg/sql/distsql/BUILD.bazel b/pkg/sql/distsql/BUILD.bazel index df99eda7ad1e..5fd46c5fceae 100644 --- a/pkg/sql/distsql/BUILD.bazel +++ b/pkg/sql/distsql/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/kv", "//pkg/roachpb", "//pkg/server/telemetry", + "//pkg/settings", "//pkg/sql/catalog/descs", "//pkg/sql/colflow", "//pkg/sql/execinfra", diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 824861ad4437..637233cda6c0 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/colflow" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -148,6 +149,15 @@ func (ds *ServerImpl) SetCancelDeadFlowsCallback(cb func(int)) { ds.flowScheduler.TestingKnobs.CancelDeadFlowsCallback = cb } +// TODO(yuzefovich): remove this setting in 23.1. +var cancelRunningQueriesAfterFlowDrainWait = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.drain.cancel_after_wait.enabled", + "determines whether queries that are still running on a node being drained "+ + "are forcefully canceled after waiting the 'server.shutdown.query_wait' period", + true, +) + // Drain changes the node's draining state through gossip and drains the // server's flowRegistry. See flowRegistry.Drain for more details. func (ds *ServerImpl) Drain( @@ -167,7 +177,8 @@ func (ds *ServerImpl) Drain( // wait a minimum time for the draining state to be gossiped. minWait = 0 } - ds.flowRegistry.Drain(flowWait, minWait, reporter) + cancelStillRunning := cancelRunningQueriesAfterFlowDrainWait.Get(&ds.Settings.SV) + ds.flowRegistry.Drain(flowWait, minWait, reporter, cancelStillRunning) } // setDraining changes the node's draining state through gossip to the provided diff --git a/pkg/sql/distsql/setup_flow_after_drain_test.go b/pkg/sql/distsql/setup_flow_after_drain_test.go index 4388c3a8ee77..b1a17170a8b1 100644 --- a/pkg/sql/distsql/setup_flow_after_drain_test.go +++ b/pkg/sql/distsql/setup_flow_after_drain_test.go @@ -47,7 +47,9 @@ func TestSetupFlowAfterDrain(t *testing.T) { flowScheduler, ) distSQLSrv.flowRegistry.Drain( - time.Duration(0) /* flowDrainWait */, time.Duration(0) /* minFlowDrainWait */, nil /* reporter */) + time.Duration(0) /* flowDrainWait */, time.Duration(0), /* minFlowDrainWait */ + nil /* reporter */, false, /* cancelStillRunning */ + ) // We create some flow; it doesn't matter what. req := execinfrapb.SetupFlowRequest{Version: execinfra.Version} diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 7ad00d39b8da..c3b529f13fdc 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -114,11 +114,6 @@ type Flow interface { // query. IsLocal() bool - // HasInboundStreams returns whether this flow has any inbound streams (i.e. - // it is part of the distributed plan and other nodes are sending data to - // this flow). - HasInboundStreams() bool - // IsVectorized returns whether this flow will run with vectorized execution. IsVectorized() bool @@ -392,11 +387,18 @@ func (f *FlowBase) StartInternal( ctx, 1, "starting (%d processors, %d startables) asynchronously", len(processors), len(f.startables), ) - // Only register the flow if there will be inbound stream connections that - // need to look up this flow in the flow registry. - if f.HasInboundStreams() { - // Once we call RegisterFlow, the inbound streams become accessible; we must - // set up the WaitGroup counter before. + // Only register the flow if it is a part of the distributed plan. This is + // needed to satisfy two different use cases: + // 1. there are inbound stream connections that need to look up this flow in + // the flow registry. This can only happen if the plan is not fully local + // (since those inbound streams originate on different nodes). + // 2. when the node is draining, the flow registry can cancel all running + // non-fully local flows if they don't finish on their own during the grace + // period. Cancellation of local flows occurs by cancelling the connections + // that the local flows were spinned up for. + if !f.IsLocal() { + // Once we call RegisterFlow, the inbound streams become accessible; we + // must set up the WaitGroup counter before. // The counter will be further incremented below to account for the // processors. f.waitGroup.Add(len(f.inboundStreams)) @@ -427,7 +429,7 @@ func (f *FlowBase) StartInternal( // a vectorized flow with a parallel unordered synchronizer. That component // starts goroutines on its own, so we need to preserve that fact so that we // correctly wait in Wait(). - f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || f.HasInboundStreams() + f.startedGoroutines = f.startedGoroutines || len(f.startables) > 0 || len(processors) > 0 || len(f.inboundStreams) > 0 return nil } @@ -436,11 +438,6 @@ func (f *FlowBase) IsLocal() bool { return f.Local } -// HasInboundStreams returns whether this flow has any inbound streams. -func (f *FlowBase) HasInboundStreams() bool { - return len(f.inboundStreams) != 0 -} - // IsVectorized returns whether this flow will run with vectorized execution. func (f *FlowBase) IsVectorized() bool { panic("IsVectorized should not be called on FlowBase") @@ -549,7 +546,8 @@ func (f *FlowBase) Cleanup(ctx context.Context) { if log.V(1) { log.Infof(ctx, "cleaning up") } - if f.HasInboundStreams() && f.Started() { + // Local flows do not get registered. + if !f.IsLocal() && f.Started() { f.flowRegistry.UnregisterFlow(f.ID) } f.status = flowFinished @@ -569,7 +567,7 @@ func (f *FlowBase) Cleanup(ctx context.Context) { // For a detailed description of the distsql query cancellation mechanism, // read docs/RFCS/query_cancellation.md. func (f *FlowBase) cancel() { - if !f.HasInboundStreams() { + if len(f.inboundStreams) == 0 { return } // Pending streams have yet to be started; send an error to its receivers diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index 497e03284c76..3d6749836bd4 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -438,7 +438,9 @@ func (fr *FlowRegistry) waitForFlow( // are still flows active after flowDrainWait, Drain waits an extra // expectedConnectionTime so that any flows that were registered at the end of // the time window have a reasonable amount of time to connect to their -// consumers, thus unblocking them. +// consumers, thus unblocking them. All flows that are still running at this +// point are canceled if cancelStillRunning is true. +// // The FlowRegistry rejects any new flows once it has finished draining. // // Note that since local flows are not added to the registry, they are not @@ -454,6 +456,7 @@ func (fr *FlowRegistry) Drain( flowDrainWait time.Duration, minFlowDrainWait time.Duration, reporter func(int, redact.SafeString), + cancelStillRunning bool, ) { allFlowsDone := make(chan struct{}, 1) start := timeutil.Now() @@ -480,6 +483,18 @@ func (fr *FlowRegistry) Drain( time.Sleep(expectedConnectionTime) fr.Lock() } + if cancelStillRunning { + // Now cancel all still running flows. + for _, f := range fr.flows { + if f.flow != nil && f.flow.ctxCancel != nil { + // f.flow might be nil when ConnectInboundStream() was + // called, but the consumer of that inbound stream hasn't + // been scheduled yet. + // f.flow.ctxCancel might be nil in tests. + f.flow.ctxCancel() + } + } + } fr.Unlock() }() diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 257407067626..b8a5314c4db9 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -393,7 +393,7 @@ func TestFlowRegistryDrain(t *testing.T) { registerFlow(t, id) drainDone := make(chan struct{}) go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. @@ -406,7 +406,7 @@ func TestFlowRegistryDrain(t *testing.T) { // DrainTimeout verifies that Drain returns once the timeout expires. t.Run("DrainTimeout", func(t *testing.T) { registerFlow(t, id) - reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) + reg.Drain(0 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) reg.UnregisterFlow(id) reg.Undrain() }) @@ -417,7 +417,7 @@ func TestFlowRegistryDrain(t *testing.T) { registerFlow(t, id) drainDone := make(chan struct{}) go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. @@ -460,7 +460,7 @@ func TestFlowRegistryDrain(t *testing.T) { } defer func() { reg.testingRunBeforeDrainSleep = nil }() go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, 0 /* minFlowDrainWait */, nil /* reporter */, false /* cancelStillRunning */) drainDone <- struct{}{} }() if err := <-errChan; err != nil { @@ -488,7 +488,7 @@ func TestFlowRegistryDrain(t *testing.T) { minFlowDrainWait := 10 * time.Millisecond start := timeutil.Now() go func() { - reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */) + reg.Drain(math.MaxInt64 /* flowDrainWait */, minFlowDrainWait, nil /* reporter */, false /* cancelStillRunning */) drainDone <- struct{}{} }() // Be relatively sure that the FlowRegistry is draining. diff --git a/pkg/sql/flowinfra/flow_scheduler_test.go b/pkg/sql/flowinfra/flow_scheduler_test.go index 7b845d857a60..b9e65fb5b4e2 100644 --- a/pkg/sql/flowinfra/flow_scheduler_test.go +++ b/pkg/sql/flowinfra/flow_scheduler_test.go @@ -91,10 +91,6 @@ func (m *mockFlow) IsLocal() bool { panic("not implemented") } -func (m *mockFlow) HasInboundStreams() bool { - panic("not implemented") -} - func (m *mockFlow) IsVectorized() bool { panic("not implemented") }