Skip to content

Commit

Permalink
session: remove session manager and add ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Aug 15, 2016
1 parent 817de6d commit f5c8abf
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 128 deletions.
35 changes: 19 additions & 16 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
)

type Election struct {
client *v3.Client
session *Session

keyPrefix string

Expand All @@ -39,20 +39,18 @@ type Election struct {
}

// NewElection returns a new election on a given key prefix.
func NewElection(client *v3.Client, pfx string) *Election {
return &Election{client: client, keyPrefix: pfx}
func NewElection(s *Session, pfx string) *Election {
return &Election{session: s, keyPrefix: pfx}
}

// Campaign puts a value as eligible for the election. It blocks until
// it is elected, an error occurs, or the context is cancelled.
func (e *Election) Campaign(ctx context.Context, val string) error {
s, serr := NewSession(e.client)
if serr != nil {
return serr
}
s := e.session
client := e.session.Client()

k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
Expand All @@ -72,12 +70,12 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
}

err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(e.client.Ctx())
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
Expand All @@ -92,8 +90,9 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
if e.leaderSession == nil {
return ErrElectionNotLeader
}
client := e.session.Client()
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
txn := e.client.Txn(ctx).If(cmp)
txn := client.Txn(ctx).If(cmp)
txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
tresp, terr := txn.Commit()
if terr != nil {
Expand All @@ -111,15 +110,17 @@ func (e *Election) Resign(ctx context.Context) (err error) {
if e.leaderSession == nil {
return nil
}
_, err = e.client.Delete(ctx, e.leaderKey)
client := e.session.Client()
_, err = client.Delete(ctx, e.leaderKey)
e.leaderKey = ""
e.leaderSession = nil
return err
}

// Leader returns the leader value for the current election.
func (e *Election) Leader(ctx context.Context) (string, error) {
resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
client := e.session.Client()
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return "", err
} else if len(resp.Kvs) == 0 {
Expand All @@ -139,9 +140,11 @@ func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
}

func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
client := e.session.Client()

defer close(ch)
for {
resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return
}
Expand All @@ -152,7 +155,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
if len(resp.Kvs) == 0 {
// wait for first key put on prefix
opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
wch := e.client.Watch(cctx, e.keyPrefix, opts...)
wch := client.Watch(cctx, e.keyPrefix, opts...)

for kv == nil {
wr, ok := <-wch
Expand All @@ -172,7 +175,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
kv = resp.Kvs[0]
}

wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
keyDeleted := false
for !keyDeleted {
wr, ok := <-wch
Expand Down
31 changes: 16 additions & 15 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,30 @@ import (

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
client *v3.Client
s *Session

pfx string
myKey string
myRev int64
}

func NewMutex(client *v3.Client, pfx string) *Mutex {
return &Mutex{client, pfx, "", -1}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx, "", -1}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s, serr := NewSession(m.client)
if serr != nil {
return serr
}
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
resp, err := m.client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
if err != nil {
return err
}
Expand All @@ -59,18 +57,19 @@ func (m *Mutex) Lock(ctx context.Context) error {
}

// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock(m.client.Ctx())
m.Unlock(client.Ctx())
default:
}
return err
}

func (m *Mutex) Unlock(ctx context.Context) error {
if _, err := m.client.Delete(ctx, m.myKey); err != nil {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
Expand All @@ -87,17 +86,19 @@ func (m *Mutex) Key() string { return m.myKey }
type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
client := lm.s.Client()
if err := lm.Mutex.Lock(client.Ctx()); err != nil {
panic(err)
}
}
func (lm *lockerMutex) Unlock() {
if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil {
client := lm.s.Client()
if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
panic(err)
}
}

// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(client *v3.Client, pfx string) sync.Locker {
return &lockerMutex{NewMutex(client, pfx)}
func NewLocker(s *Session, pfx string) sync.Locker {
return &lockerMutex{NewMutex(s, pfx)}
}
54 changes: 30 additions & 24 deletions clientv3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@
package concurrency

import (
"sync"

v3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
)

// only keep one ephemeral lease per client
var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)}

const sessionTTL = 60

type clientSessionMgr struct {
sessions map[*v3.Client]*Session
mu sync.Mutex
}
const defaultSessionTTL = 60

// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
Expand All @@ -42,14 +32,13 @@ type Session struct {
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client) (*Session, error) {
clientSessions.mu.Lock()
defer clientSessions.mu.Unlock()
if s, ok := clientSessions.sessions[client]; ok {
return s, nil
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := &sessionOptions{}
for _, opt := range opts {
opt(ops)
}

resp, err := client.Grant(client.Ctx(), sessionTTL)
resp, err := client.Grant(client.Ctx(), defaultSessionTTL)
if err != nil {
return nil, err
}
Expand All @@ -63,16 +52,10 @@ func NewSession(client *v3.Client) (*Session, error) {

donec := make(chan struct{})
s := &Session{client: client, id: id, cancel: cancel, donec: donec}
clientSessions.sessions[client] = s

// keep the lease alive until client error or cancelled context
go func() {
defer func() {
clientSessions.mu.Lock()
delete(clientSessions.sessions, client)
clientSessions.mu.Unlock()
close(donec)
}()
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
Expand All @@ -81,6 +64,11 @@ func NewSession(client *v3.Client) (*Session, error) {
return s, nil
}

// Client is the etcd client that is attached to the session.
func (s *Session) Client() *v3.Client {
return s.client
}

// Lease is the lease ID for keys bound to the session.
func (s *Session) Lease() v3.LeaseID { return s.id }

Expand All @@ -102,3 +90,21 @@ func (s *Session) Close() error {
_, err := s.client.Revoke(s.client.Ctx(), s.id)
return err
}

type sessionOptions struct {
ttl int
}

// SessionOption configures Session.
type SessionOption func(*sessionOptions)

// WithTTL configures the session's TTL in seconds.
// If TTL is <= 0, the default 60 seconds TTL will be used.
func WithTTL(ttl int) SessionOption {
return func(so *sessionOptions) {
if ttl <= 0 {
so.ttl = defaultSessionTTL
}
so.ttl = ttl
}
}
33 changes: 18 additions & 15 deletions contrib/recipes/double_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,41 @@ package recipe

import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)

// DoubleBarrier blocks processes on Enter until an expected count enters, then
// blocks again on Leave until all processes have left.
type DoubleBarrier struct {
client *clientv3.Client
ctx context.Context
s *concurrency.Session
ctx context.Context

key string // key for the collective barrier
count int
myKey *EphemeralKV // current key for this process on the barrier
}

func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
return &DoubleBarrier{
client: client,
ctx: context.TODO(),
key: key,
count: count,
s: s,
ctx: context.TODO(),
key: key,
count: count,
}
}

// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
ek, err := NewUniqueEphemeralKey(b.client, b.key+"/waiters")
client := b.s.Client()
ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil {
return err
}
b.myKey = ek

resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil {
return err
}
Expand All @@ -59,12 +61,12 @@ func (b *DoubleBarrier) Enter() error {

if len(resp.Kvs) == b.count {
// unblock waiters
_, err = b.client.Put(b.ctx, b.key+"/ready", "")
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
}

_, err = WaitEvents(
b.client,
client,
b.key+"/ready",
ek.Revision(),
[]mvccpb.Event_EventType{mvccpb.PUT})
Expand All @@ -73,7 +75,8 @@ func (b *DoubleBarrier) Enter() error {

// Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error {
resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
client := b.s.Client()
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil {
return err
}
Expand All @@ -94,7 +97,7 @@ func (b *DoubleBarrier) Leave() error {

if len(resp.Kvs) == 1 {
// this is the only node in the barrier; finish up
if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil {
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err
}
return b.myKey.Delete()
Expand All @@ -106,7 +109,7 @@ func (b *DoubleBarrier) Leave() error {
// lowest process in node => wait on highest process
if isLowest {
_, err = WaitEvents(
b.client,
client,
string(highest.Key),
highest.ModRevision,
[]mvccpb.Event_EventType{mvccpb.DELETE})
Expand All @@ -123,7 +126,7 @@ func (b *DoubleBarrier) Leave() error {

key := string(lowest.Key)
_, err = WaitEvents(
b.client,
client,
key,
lowest.ModRevision,
[]mvccpb.Event_EventType{mvccpb.DELETE})
Expand Down
Loading

0 comments on commit f5c8abf

Please sign in to comment.