Skip to content

Commit

Permalink
clientv3: start a session with existing lease
Browse files Browse the repository at this point in the history
This change is needed to handle process restarts with elections. When the
leader process is restarted, it should be able to hang on to the leadership
by using the existing lease.

Fixes #7166
  • Loading branch information
ravigadde committed Jan 31, 2017
1 parent d2716fc commit c586218
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
25 changes: 19 additions & 6 deletions clientv3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
opt(ops)
}

resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = v3.LeaseID(resp.ID)
}
id := v3.LeaseID(resp.ID)

ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
Expand Down Expand Up @@ -98,8 +101,9 @@ func (s *Session) Close() error {
}

type sessionOptions struct {
ttl int
ctx context.Context
ttl int
leaseID v3.LeaseID
ctx context.Context
}

// SessionOption configures Session.
Expand All @@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption {
}
}

// WithLease specifies the existing leaseID to be used for the session.
// This is useful in process restart scenario, for example, to reclaim
// leadership from an election prior to restart.
func WithLease(leaseID v3.LeaseID) SessionOption {
return func(so *sessionOptions) {
so.leaseID = leaseID
}
}

// WithContext assigns a context to the session instead of defaulting to
// using the client context. This is useful for canceling NewSession and
// Close operations immediately without having to close the client. If the
Expand Down
47 changes: 47 additions & 0 deletions integration/v3_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
t.Fatal(err)
}
}

// TestElectionOnSessionRestart tests that a quick restart of leader (resulting
// in a new session with the same lease id) does not result in loss of
// leadership.
func TestElectionOnSessionRestart(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()

session, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}

e := concurrency.NewElection(session, "test-elect")
if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
t.Fatal(cerr)
}

// ensure leader is not lost to waiter on fail-over
waitSession, werr := concurrency.NewSession(cli)
if werr != nil {
t.Fatal(werr)
}
defer waitSession.Orphan()
waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer waitCancel()
go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123")

// simulate restart by reusing the lease from the old session
newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease()))
if nerr != nil {
t.Fatal(nerr)
}
defer newSession.Orphan()

newElection := concurrency.NewElection(newSession, "test-elect")
if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil {
t.Fatal(ncerr)
}

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got response %v", "def", resp)
}
}

0 comments on commit c586218

Please sign in to comment.