From f5c8abf73b61dc38c77871e1fead82b9949dd79b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 14 Aug 2016 09:59:42 -0700 Subject: [PATCH] session: remove session manager and add ttl --- clientv3/concurrency/election.go | 35 +++++++++-------- clientv3/concurrency/mutex.go | 31 +++++++-------- clientv3/concurrency/session.go | 54 +++++++++++++++------------ contrib/recipes/double_barrier.go | 33 ++++++++-------- contrib/recipes/key.go | 16 +++----- contrib/recipes/rwmutex.go | 27 +++++++++----- integration/v3_double_barrier_test.go | 47 +++++++++++++---------- integration/v3_election_test.go | 51 ++++++++++++++++++------- integration/v3_lock_test.go | 23 +++++++++--- 9 files changed, 189 insertions(+), 128 deletions(-) diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index 89b52089a6b6..f14cd55e369d 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -29,7 +29,7 @@ var ( ) type Election struct { - client *v3.Client + session *Session keyPrefix string @@ -39,20 +39,18 @@ type Election struct { } // NewElection returns a new election on a given key prefix. -func NewElection(client *v3.Client, pfx string) *Election { - return &Election{client: client, keyPrefix: pfx} +func NewElection(s *Session, pfx string) *Election { + return &Election{session: s, keyPrefix: pfx} } // Campaign puts a value as eligible for the election. It blocks until // it is elected, an error occurs, or the context is cancelled. func (e *Election) Campaign(ctx context.Context, val string) error { - s, serr := NewSession(e.client) - if serr != nil { - return serr - } + s := e.session + client := e.session.Client() k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease()) - txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) + txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) txn = txn.Else(v3.OpGet(k)) resp, err := txn.Commit() @@ -72,12 +70,12 @@ func (e *Election) Campaign(ctx context.Context, val string) error { } } - err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1)) + err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1)) if err != nil { // clean up in case of context cancel select { case <-ctx.Done(): - e.Resign(e.client.Ctx()) + e.Resign(client.Ctx()) default: e.leaderSession = nil } @@ -92,8 +90,9 @@ func (e *Election) Proclaim(ctx context.Context, val string) error { if e.leaderSession == nil { return ErrElectionNotLeader } + client := e.session.Client() cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev) - txn := e.client.Txn(ctx).If(cmp) + txn := client.Txn(ctx).If(cmp) txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease()))) tresp, terr := txn.Commit() if terr != nil { @@ -111,7 +110,8 @@ func (e *Election) Resign(ctx context.Context) (err error) { if e.leaderSession == nil { return nil } - _, err = e.client.Delete(ctx, e.leaderKey) + client := e.session.Client() + _, err = client.Delete(ctx, e.leaderKey) e.leaderKey = "" e.leaderSession = nil return err @@ -119,7 +119,8 @@ func (e *Election) Resign(ctx context.Context) (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader(ctx context.Context) (string, error) { - resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) + client := e.session.Client() + resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -139,9 +140,11 @@ func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse { } func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { + client := e.session.Client() + defer close(ch) for { - resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) + resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return } @@ -152,7 +155,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { if len(resp.Kvs) == 0 { // wait for first key put on prefix opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()} - wch := e.client.Watch(cctx, e.keyPrefix, opts...) + wch := client.Watch(cctx, e.keyPrefix, opts...) for kv == nil { wr, ok := <-wch @@ -172,7 +175,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { kv = resp.Kvs[0] } - wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) + wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) keyDeleted := false for !keyDeleted { wr, ok := <-wch diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 803a8470ab64..298d7636b8b5 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -24,24 +24,22 @@ import ( // Mutex implements the sync Locker interface with etcd type Mutex struct { - client *v3.Client + s *Session pfx string myKey string myRev int64 } -func NewMutex(client *v3.Client, pfx string) *Mutex { - return &Mutex{client, pfx, "", -1} +func NewMutex(s *Session, pfx string) *Mutex { + return &Mutex{s, pfx, "", -1} } // Lock locks the mutex with a cancellable context. If the context is cancelled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { - s, serr := NewSession(m.client) - if serr != nil { - return serr - } + s := m.s + client := m.s.Client() m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) @@ -49,7 +47,7 @@ func (m *Mutex) Lock(ctx context.Context) error { put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) - resp, err := m.client.Txn(ctx).If(cmp).Then(put).Else(get).Commit() + resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit() if err != nil { return err } @@ -59,18 +57,19 @@ func (m *Mutex) Lock(ctx context.Context) error { } // wait for deletion revisions prior to myKey - err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1)) + err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1)) // release lock key if cancelled select { case <-ctx.Done(): - m.Unlock(m.client.Ctx()) + m.Unlock(client.Ctx()) default: } return err } func (m *Mutex) Unlock(ctx context.Context) error { - if _, err := m.client.Delete(ctx, m.myKey); err != nil { + client := m.s.Client() + if _, err := client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "\x00" @@ -87,17 +86,19 @@ func (m *Mutex) Key() string { return m.myKey } type lockerMutex struct{ *Mutex } func (lm *lockerMutex) Lock() { - if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil { + client := lm.s.Client() + if err := lm.Mutex.Lock(client.Ctx()); err != nil { panic(err) } } func (lm *lockerMutex) Unlock() { - if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil { + client := lm.s.Client() + if err := lm.Mutex.Unlock(client.Ctx()); err != nil { panic(err) } } // NewLocker creates a sync.Locker backed by an etcd mutex. -func NewLocker(client *v3.Client, pfx string) sync.Locker { - return &lockerMutex{NewMutex(client, pfx)} +func NewLocker(s *Session, pfx string) sync.Locker { + return &lockerMutex{NewMutex(s, pfx)} } diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go index e6b170412323..f4a2a80e5c9b 100644 --- a/clientv3/concurrency/session.go +++ b/clientv3/concurrency/session.go @@ -15,21 +15,11 @@ package concurrency import ( - "sync" - v3 "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" ) -// only keep one ephemeral lease per client -var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)} - -const sessionTTL = 60 - -type clientSessionMgr struct { - sessions map[*v3.Client]*Session - mu sync.Mutex -} +const defaultSessionTTL = 60 // Session represents a lease kept alive for the lifetime of a client. // Fault-tolerant applications may use sessions to reason about liveness. @@ -42,14 +32,13 @@ type Session struct { } // NewSession gets the leased session for a client. -func NewSession(client *v3.Client) (*Session, error) { - clientSessions.mu.Lock() - defer clientSessions.mu.Unlock() - if s, ok := clientSessions.sessions[client]; ok { - return s, nil +func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) { + ops := &sessionOptions{} + for _, opt := range opts { + opt(ops) } - resp, err := client.Grant(client.Ctx(), sessionTTL) + resp, err := client.Grant(client.Ctx(), defaultSessionTTL) if err != nil { return nil, err } @@ -63,16 +52,10 @@ func NewSession(client *v3.Client) (*Session, error) { donec := make(chan struct{}) s := &Session{client: client, id: id, cancel: cancel, donec: donec} - clientSessions.sessions[client] = s // keep the lease alive until client error or cancelled context go func() { - defer func() { - clientSessions.mu.Lock() - delete(clientSessions.sessions, client) - clientSessions.mu.Unlock() - close(donec) - }() + defer close(donec) for range keepAlive { // eat messages until keep alive channel closes } @@ -81,6 +64,11 @@ func NewSession(client *v3.Client) (*Session, error) { return s, nil } +// Client is the etcd client that is attached to the session. +func (s *Session) Client() *v3.Client { + return s.client +} + // Lease is the lease ID for keys bound to the session. func (s *Session) Lease() v3.LeaseID { return s.id } @@ -102,3 +90,21 @@ func (s *Session) Close() error { _, err := s.client.Revoke(s.client.Ctx(), s.id) return err } + +type sessionOptions struct { + ttl int +} + +// SessionOption configures Session. +type SessionOption func(*sessionOptions) + +// WithTTL configures the session's TTL in seconds. +// If TTL is <= 0, the default 60 seconds TTL will be used. +func WithTTL(ttl int) SessionOption { + return func(so *sessionOptions) { + if ttl <= 0 { + so.ttl = defaultSessionTTL + } + so.ttl = ttl + } +} diff --git a/contrib/recipes/double_barrier.go b/contrib/recipes/double_barrier.go index 54168a4b7820..e31b649a1f66 100644 --- a/contrib/recipes/double_barrier.go +++ b/contrib/recipes/double_barrier.go @@ -16,6 +16,7 @@ package recipe import ( "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" ) @@ -23,32 +24,33 @@ import ( // DoubleBarrier blocks processes on Enter until an expected count enters, then // blocks again on Leave until all processes have left. type DoubleBarrier struct { - client *clientv3.Client - ctx context.Context + s *concurrency.Session + ctx context.Context key string // key for the collective barrier count int myKey *EphemeralKV // current key for this process on the barrier } -func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier { +func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier { return &DoubleBarrier{ - client: client, - ctx: context.TODO(), - key: key, - count: count, + s: s, + ctx: context.TODO(), + key: key, + count: count, } } // Enter waits for "count" processes to enter the barrier then returns func (b *DoubleBarrier) Enter() error { - ek, err := NewUniqueEphemeralKey(b.client, b.key+"/waiters") + client := b.s.Client() + ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters") if err != nil { return err } b.myKey = ek - resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) + resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if err != nil { return err } @@ -59,12 +61,12 @@ func (b *DoubleBarrier) Enter() error { if len(resp.Kvs) == b.count { // unblock waiters - _, err = b.client.Put(b.ctx, b.key+"/ready", "") + _, err = client.Put(b.ctx, b.key+"/ready", "") return err } _, err = WaitEvents( - b.client, + client, b.key+"/ready", ek.Revision(), []mvccpb.Event_EventType{mvccpb.PUT}) @@ -73,7 +75,8 @@ func (b *DoubleBarrier) Enter() error { // Leave waits for "count" processes to leave the barrier then returns func (b *DoubleBarrier) Leave() error { - resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) + client := b.s.Client() + resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) if err != nil { return err } @@ -94,7 +97,7 @@ func (b *DoubleBarrier) Leave() error { if len(resp.Kvs) == 1 { // this is the only node in the barrier; finish up - if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil { + if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { return err } return b.myKey.Delete() @@ -106,7 +109,7 @@ func (b *DoubleBarrier) Leave() error { // lowest process in node => wait on highest process if isLowest { _, err = WaitEvents( - b.client, + client, string(highest.Key), highest.ModRevision, []mvccpb.Event_EventType{mvccpb.DELETE}) @@ -123,7 +126,7 @@ func (b *DoubleBarrier) Leave() error { key := string(lowest.Key) _, err = WaitEvents( - b.client, + client, key, lowest.ModRevision, []mvccpb.Event_EventType{mvccpb.DELETE}) diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index ba6303086c23..90e45762fc1f 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -160,12 +160,8 @@ func (rk *RemoteKV) Put(val string) error { type EphemeralKV struct{ RemoteKV } // NewEphemeralKV creates a new key/value pair associated with a session lease -func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { - s, err := concurrency.NewSession(client) - if err != nil { - return nil, err - } - k, err := NewKV(client, key, val, s.Lease()) +func NewEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) { + k, err := NewKV(s.Client(), key, val, s.Lease()) if err != nil { return nil, err } @@ -173,15 +169,15 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { } // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease -func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) { - return NewUniqueEphemeralKV(client, prefix, "") +func NewUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) { + return NewUniqueEphemeralKV(s, prefix, "") } // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease -func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) { +func NewUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) { for { newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) - ek, err = NewEphemeralKV(client, newKey, val) + ek, err = NewEphemeralKV(s, newKey, val) if err == nil || err != ErrKeyExists { break } diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index 1753d87f493d..87b237cca6c8 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -16,24 +16,27 @@ package recipe import ( v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" ) type RWMutex struct { - client *v3.Client - ctx context.Context + s *concurrency.Session + ctx context.Context key string myKey *EphemeralKV } -func NewRWMutex(client *v3.Client, key string) *RWMutex { - return &RWMutex{client, context.TODO(), key, nil} +func NewRWMutex(s *concurrency.Session, key string) *RWMutex { + return &RWMutex{s, context.TODO(), key, nil} } func (rwm *RWMutex) RLock() error { - rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read") + client := rwm.s.Client() + + rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read") if err != nil { return err } @@ -41,7 +44,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) + resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) if err != nil { return err } @@ -53,7 +56,9 @@ func (rwm *RWMutex) RLock() error { } func (rwm *RWMutex) Lock() error { - rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write") + client := rwm.s.Client() + + rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write") if err != nil { return err } @@ -62,7 +67,7 @@ func (rwm *RWMutex) Lock() error { for { // find any key of lower rev number blocks the write lock opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) - resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...) + resp, err := client.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } @@ -80,15 +85,17 @@ func (rwm *RWMutex) Lock() error { } func (rwm *RWMutex) waitOnLowest() error { + client := rwm.s.Client() + // must block; get key before ek for waiting opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) - lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...) + lastKey, err := client.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err } // wait for release on prior key _, err = WaitEvents( - rwm.client, + client, string(lastKey.Kvs[0].Key), rwm.myKey.Revision(), []mvccpb.Event_EventType{mvccpb.DELETE}) diff --git a/integration/v3_double_barrier_test.go b/integration/v3_double_barrier_test.go index 10ba1ee5fbe5..b9a3d6335e58 100644 --- a/integration/v3_double_barrier_test.go +++ b/integration/v3_double_barrier_test.go @@ -25,15 +25,25 @@ import ( func TestDoubleBarrier(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer dropSessionLease(clus) waiters := 10 + session, err := concurrency.NewSession(clus.RandClient()) + if err != nil { + t.Error(err) + } + defer session.Orphan() - b := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters) + b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) donec := make(chan struct{}) for i := 0; i < waiters-1; i++ { go func() { - bb := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters) + session, err := concurrency.NewSession(clus.RandClient()) + if err != nil { + t.Error(err) + } + defer session.Orphan() + + bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters) if err := bb.Enter(); err != nil { t.Fatalf("could not enter on barrier (%v)", err) } @@ -86,14 +96,24 @@ func TestDoubleBarrier(t *testing.T) { func TestDoubleBarrierFailover(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer dropSessionLease(clus) waiters := 10 donec := make(chan struct{}) + s0, err := concurrency.NewSession(clus.clients[0]) + if err != nil { + t.Error(err) + } + defer s0.Orphan() + s1, err := concurrency.NewSession(clus.clients[0]) + if err != nil { + t.Error(err) + } + defer s1.Orphan() + // sacrificial barrier holder; lease will be revoked go func() { - b := recipe.NewDoubleBarrier(clus.clients[0], "test-barrier", waiters) + b := recipe.NewDoubleBarrier(s0, "test-barrier", waiters) if err := b.Enter(); err != nil { t.Fatalf("could not enter on barrier (%v)", err) } @@ -102,7 +122,7 @@ func TestDoubleBarrierFailover(t *testing.T) { for i := 0; i < waiters-1; i++ { go func() { - b := recipe.NewDoubleBarrier(clus.clients[1], "test-barrier", waiters) + b := recipe.NewDoubleBarrier(s1, "test-barrier", waiters) if err := b.Enter(); err != nil { t.Fatalf("could not enter on barrier (%v)", err) } @@ -120,12 +140,8 @@ func TestDoubleBarrierFailover(t *testing.T) { t.Fatalf("timed out waiting for enter, %d", i) } } - // kill lease, expect Leave unblock - s, err := concurrency.NewSession(clus.clients[0]) - if err != nil { - t.Fatal(err) - } - if err = s.Close(); err != nil { + + if err = s0.Close(); err != nil { t.Fatal(err) } // join on rest of waiters @@ -137,10 +153,3 @@ func TestDoubleBarrierFailover(t *testing.T) { } } } - -func dropSessionLease(clus *ClusterV3) { - for _, client := range clus.clients { - s, _ := concurrency.NewSession(client) - s.Orphan() - } -} diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 9f4cea6be666..f2b9cd04aa32 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -28,7 +28,6 @@ import ( func TestElectionWait(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer dropSessionLease(clus) leaders := 3 followers := 3 @@ -44,7 +43,12 @@ func TestElectionWait(t *testing.T) { nextc = append(nextc, make(chan struct{})) go func(ch chan struct{}) { for j := 0; j < leaders; j++ { - b := concurrency.NewElection(newClient(), "test-election") + session, err := concurrency.NewSession(newClient()) + if err != nil { + t.Error(err) + } + b := concurrency.NewElection(session, "test-election") + cctx, cancel := context.WithCancel(context.TODO()) defer cancel() s, ok := <-b.Observe(cctx) @@ -54,6 +58,7 @@ func TestElectionWait(t *testing.T) { electedc <- string(s.Kvs[0].Value) // wait for next election round <-ch + session.Orphan() } donec <- struct{}{} }(nextc[i]) @@ -62,7 +67,13 @@ func TestElectionWait(t *testing.T) { // elect some leaders for i := 0; i < leaders; i++ { go func() { - e := concurrency.NewElection(newClient(), "test-election") + session, err := concurrency.NewSession(newClient()) + if err != nil { + t.Error(err) + } + defer session.Orphan() + + e := concurrency.NewElection(session, "test-election") ev := fmt.Sprintf("electval-%v", time.Now().UnixNano()) if err := e.Campaign(context.TODO(), ev); err != nil { t.Fatalf("failed volunteer (%v)", err) @@ -97,13 +108,23 @@ func TestElectionWait(t *testing.T) { func TestElectionFailover(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer dropSessionLease(clus) cctx, cancel := context.WithCancel(context.TODO()) defer cancel() + ss := make([]*concurrency.Session, 3, 3) + + for i := 0; i < 3; i++ { + var err error + ss[i], err = concurrency.NewSession(clus.clients[i]) + if err != nil { + t.Error(err) + } + defer ss[i].Orphan() + } + // first leader (elected) - e := concurrency.NewElection(clus.clients[0], "test-election") + e := concurrency.NewElection(ss[0], "test-election") if err := e.Campaign(context.TODO(), "foo"); err != nil { t.Fatalf("failed volunteer (%v)", err) } @@ -121,7 +142,7 @@ func TestElectionFailover(t *testing.T) { // next leader electedc := make(chan struct{}) go func() { - ee := concurrency.NewElection(clus.clients[1], "test-election") + ee := concurrency.NewElection(ss[1], "test-election") if eer := ee.Campaign(context.TODO(), "bar"); eer != nil { t.Fatal(eer) } @@ -129,16 +150,12 @@ func TestElectionFailover(t *testing.T) { }() // invoke leader failover - session, serr := concurrency.NewSession(clus.clients[0]) - if serr != nil { - t.Fatal(serr) - } - if err := session.Close(); err != nil { + if err := ss[0].Close(); err != nil { t.Fatal(err) } // check new leader - e = concurrency.NewElection(clus.clients[2], "test-election") + e = concurrency.NewElection(ss[2], "test-election") resp, ok = <-e.Observe(cctx) if !ok { t.Fatalf("could not wait for second election; channel closed") @@ -159,11 +176,17 @@ func TestElectionSessionRecampaign(t *testing.T) { defer clus.Terminate(t) cli := clus.RandClient() - e := concurrency.NewElection(cli, "test-elect") + session, err := concurrency.NewSession(cli) + if err != nil { + t.Error(err) + } + defer session.Orphan() + + e := concurrency.NewElection(session, "test-elect") if err := e.Campaign(context.TODO(), "abc"); err != nil { t.Fatal(err) } - e2 := concurrency.NewElection(cli, "test-elect") + e2 := concurrency.NewElection(session, "test-elect") if err := e2.Campaign(context.TODO(), "def"); err != nil { t.Fatal(err) } diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index 29d69420019a..b0e2e67963a1 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -49,7 +49,11 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) lockedC := make(chan *concurrency.Mutex) for i := 0; i < waiters; i++ { go func() { - m := concurrency.NewMutex(chooseClient(), "test-mutex") + session, err := concurrency.NewSession(chooseClient()) + if err != nil { + t.Error(err) + } + m := concurrency.NewMutex(session, "test-mutex") if err := m.Lock(context.TODO()); err != nil { t.Fatalf("could not wait on lock (%v)", err) } @@ -81,12 +85,17 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) func TestMutexSessionRelock(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - cli := clus.RandClient() - m := concurrency.NewMutex(cli, "test-mutex") + session, err := concurrency.NewSession(clus.RandClient()) + if err != nil { + t.Error(err) + } + + m := concurrency.NewMutex(session, "test-mutex") if err := m.Lock(context.TODO()); err != nil { t.Fatal(err) } - m2 := concurrency.NewMutex(cli, "test-mutex") + + m2 := concurrency.NewMutex(session, "test-mutex") if err := m2.Lock(context.TODO()); err != nil { t.Fatal(err) } @@ -119,7 +128,11 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client wlockedC := make(chan *recipe.RWMutex, 1) for i := 0; i < waiters; i++ { go func() { - rwm := recipe.NewRWMutex(chooseClient(), "test-rwmutex") + session, err := concurrency.NewSession(chooseClient()) + if err != nil { + t.Error(err) + } + rwm := recipe.NewRWMutex(session, "test-rwmutex") if rand.Intn(1) == 0 { if err := rwm.RLock(); err != nil { t.Fatalf("could not rlock (%v)", err)