From 0087160ef15e0a9ca96b9b25745214eef1620c0b Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Wed, 2 Feb 2022 00:24:36 -0500 Subject: [PATCH] consumer: shard transitions account for delete-then-create race If a shard assignment is removed (for example, because it has FAILED) and is then immediately re-created by the allocation coordinator, we previously could fail to notice that the prior local shard was invalidated and should be re-created. Account for this by additionally monitoring the CreateRevision of assignments and treating an assignment of the same or higher slot, with a differing CreateRevision, as being a novel assignment which requires a novel local shard to be boot-strapped. Fixes #314 --- consumer/resolver.go | 27 +++++++++++++++++++++++---- consumer/resolver_test.go | 24 ++++++++++++++++++++++-- consumer/shard.go | 2 -- consumer/test_support_test.go | 18 +++++++++++------- 4 files changed, 56 insertions(+), 15 deletions(-) diff --git a/consumer/resolver.go b/consumer/resolver.go index e5401938..58337802 100644 --- a/consumer/resolver.go +++ b/consumer/resolver.go @@ -279,10 +279,25 @@ func (r *Resolver) updateLocalShards() { var next = make(map[pc.ShardID]*shard, len(r.state.LocalItems)) for _, li := range r.state.LocalItems { - var assignment = li.Assignments[li.Index] - var id = pc.ShardID(li.Item.Decoded.(allocator.Item).ID) + var ( + asn = li.Assignments[li.Index] + id = pc.ShardID(li.Item.Decoded.(allocator.Item).ID) + shard, ok = r.shards[id] + ) + + if !ok { + // Local shard doesn't exist and must be created. + } else if p := shard.resolved.assignment; asn.Decoded.(allocator.Assignment).Slot < p.Decoded.(allocator.Assignment).Slot { + // Local shard is being promoted (update). + } else if p.Raw.CreateRevision == asn.Raw.CreateRevision { + // Assignment value was updated but the assignment itself is unchanged. + } else { + // Next slot is greater-or-equal to the current slot, and the assignment + // has a different CreateRevision. 
This implies a deletion of the prior + // assignment, and a creation of a new assignment with the given slot. + shard, ok = nil, false + } - var shard, ok = r.shards[id] if !ok { r.wg.Add(1) shard = r.newShard(li.Item) // Newly assigned shard. @@ -298,8 +313,12 @@ func (r *Resolver) updateLocalShards() { } else { delete(r.shards, id) // Move from |r.shards| to |next|. } + next[id] = shard - transition(shard, li.Item, assignment) + transition(shard, li.Item, asn) + + shard.resolved.spec = li.Item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec) + shard.resolved.assignment = asn } var prev = r.shards diff --git a/consumer/resolver_test.go b/consumer/resolver_test.go index 4b8232f7..cdfda05c 100644 --- a/consumer/resolver_test.go +++ b/consumer/resolver_test.go @@ -283,14 +283,34 @@ func TestResolverShardTransitions(t *testing.T) { expectStatusCode(t, tf.state, pc.ReplicaStatus_PRIMARY) - // Cancel |sdB|. + // Cancel |sB|, then immediately re-create it with the same (primary) slot. + // Introduce a small delay to coerce the deletion and creation to be observed together. + tf.ks.WatchApplyDelay = 3 * time.Millisecond + tf.allocateShardNoWait(makeShard(shardB)) + tf.allocateShard(makeShard(shardB), localID) + tf.ks.WatchApplyDelay = 0 + + // Expect a new shard |newB| is created, and the prior |sB| is cancelled. + tf.ks.Mu.RLock() + var newB = tf.resolver.shards[shardB] + tf.ks.Mu.RUnlock() + + require.True(t, sB != newB) + <-sB.Context().Done() + + // Expect |newB| is transitioned to primary. + <-newB.recovery.player.Done() + <-newB.storeReadyCh + require.Nil(t, newB.Context().Err()) + + // Cancel |newB|. tf.allocateShard(makeShard(shardB)) tf.ks.Mu.RLock() require.Len(t, tf.resolver.shards, 0) tf.ks.Mu.RUnlock() - <-sB.Context().Done() // Expect |sB| is cancelled. + <-newB.Context().Done() // Expect |newB| is cancelled. tf.allocateShard(makeShard(shardA)) // Cleanup. 
} diff --git a/consumer/shard.go b/consumer/shard.go index 1353afdb..daf4dab2 100644 --- a/consumer/shard.go +++ b/consumer/shard.go @@ -161,8 +161,6 @@ var transition = func(s *shard, item, assignment keyspace.KeyValue) { s.wg.Add(1) // Transition standby => primary. go servePrimary(s) } - s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec) - s.resolved.assignment = assignment } // serveStandby recovers and tails the shard recovery log, until the Replica is diff --git a/consumer/test_support_test.go b/consumer/test_support_test.go index 5ddef925..ca6b89dd 100644 --- a/consumer/test_support_test.go +++ b/consumer/test_support_test.go @@ -264,10 +264,7 @@ func newTestFixtureWithIdleShard(t require.TestingT) (*testFixture, *shard, func var tf, cleanup = newTestFixture(t) var realTransition = transition - transition = func(s *shard, item, assignment keyspace.KeyValue) { - s.resolved.spec = item.Decoded.(allocator.Item).ItemValue.(*pc.ShardSpec) - s.resolved.assignment = assignment - } + transition = func(s *shard, item, assignment keyspace.KeyValue) { /* No-op */ } tf.allocateShard(makeShard(shardA), localID) return tf, tf.resolver.shards[shardA], func() { @@ -279,6 +276,15 @@ func newTestFixtureWithIdleShard(t require.TestingT) (*testFixture, *shard, func } func (f *testFixture) allocateShard(spec *pc.ShardSpec, assignments ...pb.ProcessSpec_ID) { + var revision = f.allocateShardNoWait(spec, assignments...) + + // Wait for the Etcd revision to be read-through by the fixture's KeySpace. + f.ks.Mu.RLock() + require.NoError(f.t, f.ks.WaitForRevision(f.tasks.Context(), revision)) + f.ks.Mu.RUnlock() +} + +func (f *testFixture) allocateShardNoWait(spec *pc.ShardSpec, assignments ...pb.ProcessSpec_ID) int64 { var ops []clientv3.Op // Upsert ConsumerSpec fixtures. 
@@ -328,9 +334,7 @@ func (f *testFixture) allocateShard(spec *pc.ShardSpec, assignments ...pb.Proces require.NoError(f.t, err) require.Equal(f.t, true, resp.Succeeded) - f.ks.Mu.RLock() - require.NoError(f.t, f.ks.WaitForRevision(f.tasks.Context(), resp.Header.Revision)) - f.ks.Mu.RUnlock() + return resp.Header.Revision } func (f *testFixture) setReplicaStatus(spec *pc.ShardSpec, assignment pb.ProcessSpec_ID, slot int, code pc.ReplicaStatus_Code) {