Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'upstream/release-7.0' into slack-vitess…
Browse files Browse the repository at this point in the history
…-2020.08.19.r0
  • Loading branch information
ameetkotian committed Aug 27, 2020
2 parents 2b339a8 + 9f6b69c commit a8260ae
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 52 deletions.
12 changes: 12 additions & 0 deletions go/sync2/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package sync2
// cases, you just want a familiar API.

import (
"context"
"time"
)

Expand Down Expand Up @@ -61,6 +62,17 @@ func (sem *Semaphore) Acquire() bool {
}
}

// AcquireContext blocks until a slot is acquired or ctx is done. It returns
// true on successful acquisition and false on context expiry. The wait is
// bounded only by ctx; no timer is consulted here, so any timeout the
// semaphore was created with is ignored.
//
// A context that is already done on entry always yields false, even when a
// slot is free, so a caller that has gone away never ends up holding the
// semaphore.
func (sem *Semaphore) AcquireContext(ctx context.Context) bool {
	// select chooses at random among ready cases, so without this up-front
	// check an expired context could still win a free slot.
	select {
	case <-ctx.Done():
		return false
	default:
	}

	select {
	case <-sem.slots:
		return true
	case <-ctx.Done():
		return false
	}
}

// TryAcquire acquires a semaphore if it's immediately available.
// It returns false otherwise.
func (sem *Semaphore) TryAcquire() bool {
Expand Down
56 changes: 34 additions & 22 deletions go/sync2/semaphore_flaky_test.go → go/sync2/semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,63 @@ limitations under the License.
package sync2

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestSemaNoTimeout takes the semaphore built by NewSemaphore(1, 0), then
// expects a second Acquire to return only after the background goroutine has
// called Release.
func TestSemaNoTimeout(t *testing.T) {
s := NewSemaphore(1, 0)
s.Acquire()
released := false
go func() {
time.Sleep(10 * time.Millisecond)
// The flag is set before Release so the Acquire below can verify ordering.
released = true
s.Release()
}()
s.Acquire()
// NOTE(review): this listing is a diff rendered without +/- markers; the
// t.Errorf check and the assert.True call test the same condition, so one is
// presumably the removed form of the other — confirm against the repository.
if !released {
t.Errorf("release: false, want true")
}
assert.True(t, released)
}

// TestSemaTimeout expects Acquire to return false while the semaphore is held
// and true once the goroutine has released it.
func TestSemaTimeout(t *testing.T) {
// NOTE(review): two declarations of s appear because this is a diff view
// without +/- markers; presumably the 5ms line is the removed one and the 1ms
// line its replacement — confirm against the repository.
s := NewSemaphore(1, 5*time.Millisecond)
s := NewSemaphore(1, 1*time.Millisecond)
s.Acquire()
// release tells the goroutine when to call Release; released reports back
// that it has done so. The handshake orders the steps without sleeping.
release := make(chan struct{})
released := make(chan struct{})
go func() {
<-release
s.Release()
released <- struct{}{}
}()
// The goroutine is still waiting on release, so the semaphore is held here.
assert.False(t, s.Acquire())
release <- struct{}{}
<-released
// Release has completed, so this Acquire is expected to succeed.
assert.True(t, s.Acquire())
}

// TestSemaAcquireContext expects AcquireContext to return false for a
// cancelled context while the semaphore is held, and true with a background
// context once the semaphore has been released.
func TestSemaAcquireContext(t *testing.T) {
s := NewSemaphore(1, 0)
s.Acquire()
// release tells the goroutine when to call Release; released reports back
// that it has done so.
release := make(chan struct{})
released := make(chan struct{})
go func() {
// NOTE(review): this listing is a diff rendered without +/- markers; the
// Sleep below looks like a leftover from the removed sleep-based test —
// confirm against the repository.
time.Sleep(10 * time.Millisecond)
<-release
s.Release()
released <- struct{}{}
}()
// NOTE(review): the two Acquire checks and the Sleep between them also look
// like removed lines of the old timeout test rather than part of this test —
// confirm before treating them as live code.
if s.Acquire() {
t.Errorf("Acquire: true, want false")
}
time.Sleep(10 * time.Millisecond)
if !s.Acquire() {
t.Errorf("Acquire: false, want true")
}
// Cancel before calling AcquireContext: the semaphore is held, so the
// cancelled context is what the call is expected to report.
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert.False(t, s.AcquireContext(ctx))
release <- struct{}{}
<-released
// Release has completed, so acquisition is expected to succeed.
assert.True(t, s.AcquireContext(context.Background()))
}

// TestSemaTryAcquire expects TryAcquire to succeed on a fresh semaphore, fail
// while it is held, and succeed again after Release.
func TestSemaTryAcquire(t *testing.T) {
s := NewSemaphore(1, 0)
// NOTE(review): this listing is a diff rendered without +/- markers; the
// t.Errorf-style checks and the assert.* calls express the same expectations,
// so the former are presumably the removed lines — confirm against the
// repository.
if !s.TryAcquire() {
t.Errorf("TryAcquire: false, want true")
}
if s.TryAcquire() {
t.Errorf("TryAcquire: true, want false")
}
assert.True(t, s.TryAcquire())
assert.False(t, s.TryAcquire())
s.Release()
if !s.TryAcquire() {
t.Errorf("TryAcquire: false, want true")
}
assert.True(t, s.TryAcquire())
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/replmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (rm *replManager) SetTabletType(tabletType topodatapb.TabletType) {

func (rm *replManager) check() {
// We need to obtain the action lock if we're going to fix
// replication
if err := rm.tm.lock(rm.ctx); err != nil {
// replication, but only if the lock is available to take.
if !rm.tm.tryLock() {
return
}
defer rm.tm.unlock()
Expand Down
25 changes: 10 additions & 15 deletions go/vt/vttablet/tabletmanager/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,23 @@ import (
// Utility functions for RPC service
//

// lock is used at the beginning of an RPC call, to lock the
// action mutex. It returns ctx.Err() if <-ctx.Done() after the lock.
// lock is used at the beginning of an RPC call, to acquire the
// action semaphore. It returns ctx.Err() if the context expires.
func (tm *TabletManager) lock(ctx context.Context) error {
// NOTE(review): this listing is a diff rendered without +/- markers. The
// actionMutex statements (through the `default:` case) appear to be the
// removed implementation and the actionSema lines the added one — confirm
// against the repository before reading this as a single function body.
tm.actionMutex.Lock()
tm.actionMutexLocked = true

// After we take the lock (which could take a long time), we
// check the client is still here.
select {
case <-ctx.Done():
tm.actionMutexLocked = false
tm.actionMutex.Unlock()
return ctx.Err()
default:
// AcquireContext returns true once the semaphore is held.
if tm.actionSema.AcquireContext(ctx) {
return nil
}
// AcquireContext returns false only through its ctx.Done() case, so the
// context's own error is what gets reported to the caller.
return ctx.Err()
}

// tryLock attempts to take the action semaphore without blocking, by
// delegating to actionSema.TryAcquire. It returns immediately: true on
// success and false on failure. A successful tryLock should be paired with
// unlock.
func (tm *TabletManager) tryLock() bool {
return tm.actionSema.TryAcquire()
}

// unlock is the symmetrical action to lock.
func (tm *TabletManager) unlock() {
// NOTE(review): this listing is a diff rendered without +/- markers; the two
// actionMutex lines appear to be the removed body and actionSema.Release()
// its replacement — confirm against the repository.
tm.actionMutexLocked = false
tm.actionMutex.Unlock()
tm.actionSema.Release()
}

// HandleRPCPanic is part of the RPCTM interface.
Expand Down
20 changes: 8 additions & 12 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ topology server. Only 'vtctl DeleteTablet'
should be run by other processes, everything else should ask
the tablet server to make the change.
Most RPC calls lock the actionMutex, except the easy read-only ones.
Most RPC calls obtain the actionSema, except the easy read-only ones.
RPC calls that change the tablet record will also call updateState.
See rpc_server.go for all cases, and which actions take the actionMutex,
See rpc_server.go for all cases, and which actions take the actionSema,
and which run changeCallback.
*/
package tabletmanager
Expand All @@ -42,6 +42,7 @@ import (
"time"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/vterrors"

"golang.org/x/net/context"
Expand Down Expand Up @@ -140,17 +141,11 @@ type TabletManager struct {
// when we transition back from something like MASTER.
baseTabletType topodatapb.TabletType

// actionMutex is there to run only one action at a time.
// This mutex can be held for long periods of time (hours),
// like in the case of a restore. This mutex must be obtained
// actionSema is there to run only one action at a time.
// This semaphore can be held for long periods of time (hours),
// like in the case of a restore. This semaphore must be obtained
// first before other mutexes.
actionMutex sync.Mutex

// actionMutexLocked is set to true after we acquire actionMutex,
// and reset to false when we release it.
// It is meant as a sanity check to make sure the methods that need
// to have the actionMutex have it.
actionMutexLocked bool
actionSema *sync2.Semaphore

// orc is an optional client for Orchestrator HTTP API calls.
// If this is nil, those calls will be skipped.
Expand Down Expand Up @@ -238,6 +233,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti
tm.replManager = newReplManager(tm.BatchCtx, tm, healthCheckInterval)
tm.tabletAlias = tablet.Alias
tm.tmState = newTMState(tm, tablet)
tm.actionSema = sync2.NewSemaphore(1, 0)

demoteType, err := topoproto.ParseTabletType(*demoteMasterType)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/tm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type tmState struct {
// while changing the state of the system to match these values.
// This can be held for many seconds while tmState connects to
// external components to change their state.
// Obtaining tm.actionMutex before calling a tmState function is
// Obtaining tm.actionSema before calling a tmState function is
// not required.
// Because mu can be held for long, we publish the current state
// of these variables into displayState, which can be accessed
Expand Down

0 comments on commit a8260ae

Please sign in to comment.