Skip to content

Commit

Permalink
Merge pull request #7248 from ravigadde/session-w-lease
Browse files Browse the repository at this point in the history
clientv3: start a session with existing lease
  • Loading branch information
xiang90 authored Jan 31, 2017
2 parents d2716fc + c586218 commit 7d6280f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
25 changes: 19 additions & 6 deletions clientv3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
opt(ops)
}

resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = v3.LeaseID(resp.ID)
}
id := v3.LeaseID(resp.ID)

ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
Expand Down Expand Up @@ -98,8 +101,9 @@ func (s *Session) Close() error {
}

type sessionOptions struct {
ttl int
ctx context.Context
ttl int
leaseID v3.LeaseID
ctx context.Context
}

// SessionOption configures Session.
Expand All @@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption {
}
}

// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
func WithLease(leaseID v3.LeaseID) SessionOption {
return func(so *sessionOptions) {
so.leaseID = leaseID
}
}

// WithContext assigns a context to the session instead of defaulting to
// using the client context. This is useful for canceling NewSession and
// Close operations immediately without having to close the client. If the
Expand Down
47 changes: 47 additions & 0 deletions integration/v3_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
t.Fatal(err)
}
}

// TestElectionOnSessionRestart tests that a quick restart of leader (resulting
// in a new session with the same lease id) does not result in loss of
// leadership.
func TestElectionOnSessionRestart(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()

session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}

e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}

// ensure leader is not lost to waiter on fail-over
waitSession, werr := concurrency.NewSession(cli)
if werr != nil {
t.Fatal(werr)
}
defer waitSession.Orphan()
waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer waitCancel()
go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123")

// simulate restart by reusing the lease from the old session
newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease()))
if nerr != nil {
t.Fatal(nerr)
}
defer newSession.Orphan()

newElection := concurrency.NewElection(newSession, "test-elect")
if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil {
t.Fatal(ncerr)
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got response %v", "def", resp)
}
}

0 comments on commit 7d6280f

Please sign in to comment.