Skip to content

Commit

Permalink
ccl/sqlproxyccl: ensure that connections cannot be transferred before…
Browse files Browse the repository at this point in the history
… init

Related to #80446.

In #80446, we updated the connection tracker to track server assignments
instead of forwarders. This also meant that there is a possibility where we
can start transferring the connection before we even resumed the forwarder
for the first time, breaking the TransferConnection invariant where the
processors must be resumed before being called.

This commit fixes that issue by introducing a new isInitialized flag to the
forwarder, which will only get set to true once run returns. Attempting to
transfer a connection with isInitialized=false will return an error. This
should fix flakes that we've been seeing on CI.

Release note: None

Release justification: sqlproxy bug fix. This ensures that we don't resume
the processors mid connection transfer, causing unexpected issues on the
client's end. Note that this situation is rare since it involves ensuring
timely behavior of forwarder.Run and forwarder.TransferConnection at the same
time.
  • Loading branch information
jaylim-crl committed Aug 11, 2022
1 parent 0ad97e4 commit ae851fa
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 41 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/sqlproxyccl/conn_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
defer f.mu.Unlock()

// Forwarder hasn't been initialized.
if !f.isInitializedLocked() {
if !f.mu.isInitialized {
return false, nil
}

Expand Down Expand Up @@ -120,9 +120,9 @@ var errTransferCannotStart = errors.New("transfer cannot be started")
// where the forwarder is not in a state that is eligible for a connection
// migration.
//
// NOTE: If the forwarder hasn't been closed, runTransfer has an invariant
// NOTE: If the forwarder hasn't been closed, TransferConnection has an invariant
// where the processors have been resumed prior to calling this method. When
// runTransfer returns, it is guaranteed that processors will either be
// TransferConnection returns, it is guaranteed that processors will either be
// re-resumed, or the forwarder will be closed (in the case of a non-recoverable
// error).
//
Expand All @@ -145,7 +145,7 @@ func (f *forwarder) TransferConnection() (retErr error) {
// Create a transfer context, and timeout handler which gets triggered
// whenever the context expires. We have to close the forwarder because
// the transfer may be blocked on I/O, and the only way for now is to close
// the connections. This then allow runTransfer to return and cleanup.
// the connections. This then allow TransferConnection to return and cleanup.
ctx, cancel := newTransferContext(f.ctx)
defer cancel()

Expand Down Expand Up @@ -177,8 +177,8 @@ func (f *forwarder) TransferConnection() (retErr error) {
latencyDur := timeutil.Since(tBegin)
f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())

// When runTransfer returns, it's either the forwarder has been closed,
// or the procesors have been resumed.
// When TransferConnection returns, it's either the forwarder has been
// closed, or the procesors have been resumed.
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) {
f := &forwarder{}
f.mu.request = &processor{}
f.mu.response = &processor{}
f.mu.isInitialized = true

started, cleanupFn := f.tryBeginTransfer()
require.False(t, started)
Expand All @@ -107,6 +108,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) {
f := &forwarder{}
f.mu.request = &processor{}
f.mu.response = &processor{}
f.mu.isInitialized = true

started, cleanupFn := f.tryBeginTransfer()
require.True(t, started)
Expand Down
45 changes: 32 additions & 13 deletions pkg/ccl/sqlproxyccl/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ type forwarder struct {
mu struct {
syncutil.Mutex

// isInitialized indicates that the forwarder has been initialized.
//
// TODO(jaylim-crl): This prevents the connection from being transferred
// before we fully resume the processors (because the balancer now
// tracks assignments instead of forwarders). If we don't do this, there
// could be a situation where we resume the processors mid transfer. One
// alternative idea is to replace both isInitialized and isTransferring
// with a lock, which is held by the owner of the forwarder (e.g. main
// thread, or connection migrator thread).
isInitialized bool

// isTransferring indicates that a connection migration is in progress.
isTransferring bool

Expand Down Expand Up @@ -154,7 +165,7 @@ func newForwarder(
//
// run can only be called once throughout the lifetime of the forwarder.
func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error {
initialize := func() error {
setup := func() error {
f.mu.Lock()
defer f.mu.Unlock()

Expand All @@ -165,8 +176,9 @@ func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error {
return f.ctx.Err()
}

// Run can only be called once.
if f.isInitializedLocked() {
// Run can only be called once. If lastUpdated has already been set
// (i.e. non-zero), it has to be the case where run has been called.
if !f.mu.activity.lastUpdated.IsZero() {
return errors.AssertionFailedf("forwarder has already been started")
}

Expand All @@ -185,10 +197,23 @@ func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error {
f.mu.activity.lastUpdated = f.timeSource.Now()
return nil
}
if err := initialize(); err != nil {
return err
markInitialized := func() {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.isInitialized = true
}

if err := setup(); err != nil {
return errors.Wrap(err, "setting up forwarder")
}
return f.resumeProcessors()

if err := f.resumeProcessors(); err != nil {
return errors.Wrap(err, "resuming processors")
}

// Mark the forwarder as initialized, and connection is ready for a transfer.
markInitialized()
return nil
}

// Context returns the context associated with the forwarder.
Expand Down Expand Up @@ -237,7 +262,7 @@ func (f *forwarder) IsIdle() (idle bool) {
defer f.mu.Unlock()

// If the forwarder hasn't been initialized, it is considered active.
if !f.isInitializedLocked() {
if !f.mu.isInitialized {
return false
}

Expand Down Expand Up @@ -270,12 +295,6 @@ func (f *forwarder) IsIdle() (idle bool) {
return now.Sub(f.mu.activity.lastUpdated) >= idleTimeout
}

// isInitializedLocked returns true if the forwarder has been initialized
// through Run, or false otherwise.
func (f *forwarder) isInitializedLocked() bool {
return f.mu.request != nil && f.mu.response != nil
}

// resumeProcessors starts both the request and response processors
// asynchronously. The forwarder will be closed if any of the processors
// return an error while resuming. This is idempotent as resume() will return
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/sqlproxyccl/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func TestForward(t *testing.T) {
err := f.run(p1, p2)
require.NoError(t, err)

func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

// Close the connection right away to simulate processor error.
p1.Close()

Expand Down Expand Up @@ -77,6 +83,11 @@ func TestForward(t *testing.T) {
require.NoError(t, err)
require.Nil(t, f.ctx.Err())
require.False(t, f.IsIdle())
func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

f.mu.Lock()
requestProc := f.mu.request
Expand Down Expand Up @@ -217,6 +228,11 @@ func TestForward(t *testing.T) {
require.NoError(t, err)
require.Nil(t, f.ctx.Err())
require.False(t, f.IsIdle())
func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

f.mu.Lock()
responseProc := f.mu.response
Expand Down
23 changes: 2 additions & 21 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,6 @@ func TestConnectionRebalancingDisabled(t *testing.T) {
const podCount = 2
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register one SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
Expand Down Expand Up @@ -934,11 +929,6 @@ func TestCancelQuery(t *testing.T) {
},
}
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, tenantKnobs)
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Use a custom time source for testing.
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -1272,11 +1262,6 @@ func TestPodWatcher(t *testing.T) {
const podCount = 4
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register only 3 SQL pods in the directory server. We will add the 4th
// once the watcher has been established.
Expand Down Expand Up @@ -1739,11 +1724,6 @@ func TestCurConnCountMetric(t *testing.T) {
// Start a single SQL pod.
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, 1, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register the SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
Expand Down Expand Up @@ -2295,7 +2275,8 @@ func queryAddr(ctx context.Context, t *testing.T, db queryer) string {

// startTestTenantPods starts count SQL pods for the given tenant, and returns
// a list of tenant servers. Note that a default admin testuser with the
// password hunter2 will be created.
// password hunter2 will be created. The test tenants will automatically be
// stopped once the server's stopper (from ts) is stopped.
func startTestTenantPods(
ctx context.Context,
t *testing.T,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (d *TestStaticDirectoryServer) RemovePod(tenantID roachpb.TenantID, podAddr
}

// Start starts the test directory server using an in-memory listener. This
// returns an error if the server cannot be started. If the sevrer has already
// returns an error if the server cannot be started. If the server has already
// been started, this is a no-op.
func (d *TestStaticDirectoryServer) Start(ctx context.Context) error {
d.process.Lock()
Expand Down

0 comments on commit ae851fa

Please sign in to comment.