From 9c828bce15f59a23859a1fc628a4eee385e2b1c1 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 21 Dec 2021 10:14:10 +0200 Subject: [PATCH 1/6] [cluster] Fix unbalanced initial shard allocation for sharded placement --- src/cluster/placement/algo/sharded_helper.go | 17 ++++++--- src/cluster/placement/algo/sharded_test.go | 36 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/src/cluster/placement/algo/sharded_helper.go b/src/cluster/placement/algo/sharded_helper.go index 29b1613efc..28e993cacb 100644 --- a/src/cluster/placement/algo/sharded_helper.go +++ b/src/cluster/placement/algo/sharded_helper.go @@ -491,8 +491,8 @@ func (ph *helper) returnInitializingShardsToSource( func (ph *helper) mostUnderLoadedInstance() (placement.Instance, bool) { var ( - res placement.Instance - maxLoadGap int + res placement.Instance + maxLoadGap int totalLoadSurplus int ) @@ -656,12 +656,19 @@ func (h *instanceHeap) Less(i, j int) bool { leftLoadOnJ := h.targetLoadForInstance(instanceJ.ID()) - loadOnInstance(instanceJ) // If both instance has tokens to be filled, prefer the one from bigger isolation group // since it tends to be more picky in accepting shards - if leftLoadOnI > 0 && leftLoadOnJ > 0 { - if instanceI.IsolationGroup() != instanceJ.IsolationGroup() { - return h.igToWeightMap[instanceI.IsolationGroup()] > h.igToWeightMap[instanceJ.IsolationGroup()] + if leftLoadOnI > 0 && leftLoadOnJ > 0 && instanceI.IsolationGroup() != instanceJ.IsolationGroup() { + var ( + igWeightI = h.igToWeightMap[instanceI.IsolationGroup()] + igWeightJ = h.igToWeightMap[instanceJ.IsolationGroup()] + ) + if igWeightI != igWeightJ { + return igWeightI > igWeightJ } } // compare left capacity on both instances + if leftLoadOnI == leftLoadOnJ { + return instanceI.ID() < instanceJ.ID() + } if h.capacityAscending { return leftLoadOnI > leftLoadOnJ } diff --git a/src/cluster/placement/algo/sharded_test.go b/src/cluster/placement/algo/sharded_test.go index b305bfbeed..4479cf5fc0 100644 --- a/src/cluster/placement/algo/sharded_test.go +++ b/src/cluster/placement/algo/sharded_test.go @@ -1306,6 +1306,42 @@ func TestBalanceShardsForShardedWhenImbalanced(t *testing.T) { assert.Equal(t, expectedInstances, balancedPlacement.Instances()) } +func TestInitialPlacementIsBalanced(t *testing.T) { + instances := make([]placement.Instance, 0) + + for i := 0; i < 10; i++ { + instances = append(instances, + placement.NewEmptyInstance(fmt.Sprintf("instance-0-%03d", i), "iso0", "zone", "endpoint", 1), + placement.NewEmptyInstance(fmt.Sprintf("instance-1-%03d", i), "iso1", "zone", "endpoint", 1), + ) + } + + ids := make([]uint32, 1024) + for i := 0; i < len(ids); i++ { + ids[i] = uint32(i) + } + + a := newShardedAlgorithm(placement.NewOptions()) + p, err := a.InitialPlacement(instances, ids, 2) + require.NoError(t, err) + + var ( + min = math.MaxInt32 + max = math.MinInt32 + ) + for _, instance := range p.Instances() { + n := instance.Shards().NumShards() + if n < min { + min = n + } + if n > max { + max = n + } + } + require.True(t, max-min <= 1, + "The number of shards per instance differs by more than 1, min=%d max=%d", min, max) +} + func verifyAllShardsInAvailableState(t *testing.T, p placement.Placement) { for _, instance := range p.Instances() { s := instance.Shards() From 144fa72f4cd5d4d4f35209e47469987cecf3fd52 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 21 Dec 2021 10:41:07 +0200 Subject: [PATCH 2/6] small refactoring --- src/cluster/placement/algo/sharded_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cluster/placement/algo/sharded_test.go b/src/cluster/placement/algo/sharded_test.go index 4479cf5fc0..77ffde3007 100644 --- a/src/cluster/placement/algo/sharded_test.go +++ b/src/cluster/placement/algo/sharded_test.go @@ -1317,12 +1317,12 @@ func TestInitialPlacementIsBalanced(t *testing.T) { } ids := make([]uint32, 1024) - for i := 0; i < len(ids); i++ { + for i := range ids { ids[i] = uint32(i) } - a := newShardedAlgorithm(placement.NewOptions()) - p, err := a.InitialPlacement(instances, ids, 2) + algo := newShardedAlgorithm(placement.NewOptions()) + p, err := algo.InitialPlacement(instances, ids, 2) require.NoError(t, err) var ( From 1721594d34f54216415e58e57a196e1e00e149d2 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 21 Dec 2021 11:35:28 +0200 Subject: [PATCH 3/6] PR comments --- src/cluster/placement/algo/sharded_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cluster/placement/algo/sharded_test.go b/src/cluster/placement/algo/sharded_test.go index 77ffde3007..3af305b75c 100644 --- a/src/cluster/placement/algo/sharded_test.go +++ b/src/cluster/placement/algo/sharded_test.go @@ -1316,13 +1316,13 @@ func TestInitialPlacementIsBalanced(t *testing.T) { ) } - ids := make([]uint32, 1024) - for i := range ids { - ids[i] = uint32(i) + shardIDs := make([]uint32, 1024) + for i := range shardIDs { + shardIDs[i] = uint32(i) } algo := newShardedAlgorithm(placement.NewOptions()) - p, err := algo.InitialPlacement(instances, ids, 2) + p, err := algo.InitialPlacement(instances, shardIDs, 2) require.NoError(t, err) var ( @@ -1338,8 +1338,8 @@ func TestInitialPlacementIsBalanced(t *testing.T) { max = n } } - require.True(t, max-min <= 1, - "The number of shards per instance differs by more than 1, min=%d max=%d", min, max) + require.Equal(t, 102, min) + require.Equal(t, 103, max) } func verifyAllShardsInAvailableState(t *testing.T, p placement.Placement) { From ebc054bfa2f292d5b10e0115fd035e65f0fda888 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 21 Dec 2021 13:57:37 +0200 Subject: [PATCH 4/6] add property test --- .../placement/algo/sharded_prop_test.go | 89 +++++++++++++++++++ src/cluster/placement/algo/sharded_test.go | 36 -------- 2 files changed, 89 insertions(+), 36 deletions(-) create mode 100644 src/cluster/placement/algo/sharded_prop_test.go diff --git a/src/cluster/placement/algo/sharded_prop_test.go b/src/cluster/placement/algo/sharded_prop_test.go new file mode 100644 index 0000000000..7c07b88f1f --- /dev/null +++ b/src/cluster/placement/algo/sharded_prop_test.go @@ -0,0 +1,89 @@ +package algo + +import ( + "fmt" + "math" + "os" + "testing" + "time" + + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" + + "github.com/m3db/m3/src/cluster/placement" +) + +const minSuccessfulTests = 100 + +func TestInitialPlacementIsBalancedPropTest(t *testing.T) { + var ( + parameters = gopter.DefaultTestParameters() + seed = time.Now().UnixNano() + props = gopter.NewProperties(parameters) + reporter = gopter.NewFormatedReporter(true, 160, os.Stdout) + ) + + parameters.MinSuccessfulTests = minSuccessfulTests + parameters.Rng.Seed(seed) + + props.Property("Initial placement is balanced", prop.ForAll( + testInitialPlacementIsBalanced, + gen.IntRange(1, 3), + gen.IntRange(1, 20), + gen.IntRange(128, 1024), + )) + + if !props.Run(reporter) { + t.Errorf("failed with initial seed: %d", seed) + } +} + +func testInitialPlacementIsBalanced(replicaCount, instanceCount, shardCount int) (bool, error) { + instances := make([]placement.Instance, 0) + for i := 0; i < replicaCount; i++ { + for j := 0; j < instanceCount; j++ { + var ( + instanceID = fmt.Sprintf("instance-%d-%03d", i, j) + isolationGroup = fmt.Sprintf("iso-%d", i) + instance = placement.NewEmptyInstance(instanceID, isolationGroup, "zone", "endpoint", 1) + ) + instances = append(instances, instance) + } + } + + shardIDs := make([]uint32, shardCount) + for i := range shardIDs { + shardIDs[i] = uint32(i) + } + + algo := newShardedAlgorithm(placement.NewOptions()) + p, err := algo.InitialPlacement(instances, shardIDs, replicaCount) + if err != nil { + return false, err + } + + for _, shardID := range shardIDs { + if n := len(p.InstancesForShard(shardID)); n != replicaCount { + return false, fmt.Errorf("shard %d has %d replicas, but replication factor is %d", shardID, n, replicaCount) + } + } + + var ( + min = math.MaxInt32 + max = math.MinInt32 + ) + for _, instance := range p.Instances() { + n := instance.Shards().NumShards() + if n < min { + min = n + } + if n > max { + max = n + } + } + if max-min > 1 { + return false, fmt.Errorf("shard count differs by more than 1, min=%v max=%v", min, max) + } + return true, nil +} diff --git a/src/cluster/placement/algo/sharded_test.go b/src/cluster/placement/algo/sharded_test.go index 3af305b75c..b305bfbeed 100644 --- a/src/cluster/placement/algo/sharded_test.go +++ b/src/cluster/placement/algo/sharded_test.go @@ -1306,42 +1306,6 @@ func TestBalanceShardsForShardedWhenImbalanced(t *testing.T) { assert.Equal(t, expectedInstances, balancedPlacement.Instances()) } -func TestInitialPlacementIsBalanced(t *testing.T) { - instances := make([]placement.Instance, 0) - - for i := 0; i < 10; i++ { - instances = append(instances, - placement.NewEmptyInstance(fmt.Sprintf("instance-0-%03d", i), "iso0", "zone", "endpoint", 1), - placement.NewEmptyInstance(fmt.Sprintf("instance-1-%03d", i), "iso1", "zone", "endpoint", 1), - ) - } - - shardIDs := make([]uint32, 1024) - for i := range shardIDs { - shardIDs[i] = uint32(i) - } - - algo := newShardedAlgorithm(placement.NewOptions()) - p, err := algo.InitialPlacement(instances, shardIDs, 2) - require.NoError(t, err) - - var ( - min = math.MaxInt32 - max = math.MinInt32 - ) - for _, instance := range p.Instances() { - n := instance.Shards().NumShards() - if n < min { - min = n - } - if n > max { - max = n - } - } - require.Equal(t, 102, min) - require.Equal(t, 103, max) -} - func verifyAllShardsInAvailableState(t *testing.T, p placement.Placement) { for _, instance := range p.Instances() { s := instance.Shards() From 29e0bf392b3b6e343044afcf87b8ac76872ca6d1 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Tue, 21 Dec 2021 14:26:21 +0200 Subject: [PATCH 5/6] add missing copyright comment --- .../placement/algo/sharded_prop_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/cluster/placement/algo/sharded_prop_test.go b/src/cluster/placement/algo/sharded_prop_test.go index 7c07b88f1f..f3ff0e8ace 100644 --- a/src/cluster/placement/algo/sharded_prop_test.go +++ b/src/cluster/placement/algo/sharded_prop_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package algo import ( From 49585eeb2a14216bc3cbec71d073e39b634ae98d Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Wed, 22 Dec 2021 12:29:37 +0200 Subject: [PATCH 6/6] update test --- src/cluster/placement/algo/sharded_prop_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster/placement/algo/sharded_prop_test.go b/src/cluster/placement/algo/sharded_prop_test.go index f3ff0e8ace..ded632b79a 100644 --- a/src/cluster/placement/algo/sharded_prop_test.go +++ b/src/cluster/placement/algo/sharded_prop_test.go @@ -51,7 +51,7 @@ func TestInitialPlacementIsBalancedPropTest(t *testing.T) { testInitialPlacementIsBalanced, gen.IntRange(1, 3), gen.IntRange(1, 20), - gen.IntRange(128, 1024), + gen.IntRange(64, 3072), )) if !props.Run(reporter) {