diff --git a/pkg/ccl/sqlproxyccl/conn_migration.go b/pkg/ccl/sqlproxyccl/conn_migration.go index 1706acc0067c..61ed4cf2b361 100644 --- a/pkg/ccl/sqlproxyccl/conn_migration.go +++ b/pkg/ccl/sqlproxyccl/conn_migration.go @@ -78,7 +78,7 @@ func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) { defer f.mu.Unlock() // Forwarder hasn't been initialized. - if !f.isInitializedLocked() { + if !f.mu.isInitialized { return false, nil } @@ -120,9 +120,9 @@ var errTransferCannotStart = errors.New("transfer cannot be started") // where the forwarder is not in a state that is eligible for a connection // migration. // -// NOTE: If the forwarder hasn't been closed, runTransfer has an invariant +// NOTE: If the forwarder hasn't been closed, TransferConnection has an invariant // where the processors have been resumed prior to calling this method. When -// runTransfer returns, it is guaranteed that processors will either be +// TransferConnection returns, it is guaranteed that processors will either be // re-resumed, or the forwarder will be closed (in the case of a non-recoverable // error). // @@ -145,7 +145,7 @@ func (f *forwarder) TransferConnection() (retErr error) { // Create a transfer context, and timeout handler which gets triggered // whenever the context expires. We have to close the forwarder because // the transfer may be blocked on I/O, and the only way for now is to close - // the connections. This then allow runTransfer to return and cleanup. + // the connections. This then allow TransferConnection to return and cleanup. ctx, cancel := newTransferContext(f.ctx) defer cancel() @@ -177,8 +177,8 @@ func (f *forwarder) TransferConnection() (retErr error) { latencyDur := timeutil.Since(tBegin) f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds()) - // When runTransfer returns, it's either the forwarder has been closed, - // or the procesors have been resumed. + // When TransferConnection returns, it's either the forwarder has been + // closed, or the procesors have been resumed. if !ctx.isRecoverable() { log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr) f.metrics.ConnMigrationErrorFatalCount.Inc(1) diff --git a/pkg/ccl/sqlproxyccl/conn_migration_test.go b/pkg/ccl/sqlproxyccl/conn_migration_test.go index 93781d060d7f..09ba67742b77 100644 --- a/pkg/ccl/sqlproxyccl/conn_migration_test.go +++ b/pkg/ccl/sqlproxyccl/conn_migration_test.go @@ -91,6 +91,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) { f := &forwarder{} f.mu.request = &processor{} f.mu.response = &processor{} + f.mu.isInitialized = true started, cleanupFn := f.tryBeginTransfer() require.False(t, started) @@ -107,6 +108,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) { f := &forwarder{} f.mu.request = &processor{} f.mu.response = &processor{} + f.mu.isInitialized = true started, cleanupFn := f.tryBeginTransfer() require.True(t, started) diff --git a/pkg/ccl/sqlproxyccl/forwarder.go b/pkg/ccl/sqlproxyccl/forwarder.go index 641af896463f..88067604dd32 100644 --- a/pkg/ccl/sqlproxyccl/forwarder.go +++ b/pkg/ccl/sqlproxyccl/forwarder.go @@ -71,6 +71,17 @@ type forwarder struct { mu struct { syncutil.Mutex + // isInitialized indicates that the forwarder has been initialized. + // + // TODO(jaylim-crl): This prevents the connection from being transferred + // before we fully resume the processors (because the balancer now + // tracks assignments instead of forwarders). If we don't do this, there + // could be a situation where we resume the processors mid transfer. One + // alternative idea is to replace both isInitialized and isTransferring + // with a lock, which is held by the owner of the forwarder (e.g. main + // thread, or connection migrator thread). + isInitialized bool + // isTransferring indicates that a connection migration is in progress. isTransferring bool @@ -154,7 +165,7 @@ func newForwarder( // // run can only be called once throughout the lifetime of the forwarder. func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error { - initialize := func() error { + setup := func() error { f.mu.Lock() defer f.mu.Unlock() @@ -165,8 +176,9 @@ func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error { return f.ctx.Err() } - // Run can only be called once. - if f.isInitializedLocked() { + // Run can only be called once. If lastUpdated has already been set + // (i.e. non-zero), it has to be the case where run has been called. + if !f.mu.activity.lastUpdated.IsZero() { return errors.AssertionFailedf("forwarder has already been started") } @@ -185,10 +197,23 @@ func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error { f.mu.activity.lastUpdated = f.timeSource.Now() return nil } - if err := initialize(); err != nil { - return err + markInitialized := func() { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.isInitialized = true + } + + if err := setup(); err != nil { + return errors.Wrap(err, "setting up forwarder") } - return f.resumeProcessors() + + if err := f.resumeProcessors(); err != nil { + return errors.Wrap(err, "resuming processors") + } + + // Mark the forwarder as initialized, and connection is ready for a transfer. + markInitialized() + return nil } // Context returns the context associated with the forwarder. @@ -237,7 +262,7 @@ func (f *forwarder) IsIdle() (idle bool) { defer f.mu.Unlock() // If the forwarder hasn't been initialized, it is considered active. - if !f.isInitializedLocked() { + if !f.mu.isInitialized { return false } @@ -270,12 +295,6 @@ func (f *forwarder) IsIdle() (idle bool) { return now.Sub(f.mu.activity.lastUpdated) >= idleTimeout } -// isInitializedLocked returns true if the forwarder has been initialized -// through Run, or false otherwise. -func (f *forwarder) isInitializedLocked() bool { - return f.mu.request != nil && f.mu.response != nil -} - // resumeProcessors starts both the request and response processors // asynchronously. The forwarder will be closed if any of the processors // return an error while resuming. This is idempotent as resume() will return diff --git a/pkg/ccl/sqlproxyccl/forwarder_test.go b/pkg/ccl/sqlproxyccl/forwarder_test.go index 2f6d9ba46887..88059e9fd2bc 100644 --- a/pkg/ccl/sqlproxyccl/forwarder_test.go +++ b/pkg/ccl/sqlproxyccl/forwarder_test.go @@ -44,6 +44,12 @@ func TestForward(t *testing.T) { err := f.run(p1, p2) require.NoError(t, err) + func() { + f.mu.Lock() + defer f.mu.Unlock() + require.True(t, f.mu.isInitialized) + }() + // Close the connection right away to simulate processor error. p1.Close() @@ -77,6 +83,11 @@ func TestForward(t *testing.T) { require.NoError(t, err) require.Nil(t, f.ctx.Err()) require.False(t, f.IsIdle()) + func() { + f.mu.Lock() + defer f.mu.Unlock() + require.True(t, f.mu.isInitialized) + }() f.mu.Lock() requestProc := f.mu.request @@ -217,6 +228,11 @@ func TestForward(t *testing.T) { require.NoError(t, err) require.Nil(t, f.ctx.Err()) require.False(t, f.IsIdle()) + func() { + f.mu.Lock() + defer f.mu.Unlock() + require.True(t, f.mu.isInitialized) + }() f.mu.Lock() responseProc := f.mu.response diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go index 15c499a1354a..53c865b674c7 100644 --- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go +++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go @@ -837,11 +837,6 @@ func TestConnectionRebalancingDisabled(t *testing.T) { const podCount = 2 tenantID := serverutils.TestTenantID() tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{}) - defer func() { - for _, tenant := range tenants { - tenant.Stopper().Stop(ctx) - } - }() // Register one SQL pod in the directory server. tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */) @@ -934,11 +929,6 @@ func TestCancelQuery(t *testing.T) { }, } tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, tenantKnobs) - defer func() { - for _, tenant := range tenants { - tenant.Stopper().Stop(ctx) - } - }() // Use a custom time source for testing. t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) @@ -1272,11 +1262,6 @@ func TestPodWatcher(t *testing.T) { const podCount = 4 tenantID := serverutils.TestTenantID() tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{}) - defer func() { - for _, tenant := range tenants { - tenant.Stopper().Stop(ctx) - } - }() // Register only 3 SQL pods in the directory server. We will add the 4th // once the watcher has been established. @@ -1739,11 +1724,6 @@ func TestCurConnCountMetric(t *testing.T) { // Start a single SQL pod. tenantID := serverutils.TestTenantID() tenants := startTestTenantPods(ctx, t, s, tenantID, 1, base.TestingKnobs{}) - defer func() { - for _, tenant := range tenants { - tenant.Stopper().Stop(ctx) - } - }() // Register the SQL pod in the directory server. tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */) @@ -2295,7 +2275,8 @@ func queryAddr(ctx context.Context, t *testing.T, db queryer) string { // startTestTenantPods starts count SQL pods for the given tenant, and returns // a list of tenant servers. Note that a default admin testuser with the -// password hunter2 will be created. +// password hunter2 will be created. The test tenants will automatically be +// stopped once the server's stopper (from ts) is stopped. func startTestTenantPods( ctx context.Context, t *testing.T, diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go index a2df19ca21dc..0d7396de2f8a 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_static_directory_svr.go @@ -344,7 +344,7 @@ func (d *TestStaticDirectoryServer) RemovePod(tenantID roachpb.TenantID, podAddr } // Start starts the test directory server using an in-memory listener. This -// returns an error if the server cannot be started. If the sevrer has already +// returns an error if the server cannot be started. If the server has already // been started, this is a no-op. func (d *TestStaticDirectoryServer) Start(ctx context.Context) error { d.process.Lock()