Skip to content

Commit

Permalink
clientv3/concurrency: allow election on prefixes of keys.
Browse files Browse the repository at this point in the history
After winning an election or obtaining a lock, we
auto-append a slash after the provided key prefix.
This avoids the previous deadlock due to waiting
on the wrong key.

Fixes #6278
  • Loading branch information
glycerine committed Aug 30, 2016
1 parent 48f4a7d commit 9497e96
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
5 changes: 2 additions & 3 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Election struct {

// NewElection returns a new election on a given key prefix.
func NewElection(s *Session, pfx string) *Election {
return &Election{session: s, keyPrefix: pfx}
return &Election{session: s, keyPrefix: pfx + "/"}
}

// Campaign puts a value as eligible for the election. It blocks until
Expand All @@ -49,15 +49,14 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()

k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}

e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
Expand Down
4 changes: 2 additions & 2 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Mutex struct {
}

func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx, "", -1}
return &Mutex{s, pfx + "/", "", -1}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
Expand All @@ -41,7 +41,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
Expand Down
28 changes: 28 additions & 0 deletions integration/v3_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,31 @@ func TestElectionSessionRecampaign(t *testing.T) {
t.Fatalf("expected value=%q, got response %v", "def", resp)
}
}

// TestElectionOnPrefixOfExistingKey checks that a single
// candidate can be elected on a new key that is a prefix
// of an existing key. To wit, check for regression
// of bug #6278. https://github.com/coreos/etcd/issues/6278
//
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.RandClient()
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
t.Fatal(err)
}
s, serr := concurrency.NewSession(cli)
if serr != nil {
t.Fatal(serr)
}
e := concurrency.NewElection(s, "test")
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
err := e.Campaign(ctx, "abc")
cancel()
if err != nil {
// after 5 seconds, deadlock results in
// 'context deadline exceeded' here.
t.Fatal(err)
}
}

0 comments on commit 9497e96

Please sign in to comment.