Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: shard transitions account for delete-then-create race #315

Merged
merged 1 commit into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions consumer/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,25 @@ func (r *Resolver) updateLocalShards() {
var next = make(map[pc.ShardID]*shard, len(r.state.LocalItems))

for _, li := range r.state.LocalItems {
var assignment = li.Assignments[li.Index]
var id = pc.ShardID(li.Item.Decoded.(allocator.Item).ID)
var (
asn = li.Assignments[li.Index]
id = pc.ShardID(li.Item.Decoded.(allocator.Item).ID)
shard, ok = r.shards[id]
)

if !ok {
// Local shard doesn't exist and must be created.
} else if p := shard.resolved.assignment; asn.Decoded.(allocator.Assignment).Slot < p.Decoded.(allocator.Assignment).Slot {
// Local shard is being promoted (update).
} else if p.Raw.CreateRevision == asn.Raw.CreateRevision {
// Assignment value was updated but the assignment itself is unchanged.
Comment on lines +292 to +293
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this case. How would the assignment value be updated without creating a new revision?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

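See the etcd spec: CreateRevision is tracked separately from ModRevision. An update to an existing key advances its ModRevision but leaves its CreateRevision unchanged, so the assignment's value can change while its CreateRevision stays the same.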

} else {
// Next slot is greater-or-equal to the current slot, and the assignment
// has a different CreateRevision. This implies a deletion of the prior
// assignment, and a creation of a new assignment with the given slot.
shard, ok = nil, false
}

var shard, ok = r.shards[id]
if !ok {
r.wg.Add(1)
shard = r.newShard(li.Item) // Newly assigned shard.
Expand All @@ -298,8 +313,12 @@ func (r *Resolver) updateLocalShards() {
} else {
delete(r.shards, id) // Move from |r.shards| to |next|.
}

next[id] = shard
transition(shard, li.Item, assignment)
transition(shard, li.Item, asn)

shard.resolved.spec = li.Item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)
shard.resolved.assignment = asn
}

var prev = r.shards
Expand Down
24 changes: 22 additions & 2 deletions consumer/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,34 @@ func TestResolverShardTransitions(t *testing.T) {

expectStatusCode(t, tf.state, pc.ReplicaStatus_PRIMARY)

// Cancel |sdB|.
// Cancel |sB|, then immediately re-create it with the same (primary) slot.
// Introduce a small delay to coerce the deletion and creation to be observed together.
tf.ks.WatchApplyDelay = 3 * time.Millisecond
tf.allocateShardNoWait(makeShard(shardB))
tf.allocateShard(makeShard(shardB), localID)
tf.ks.WatchApplyDelay = 0

// Expect a new shard |newB| is created, and the prior |sB| is cancelled.
tf.ks.Mu.RLock()
var newB = tf.resolver.shards[shardB]
tf.ks.Mu.RUnlock()

require.True(t, sB != newB)
<-sB.Context().Done()

// Expect |newB| is transitioned to primary.
<-newB.recovery.player.Done()
<-newB.storeReadyCh
require.Nil(t, newB.Context().Err())

// Cancel |newB|.
tf.allocateShard(makeShard(shardB))

tf.ks.Mu.RLock()
require.Len(t, tf.resolver.shards, 0)
tf.ks.Mu.RUnlock()

<-sB.Context().Done() // Expect |sB| is cancelled.
<-newB.Context().Done() // Expect |newB| is cancelled.

tf.allocateShard(makeShard(shardA)) // Cleanup.
}
Expand Down
2 changes: 0 additions & 2 deletions consumer/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ var transition = func(s *shard, item, assignment keyspace.KeyValue) {
s.wg.Add(1) // Transition standby => primary.
go servePrimary(s)
}
s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)
s.resolved.assignment = assignment
}

// serveStandby recovers and tails the shard recovery log, until the Replica is
Expand Down
18 changes: 11 additions & 7 deletions consumer/test_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,7 @@ func newTestFixtureWithIdleShard(t require.TestingT) (*testFixture, *shard, func
var tf, cleanup = newTestFixture(t)

var realTransition = transition
transition = func(s *shard, item, assignment keyspace.KeyValue) {
s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec)
s.resolved.assignment = assignment
}
transition = func(s *shard, item, assignment keyspace.KeyValue) { /* No-op */ }
tf.allocateShard(makeShard(shardA), localID)

return tf, tf.resolver.shards[shardA], func() {
Expand All @@ -279,6 +276,15 @@ func newTestFixtureWithIdleShard(t require.TestingT) (*testFixture, *shard, func
}

// allocateShard applies |spec| and its |assignments| via allocateShardNoWait,
// and then blocks until the fixture's KeySpace has read through the Etcd
// revision which allocateShardNoWait returned.
func (f *testFixture) allocateShard(spec *pc.ShardSpec, assignments ...pb.ProcessSpec_ID) {
	var revision = f.allocateShardNoWait(spec, assignments...)

	// Wait for the Etcd revision to be read-through by the fixture's KeySpace.
	// The read lock is held across the wait. Its release is deferred so that
	// the lock is also dropped if a failed assertion unwinds this goroutine
	// (e.g. through FailNow) instead of returning normally.
	f.ks.Mu.RLock()
	defer f.ks.Mu.RUnlock()

	require.NoError(f.t, f.ks.WaitForRevision(f.tasks.Context(), revision))
}

func (f *testFixture) allocateShardNoWait(spec *pc.ShardSpec, assignments ...pb.ProcessSpec_ID) int64 {
var ops []clientv3.Op

// Upsert ConsumerSpec fixtures.
Expand Down Expand Up @@ -328,9 +334,7 @@ func (f *testFixture) allocateShard(spec *pc.ShardSpec, assignments ...pb.Proces
require.NoError(f.t, err)
require.Equal(f.t, true, resp.Succeeded)

f.ks.Mu.RLock()
require.NoError(f.t, f.ks.WaitForRevision(f.tasks.Context(), resp.Header.Revision))
f.ks.Mu.RUnlock()
return resp.Header.Revision
}

func (f *testFixture) setReplicaStatus(spec *pc.ShardSpec, assignment pb.ProcessSpec_ID, slot int, code pc.ReplicaStatus_Code) {
Expand Down