From 9f472c452aa13d986d3659baf27c9771a61f7ffb Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 25 Aug 2020 19:46:46 -0700 Subject: [PATCH 1/3] sync2: Add AcquireContext to sema And fix flaky test Signed-off-by: Sugu Sougoumarane --- go/sync2/semaphore.go | 14 +++++ ...aphore_flaky_test.go => semaphore_test.go} | 56 +++++++++++-------- 2 files changed, 48 insertions(+), 22 deletions(-) rename go/sync2/{semaphore_flaky_test.go => semaphore_test.go} (53%) diff --git a/go/sync2/semaphore.go b/go/sync2/semaphore.go index 536399824cc..f3f107e53b6 100644 --- a/go/sync2/semaphore.go +++ b/go/sync2/semaphore.go @@ -21,6 +21,7 @@ package sync2 // cases, you just want a familiar API. import ( + "context" "time" ) @@ -61,6 +62,19 @@ func (sem *Semaphore) Acquire() bool { } } +// AcquireContext returns true on successful acquisition, and +// false on context expiry. Timeout is ignored. +func (sem *Semaphore) AcquireContext(ctx context.Context) bool { + tm := time.NewTimer(sem.timeout) + defer tm.Stop() + select { + case <-sem.slots: + return true + case <-ctx.Done(): + return false + } +} + // TryAcquire acquires a semaphore if it's immediately available. // It returns false otherwise. func (sem *Semaphore) TryAcquire() bool { diff --git a/go/sync2/semaphore_flaky_test.go b/go/sync2/semaphore_test.go similarity index 53% rename from go/sync2/semaphore_flaky_test.go rename to go/sync2/semaphore_test.go index 60b287df216..aa4f6f5cee1 100644 --- a/go/sync2/semaphore_flaky_test.go +++ b/go/sync2/semaphore_test.go @@ -17,8 +17,11 @@ limitations under the License. package sync2 import ( + "context" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSemaNoTimeout(t *testing.T) { @@ -26,42 +29,51 @@ func TestSemaNoTimeout(t *testing.T) { s.Acquire() released := false go func() { - time.Sleep(10 * time.Millisecond) released = true s.Release() }() s.Acquire() - if !released { - t.Errorf("release: false, want true") - } + assert.True(t, released) } func TestSemaTimeout(t *testing.T) { - s := NewSemaphore(1, 5*time.Millisecond) + s := NewSemaphore(1, 1*time.Millisecond) + s.Acquire() + release := make(chan struct{}) + released := make(chan struct{}) + go func() { + <-release + s.Release() + released <- struct{}{} + }() + assert.False(t, s.Acquire()) + release <- struct{}{} + <-released + assert.True(t, s.Acquire()) +} + +func TestSemaAcquireContext(t *testing.T) { + s := NewSemaphore(1, 0) s.Acquire() + release := make(chan struct{}) + released := make(chan struct{}) go func() { - time.Sleep(10 * time.Millisecond) + <-release s.Release() + released <- struct{}{} }() - if s.Acquire() { - t.Errorf("Acquire: true, want false") - } - time.Sleep(10 * time.Millisecond) - if !s.Acquire() { - t.Errorf("Acquire: false, want true") - } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + assert.False(t, s.AcquireContext(ctx)) + release <- struct{}{} + <-released + assert.True(t, s.AcquireContext(context.Background())) } func TestSemaTryAcquire(t *testing.T) { s := NewSemaphore(1, 0) - if !s.TryAcquire() { - t.Errorf("TryAcquire: false, want true") - } - if s.TryAcquire() { - t.Errorf("TryAcquire: true, want false") - } + assert.True(t, s.TryAcquire()) + assert.False(t, s.TryAcquire()) s.Release() - if !s.TryAcquire() { - t.Errorf("TryAcquire: false, want true") - } + assert.True(t, s.TryAcquire()) } From d9ed3ad3e492bd433937ab5f085d212651d97339 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 25 Aug 2020 19:58:01 -0700 Subject: [PATCH 2/3] tm: deadlock: PromoteReplica vs replmanager Change actionMutex to a semaphore to implement a tryLock function in tm, and use it in replManager. Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/tabletmanager/replmanager.go | 4 ++-- go/vt/vttablet/tabletmanager/rpc_server.go | 23 ++++++++------------- go/vt/vttablet/tabletmanager/tm_init.go | 20 +++++++----------- go/vt/vttablet/tabletmanager/tm_state.go | 2 +- 4 files changed, 20 insertions(+), 29 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/replmanager.go b/go/vt/vttablet/tabletmanager/replmanager.go index d457f1485f2..b562ec8b47f 100644 --- a/go/vt/vttablet/tabletmanager/replmanager.go +++ b/go/vt/vttablet/tabletmanager/replmanager.go @@ -91,8 +91,8 @@ func (rm *replManager) SetTabletType(tabletType topodatapb.TabletType) { func (rm *replManager) check() { // We need to obtain the action lock if we're going to fix - // replication - if err := rm.tm.lock(rm.ctx); err != nil { + // replication, but only if the lock is available to take. + if !rm.tm.tryLock() { return } defer rm.tm.unlock() diff --git a/go/vt/vttablet/tabletmanager/rpc_server.go b/go/vt/vttablet/tabletmanager/rpc_server.go index d451fc778c9..094eb3196e5 100644 --- a/go/vt/vttablet/tabletmanager/rpc_server.go +++ b/go/vt/vttablet/tabletmanager/rpc_server.go @@ -35,27 +35,22 @@ import ( // // lock is used at the beginning of an RPC call, to lock the -// action mutex. It returns ctx.Err() if <-ctx.Done() after the lock. +// action mutex. It returns ctx.Err() if the context expires. func (tm *TabletManager) lock(ctx context.Context) error { - tm.actionMutex.Lock() - tm.actionMutexLocked = true - - // After we take the lock (which could take a long time), we - // check the client is still here. - select { - case <-ctx.Done(): - tm.actionMutexLocked = false - tm.actionMutex.Unlock() - return ctx.Err() - default: + if tm.actionSema.AcquireContext(ctx) { return nil } + return ctx.Err() +} + +// tryLock will return immediately, true on success and false on failure. +func (tm *TabletManager) tryLock() bool { + return tm.actionSema.TryAcquire() } // unlock is the symmetrical action to lock. func (tm *TabletManager) unlock() { - tm.actionMutexLocked = false - tm.actionMutex.Unlock() + tm.actionSema.Release() } // HandleRPCPanic is part of the RPCTM interface. diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index c06161f97b2..f093d288c8f 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -25,10 +25,10 @@ topology server. Only 'vtctl DeleteTablet' should be run by other processes, everything else should ask the tablet server to make the change. -Most RPC calls lock the actionMutex, except the easy read-only ones. +Most RPC calls obtain the actionSema, except the easy read-only ones. RPC calls that change the tablet record will also call updateState. -See rpc_server.go for all cases, and which actions take the actionMutex, +See rpc_server.go for all cases, and which actions take the actionSema, and which run changeCallback. */ package tabletmanager @@ -42,6 +42,7 @@ import ( "time" "vitess.io/vitess/go/flagutil" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -140,17 +141,11 @@ type TabletManager struct { // when we transition back from something like MASTER. baseTabletType topodatapb.TabletType - // actionMutex is there to run only one action at a time. - // This mutex can be held for long periods of time (hours), - // like in the case of a restore. This mutex must be obtained + // actionSema is there to run only one action at a time. + // This semaphore can be held for long periods of time (hours), + // like in the case of a restore. This semaphore must be obtained // first before other mutexes. - actionMutex sync.Mutex - - // actionMutexLocked is set to true after we acquire actionMutex, - // and reset to false when we release it. - // It is meant as a sanity check to make sure the methods that need - // to have the actionMutex have it. - actionMutexLocked bool + actionSema *sync2.Semaphore // orc is an optional client for Orchestrator HTTP API calls. // If this is nil, those calls will be skipped. @@ -238,6 +233,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti tm.replManager = newReplManager(tm.BatchCtx, tm, healthCheckInterval) tm.tabletAlias = tablet.Alias tm.tmState = newTMState(tm, tablet) + tm.actionSema = sync2.NewSemaphore(1, 0) demoteType, err := topoproto.ParseTabletType(*demoteMasterType) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index a18fc118853..86ca17fced3 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -49,7 +49,7 @@ type tmState struct { // while changing the state of the system to match these values. // This can be held for many seconds while tmState connects to // external components to change their state. - // Obtaining tm.actionMutex before calling a tmState function is + // Obtaining tm.actionSema before calling a tmState function is // not required. // Because mu can be held for long, we publish the current state // of these variables into displayState, which can be accessed From 1e0b4b325d3f051459d755530014dc83b9366eb9 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 26 Aug 2020 09:04:40 -0700 Subject: [PATCH 3/3] tm: address review comments Signed-off-by: Sugu Sougoumarane --- go/sync2/semaphore.go | 2 -- go/vt/vttablet/tabletmanager/rpc_server.go | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/go/sync2/semaphore.go b/go/sync2/semaphore.go index f3f107e53b6..c0d1fd2ce48 100644 --- a/go/sync2/semaphore.go +++ b/go/sync2/semaphore.go @@ -65,8 +65,6 @@ func (sem *Semaphore) Acquire() bool { // AcquireContext returns true on successful acquisition, and // false on context expiry. Timeout is ignored. func (sem *Semaphore) AcquireContext(ctx context.Context) bool { - tm := time.NewTimer(sem.timeout) - defer tm.Stop() select { case <-sem.slots: return true diff --git a/go/vt/vttablet/tabletmanager/rpc_server.go b/go/vt/vttablet/tabletmanager/rpc_server.go index 094eb3196e5..9420ca9824c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_server.go +++ b/go/vt/vttablet/tabletmanager/rpc_server.go @@ -34,8 +34,8 @@ import ( // Utility functions for RPC service // -// lock is used at the beginning of an RPC call, to lock the -// action mutex. It returns ctx.Err() if the context expires. +// lock is used at the beginning of an RPC call, to acquire the +// action semaphore. It returns ctx.Err() if the context expires. func (tm *TabletManager) lock(ctx context.Context) error { if tm.actionSema.AcquireContext(ctx) { return nil