Skip to content

Commit

Permalink
tell apart etcd lock session done from normal occasions (#440)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored Jun 24, 2021
1 parent d33f2fc commit 59f65e0
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions lock/etcdlock/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (

// Mutex is etcdv3 lock
type Mutex struct {
timeout time.Duration
mutex *concurrency.Mutex
session *concurrency.Session
timeout time.Duration
mutex *concurrency.Mutex
session *concurrency.Session
locked bool
lockedMux sync.Mutex
}

type lockContext struct {
Expand Down Expand Up @@ -73,6 +75,10 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, error) {
ctx, cancel = context.WithCancel(ctx)
rCtx := &lockContext{Context: ctx}

m.lockedMux.Lock()
m.locked = true
m.lockedMux.Unlock()

go func() {
defer cancel()

Expand All @@ -99,13 +105,20 @@ func (m *Mutex) TryLock(ctx context.Context) (context.Context, error) {
ctx, cancel = context.WithCancel(ctx)
rCtx := &lockContext{Context: ctx}

go func() {
defer cancel()
m.lockedMux.Lock()
m.locked = true
m.lockedMux.Unlock()

go func() {
select {
case <-m.session.Done():
rCtx.setError(types.ErrLockSessionDone)
// session.Done() has multi semantics
if m.locked {
rCtx.setError(types.ErrLockSessionDone)
cancel()
}
case <-ctx.Done():
cancel()
}
}()

Expand All @@ -123,6 +136,10 @@ func (m *Mutex) Unlock(ctx context.Context) error {
}

func (m *Mutex) unlock(ctx context.Context) error {
m.lockedMux.Lock()
m.locked = false
m.lockedMux.Unlock()

_, err := m.session.Client().Txn(ctx).If(m.mutex.IsOwner()).
Then(clientv3.OpDelete(m.mutex.Key())).Commit()
// no way to clear it...
Expand Down

0 comments on commit 59f65e0

Please sign in to comment.