From 4d831dcd88000a0eba9694e7554d5ba2fbad742b Mon Sep 17 00:00:00 2001 From: cw9 Date: Mon, 24 Oct 2016 18:06:05 -0400 Subject: [PATCH] Consider weights in placement algorithm (#29) --- src/cluster/placement/algo/algo.go | 84 ++-- src/cluster/placement/algo/algo_test.go | 263 ++++++++----- src/cluster/placement/algo/helper.go | 355 ++++++++--------- src/cluster/placement/placement.go | 69 +++- src/cluster/placement/placement_test.go | 124 ++++-- src/cluster/placement/planner/planner_test.go | 52 +-- .../placement/service/placementservice.go | 194 +++++++--- .../service/placementservice_test.go | 361 ++++++++++++++---- src/cluster/placement/types.go | 15 +- 9 files changed, 985 insertions(+), 532 deletions(-) diff --git a/src/cluster/placement/algo/algo.go b/src/cluster/placement/algo/algo.go index c71f4fb48d..4cf16aba01 100644 --- a/src/cluster/placement/algo/algo.go +++ b/src/cluster/placement/algo/algo.go @@ -23,6 +23,7 @@ package algo import ( "container/heap" "errors" + "fmt" "github.com/m3db/m3cluster/placement" ) @@ -44,7 +45,7 @@ func NewRackAwarePlacementAlgorithm(opt placement.Options) placement.Algorithm { } func (a rackAwarePlacementAlgorithm) BuildInitialPlacement(hosts []placement.Host, shards []uint32) (placement.Snapshot, error) { - ph := newInitPlacementHelper(a.options, hosts, shards) + ph := newInitHelper(hosts, shards, a.options) if err := ph.PlaceShards(shards, nil); err != nil { return nil, err @@ -53,7 +54,8 @@ func (a rackAwarePlacementAlgorithm) BuildInitialPlacement(hosts []placement.Hos } func (a rackAwarePlacementAlgorithm) AddReplica(ps placement.Snapshot) (placement.Snapshot, error) { - ph := newPlacementHelperWithTargetRF(a.options, ps, ps.Replicas()+1) + ps = ps.Copy() + ph := newAddReplicaHelper(ps, a.options) if err := ph.PlaceShards(ps.Shards(), nil); err != nil { return nil, err } @@ -61,10 +63,9 @@ func (a rackAwarePlacementAlgorithm) AddReplica(ps placement.Snapshot) (placemen } func (a rackAwarePlacementAlgorithm) RemoveHost(ps placement.Snapshot, leavingHost placement.Host) (placement.Snapshot, error) { - var ph PlacementHelper - var leavingHostShards placement.HostShards - var err error - if ph, leavingHostShards, err = newRemoveHostPlacementHelper(a.options, ps, leavingHost); err != nil { + ps = ps.Copy() + ph, leavingHostShards, err := newRemoveHostHelper(ps, leavingHost, a.options) + if err != nil { return nil, err } // place the shards from the leaving host to the rest of the cluster @@ -75,45 +76,67 @@ func (a rackAwarePlacementAlgorithm) RemoveHost(ps placement.Snapshot, leavingHo } func (a rackAwarePlacementAlgorithm) AddHost(ps placement.Snapshot, addingHost placement.Host) (placement.Snapshot, error) { - var addingHostShards placement.HostShards - var err error - if addingHostShards, err = getNewHostShardsForSnapshot(ps, addingHost); err != nil { - return nil, err - } - return a.addHostShards(ps, addingHostShards) + ps = ps.Copy() + return a.addHostShards(ps, placement.NewHostShards(addingHost)) } -func (a rackAwarePlacementAlgorithm) ReplaceHost(ps placement.Snapshot, leavingHost, addingHost placement.Host) (placement.Snapshot, error) { - ph, leavingHostShards, addingHostShards, err := newReplaceHostPlacementHelper(a.options, ps, leavingHost, addingHost) +func (a rackAwarePlacementAlgorithm) ReplaceHost(ps placement.Snapshot, leavingHost placement.Host, addingHosts []placement.Host) (placement.Snapshot, error) { + ps = ps.Copy() + ph, leavingHostShards, addingHostShards, err := newReplaceHostHelper(ps, leavingHost, addingHosts, a.options) if err != nil { return nil, err } - var shardsUnassigned []uint32 // move shards from leaving host to adding host - for _, shard := range leavingHostShards.Shards() { - if moved := ph.MoveShard(shard, leavingHostShards, addingHostShards); !moved { - shardsUnassigned = append(shardsUnassigned, shard) + hh := ph.BuildHostHeap(addingHostShards, true) + for leavingHostShards.ShardsLen() > 0 { + if hh.Len() == 0 { + break + } + tryHost := heap.Pop(hh).(placement.HostShards) + if moved := ph.MoveOneShard(leavingHostShards, tryHost); moved { + heap.Push(hh, tryHost) } } - // if there are shards that can not be moved to adding host - // distribute them to the cluster - if err := ph.PlaceShards(shardsUnassigned, leavingHostShards); err != nil { - return nil, err + if !a.options.AllowPartialReplace() { + if leavingHostShards.ShardsLen() > 0 { + return nil, fmt.Errorf("could not fully replace all shards from %s, %v shards left unassigned", leavingHost.ID(), leavingHostShards.ShardsLen()) + } + return ph.GenerateSnapshot(), nil } - // add the adding host to the cluster and bring its load up to target load - cl := ph.GenerateSnapshot() + if leavingHostShards.ShardsLen() == 0 { + return ph.GenerateSnapshot(), nil + } + // place the shards from the leaving host to the rest of the cluster + if err := ph.PlaceShards(leavingHostShards.Shards(), leavingHostShards); err != nil { + return nil, err + } + // fill up to the target load for added hosts if have not already done so + newPS := ph.GenerateSnapshot() + for _, addingHost := range addingHosts { + newPS, leavingHostShards, err = removeHostFromPlacement(newPS, addingHost) + if err != nil { + return nil, err + } + newPS, err = a.addHostShards(newPS, leavingHostShards) + if err != nil { + return nil, err + } + } - return a.addHostShards(cl, addingHostShards) + return newPS, nil } func (a rackAwarePlacementAlgorithm) addHostShards(ps placement.Snapshot, addingHostShard placement.HostShards) (placement.Snapshot, error) { - ph := newAddHostShardsPlacementHelper(a.options, ps, addingHostShard) + if ps.HostShard(addingHostShard.Host().ID()) != nil { + return nil, errAddingHostAlreadyExist + } + ph := newAddHostShardsHelper(ps, addingHostShard, a.options) targetLoad := ph.GetTargetLoadForHost(addingHostShard.Host().ID()) // try to take shards from the most loaded hosts until the adding host reaches target load - hh := ph.GetHostHeap() + hh := ph.BuildHostHeap(ps.HostShards(), false) for addingHostShard.ShardsLen() < targetLoad { if hh.Len() == 0 { return nil, errCouldNotReachTargetLoad @@ -126,10 +149,3 @@ func (a rackAwarePlacementAlgorithm) addHostShards(ps placement.Snapshot, adding return ph.GenerateSnapshot(), nil } - -func getNewHostShardsForSnapshot(ps placement.Snapshot, addingHost placement.Host) (placement.HostShards, error) { - if ps.HostShard(addingHost.ID()) != nil { - return nil, errAddingHostAlreadyExist - } - return placement.NewEmptyHostShardsFromHost(addingHost), nil -} diff --git a/src/cluster/placement/algo/algo_test.go b/src/cluster/placement/algo/algo_test.go index a5044a23c9..8d452100ed 100644 --- a/src/cluster/placement/algo/algo_test.go +++ b/src/cluster/placement/algo/algo_test.go @@ -22,6 +22,7 @@ package algo import ( "fmt" + "math" "testing" "github.com/m3db/m3cluster/placement" @@ -29,15 +30,15 @@ import ( ) func TestGoodCase(t *testing.T) { - h1 := placement.NewHost("r1h1", "r1", "z1") - h2 := placement.NewHost("r1h2", "r1", "z1") - h3 := placement.NewHost("r2h3", "r2", "z1") - h4 := placement.NewHost("r2h4", "r2", "z1") - h5 := placement.NewHost("r3h5", "r3", "z1") - h6 := placement.NewHost("r4h6", "r4", "z1") - h7 := placement.NewHost("r5h7", "r5", "z1") - h8 := placement.NewHost("r6h8", "r6", "z1") - h9 := placement.NewHost("r7h9", "r7", "z1") + h1 := placement.NewHost("r1h1", "r1", "z1", 1) + h2 := placement.NewHost("r1h2", "r1", "z1", 1) + h3 := placement.NewHost("r2h3", "r2", "z1", 1) + h4 := placement.NewHost("r2h4", "r2", "z1", 1) + h5 := placement.NewHost("r3h5", "r3", "z1", 1) + h6 := placement.NewHost("r4h6", "r4", "z1", 1) + h7 := placement.NewHost("r5h7", "r5", "z1", 1) + h8 := placement.NewHost("r6h8", "r6", "z1", 1) + h9 := placement.NewHost("r7h9", "r7", "z1", 1) hosts := []placement.Host{h1, h2, h3, h4, h5, h6, h7, h8, h9} @@ -46,12 +47,12 @@ func TestGoodCase(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetAllowPartialReplace(true)) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 replica 1") - p, err = a.AddHost(p, placement.NewHost("r6h21", "r6", "z1")) + p, err = a.AddHost(p, placement.NewHost("r6h21", "r6", "z1", 1)) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 add 1") @@ -59,8 +60,8 @@ func TestGoodCase(t *testing.T) { assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 remove 1") - h12 := placement.NewHost("r3h12", "r3", "z1") - p, err = a.ReplaceHost(p, h5, h12) + h12 := placement.NewHost("r3h12", "r3", "z1", 1) + p, err = a.ReplaceHost(p, h5, []placement.Host{h12}) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 add 1") @@ -76,18 +77,18 @@ func TestGoodCase(t *testing.T) { assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 replica 3") - h10 := placement.NewHost("r4h10", "r4", "z1") + h10 := placement.NewHost("r4h10", "r4", "z1", 1) p, err = a.AddHost(p, h10) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 add 1") - h11 := placement.NewHost("r7h11", "r7", "z1") + h11 := placement.NewHost("r7h11", "r7", "z1", 1) p, err = a.AddHost(p, h11) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 add 2") - h13 := placement.NewHost("r5h13", "r5", "z1") - p, err = a.ReplaceHost(p, h3, h13) + h13 := placement.NewHost("r5h13", "r5", "z1", 1) + p, err = a.ReplaceHost(p, h3, []placement.Host{h13}) assert.NoError(t, err) validateDistribution(t, p, 1.01, "good case1 replace 1") @@ -96,20 +97,84 @@ func TestGoodCase(t *testing.T) { validateDistribution(t, p, 1.02, "good case1 remove 2") } +func TestGoodCaseWithWeight(t *testing.T) { + h1 := placement.NewHost("h1", "r1", "z1", 10) + h2 := placement.NewHost("h2", "r1", "z1", 10) + h3 := placement.NewHost("h3", "r2", "z1", 20) + h4 := placement.NewHost("h4", "r3", "z1", 10) + h5 := placement.NewHost("h5", "r4", "z1", 30) + h6 := placement.NewHost("h6", "r6", "z1", 40) + h7 := placement.NewHost("h7", "r7", "z1", 10) + h8 := placement.NewHost("h8", "r8", "z1", 10) + h9 := placement.NewHost("h9", "r9", "z1", 10) + + hosts := []placement.Host{h1, h2, h3, h4, h5, h6, h7, h8, h9} + + ids := make([]uint32, 1024) + for i := 0; i < len(ids); i++ { + ids[i] = uint32(i) + } + + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) + p, err := a.BuildInitialPlacement(hosts, ids) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 replica 1") + + p, err = a.AddHost(p, placement.NewHost("h21", "r2", "z1", 10)) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 add 1") + + p, err = a.RemoveHost(p, h1) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 remove 1") + + p, err = a.ReplaceHost(p, h3, + []placement.Host{ + placement.NewHost("h31", "r1", "z1", 10), + placement.NewHost("h32", "r1", "z1", 10), + }) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 replace 1") + + p, err = a.AddReplica(p) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 replica 2") + + p, err = a.AddReplica(p) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 replica 3") + + h10 := placement.NewHost("h10", "r10", "z1", 10) + p, err = a.AddHost(p, h10) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 add 1") + + h11 := placement.NewHost("h11", "r7", "z1", 10) + p, err = a.AddHost(p, h11) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 add 2") + + h13 := placement.NewHost("h13", "r5", "z1", 10) + + p, err = a.ReplaceHost(p, h11, []placement.Host{h13}) + assert.NoError(t, err) + validateDistribution(t, p, 1.01, "good case1 replace 1") +} + func TestOverSizedRack(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") - r1h6 := placement.NewHost("r1h6", "r1", "z1") - r1h7 := placement.NewHost("r1h7", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) + r1h6 := placement.NewHost("r1h6", "r1", "z1", 1) + r1h7 := placement.NewHost("r1h7", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") - r2h3 := placement.NewHost("r2h3", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) + r2h3 := placement.NewHost("r2h3", "r2", "z1", 1) - r3h4 := placement.NewHost("r3h4", "r3", "z1") - r3h8 := placement.NewHost("r3h8", "r3", "z1") + r3h4 := placement.NewHost("r3h4", "r3", "z1", 1) + r3h8 := placement.NewHost("r3h8", "r3", "z1", 1) - r4h5 := placement.NewHost("r4h5", "r4", "z1") + r4h5 := placement.NewHost("r4h5", "r4", "z1", 1) - r5h9 := placement.NewHost("r5h9", "r5", "z1") + r5h9 := placement.NewHost("r5h9", "r5", "z1", 1) hosts := []placement.Host{r1h1, r2h2, r2h3, r3h4, r4h5, r1h6, r1h7, r3h8, r5h9} @@ -118,7 +183,7 @@ func TestOverSizedRack(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestOverSizedRack replica 1") @@ -131,25 +196,30 @@ func TestOverSizedRack(t *testing.T) { assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestOverSizedRack replica 3") - r4h10 := placement.NewHost("r4h10", "r4", "z1") - p, err = a.ReplaceHost(p, r3h8, r4h10) + r4h10 := placement.NewHost("r4h10", "r4", "z1", 1) + _, err = a.ReplaceHost(p, r3h8, []placement.Host{r4h10}) + assert.Error(t, err) + + a = NewRackAwarePlacementAlgorithm(placement.NewOptions().SetAllowPartialReplace(true)) + p, err = a.ReplaceHost(p, r3h8, []placement.Host{r4h10}) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestOverSizedRack replace 1") // At this point, r1 has 4 hosts to share a copy of 1024 partitions - r1h11 := placement.NewHost("r1h11", "r1", "z1") - p, err = a.ReplaceHost(p, r2h2, r1h11) + r1h11 := placement.NewHost("r1h11", "r1", "z1", 1) + + p, err = a.ReplaceHost(p, r2h2, []placement.Host{r1h11}) assert.NoError(t, err) validateDistribution(t, p, 1.22, "TestOverSizedRack replace 2") // adding a new host to relieve the load on the hot hosts - r4h12 := placement.NewHost("r4h12", "r4", "z1") + r4h12 := placement.NewHost("r4h12", "r4", "z1", 1) p, err = a.AddHost(p, r4h12) assert.NoError(t, err) validateDistribution(t, p, 1.15, "TestOverSizedRack add 1") } -func TestInitPlacementOn0Host(t *testing.T) { +func TestInitPlacementOnHost(t *testing.T) { hosts := []placement.Host{} ids := make([]uint32, 1024) @@ -157,15 +227,15 @@ func TestInitPlacementOn0Host(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.Error(t, err) assert.Nil(t, p) } func TestOneRack(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") - r1h2 := placement.NewHost("r1h2", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) + r1h2 := placement.NewHost("r1h2", "r1", "z1", 1) hosts := []placement.Host{r1h1, r1h2} @@ -174,12 +244,12 @@ func TestOneRack(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestOneRack replica 1") - r1h6 := placement.NewHost("r1h6", "r1", "z1") + r1h6 := placement.NewHost("r1h6", "r1", "z1", 1) p, err = a.AddHost(p, r1h6) assert.NoError(t, err) @@ -187,11 +257,11 @@ func TestOneRack(t *testing.T) { } func TestRFGreaterThanRackLen(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") - r1h6 := placement.NewHost("r1h6", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) + r1h6 := placement.NewHost("r1h6", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") - r2h3 := placement.NewHost("r2h3", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) + r2h3 := placement.NewHost("r2h3", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2, r2h3, r1h6} @@ -200,7 +270,7 @@ func TestRFGreaterThanRackLen(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestRFGreaterThanRackLen replica 1") @@ -216,9 +286,9 @@ func TestRFGreaterThanRackLen(t *testing.T) { } func TestRFGreaterThanRackLenAfterHostRemoval(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -227,7 +297,7 @@ func TestRFGreaterThanRackLenAfterHostRemoval(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestRFGreaterThanRackLenAfterHostRemoval replica 1") @@ -243,9 +313,9 @@ func TestRFGreaterThanRackLenAfterHostRemoval(t *testing.T) { } func TestRFGreaterThanRackLenAfterHostReplace(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -254,7 +324,7 @@ func TestRFGreaterThanRackLenAfterHostReplace(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestRFGreaterThanRackLenAfterHostRemoval replica 1") @@ -263,17 +333,17 @@ func TestRFGreaterThanRackLenAfterHostReplace(t *testing.T) { assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestRFGreaterThanRackLenAfterHostRemoval replica 2") - r1h3 := placement.NewHost("r1h3", "r1", "z1") - p1, err := a.ReplaceHost(p, r2h2, r1h3) - assert.Equal(t, errNotEnoughRacks, err) + r1h3 := placement.NewHost("r1h3", "r1", "z1", 1) + p1, err := a.ReplaceHost(p, r2h2, []placement.Host{r1h3}) + assert.Error(t, err) assert.Nil(t, p1) assert.NoError(t, p.Validate()) } func TestLooseRackCheckAlgorithm(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -282,7 +352,7 @@ func TestLooseRackCheckAlgorithm(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) assert.NoError(t, p.Validate()) @@ -296,7 +366,7 @@ func TestLooseRackCheckAlgorithm(t *testing.T) { assert.Nil(t, p1) assert.NoError(t, p.Validate()) - r2h4 := placement.NewHost("r2h4", "r2", "z1") + r2h4 := placement.NewHost("r2h4", "r2", "z1", 1) p, err = a.AddHost(p, r2h4) assert.NoError(t, err) assert.NoError(t, p.Validate()) @@ -308,12 +378,12 @@ func TestLooseRackCheckAlgorithm(t *testing.T) { b := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(true)) // different with normal algo, which would return error here - r1h3 := placement.NewHost("r1h3", "r1", "z1") - p, err = b.ReplaceHost(p, r2h2, r1h3) + r1h3 := placement.NewHost("r1h3", "r1", "z1", 1) + p, err = b.ReplaceHost(p, r2h2, []placement.Host{r1h3}) assert.NoError(t, err) assert.NoError(t, p.Validate()) - p1, err = b.ReplaceHost(p, r2h4, r1h3) + p1, err = b.ReplaceHost(p, r2h4, []placement.Host{r1h3}) assert.Equal(t, errAddingHostAlreadyExist, err) assert.Nil(t, p1) assert.NoError(t, p.Validate()) @@ -322,7 +392,7 @@ func TestLooseRackCheckAlgorithm(t *testing.T) { assert.NoError(t, err) assert.NoError(t, p.Validate()) - r3h5 := placement.NewHost("r3h5", "r3", "z1") + r3h5 := placement.NewHost("r3h5", "r3", "z1", 1) p, err = b.AddHost(p, r3h5) assert.NoError(t, err) assert.NoError(t, p.Validate()) @@ -333,7 +403,7 @@ func TestLooseRackCheckAlgorithm(t *testing.T) { } func TestAddHostCouldNotReachTargetLoad(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) ids := make([]uint32, 1024) for i := 0; i < len(ids); i++ { @@ -342,7 +412,7 @@ func TestAddHostCouldNotReachTargetLoad(t *testing.T) { p := placement.NewPlacementSnapshot([]placement.HostShards{}, ids, 1) - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p1, err := a.AddHost(p, r1h1) // errCouldNotReachTargetLoad should only happen when trying to add a host to @@ -352,9 +422,9 @@ func TestAddHostCouldNotReachTargetLoad(t *testing.T) { } func TestAddExistHost(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -363,7 +433,7 @@ func TestAddExistHost(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestAddExistHost replica 1") @@ -375,9 +445,9 @@ func TestAddExistHost(t *testing.T) { } func TestRemoveAbsentHost(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -386,12 +456,12 @@ func TestRemoveAbsentHost(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestRemoveAbsentHost replica 1") - r3h3 := placement.NewHost("r3h3", "r3", "z1") + r3h3 := placement.NewHost("r3h3", "r3", "z1", 1) p1, err := a.RemoveHost(p, r3h3) assert.Error(t, err) @@ -400,9 +470,9 @@ func TestRemoveAbsentHost(t *testing.T) { } func TestReplaceAbsentHost(t *testing.T) { - r1h1 := placement.NewHost("r1h1", "r1", "z1") + r1h1 := placement.NewHost("r1h1", "r1", "z1", 1) - r2h2 := placement.NewHost("r2h2", "r2", "z1") + r2h2 := placement.NewHost("r2h2", "r2", "z1", 1) hosts := []placement.Host{r1h1, r2h2} @@ -411,47 +481,47 @@ func TestReplaceAbsentHost(t *testing.T) { ids[i] = uint32(i) } - a := NewRackAwarePlacementAlgorithm(placement.NewOptions().SetLooseRackCheck(false)) + a := NewRackAwarePlacementAlgorithm(placement.NewOptions()) p, err := a.BuildInitialPlacement(hosts, ids) assert.NoError(t, err) validateDistribution(t, p, 1.01, "TestReplaceAbsentHost replica 1") - r3h3 := placement.NewHost("r3h3", "r3", "z1") - r4h4 := placement.NewHost("r4h4", "r4", "z1") + r3h3 := placement.NewHost("r3h3", "r3", "z1", 1) + r4h4 := placement.NewHost("r4h4", "r4", "z1", 1) - p1, err := a.ReplaceHost(p, r3h3, r4h4) + p1, err := a.ReplaceHost(p, r3h3, []placement.Host{r4h4}) assert.Error(t, err) assert.Nil(t, p1) assert.NoError(t, p.Validate()) } func TestCanAssignHost(t *testing.T) { - h1 := placement.NewEmptyHostShards("r1h1", "r1", "z1") + h1 := placement.NewHostShards(placement.NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h2 := placement.NewEmptyHostShards("r1h2", "r1", "z1") + h2 := placement.NewHostShards(placement.NewHost("r1h2", "r1", "z1", 1)) h2.AddShard(4) h2.AddShard(5) h2.AddShard(6) - h3 := placement.NewEmptyHostShards("r2h3", "r2", "z1") + h3 := placement.NewHostShards(placement.NewHost("r2h3", "r2", "z1", 1)) h3.AddShard(1) h3.AddShard(3) h3.AddShard(5) - h4 := placement.NewEmptyHostShards("r2h4", "r2", "z1") + h4 := placement.NewHostShards(placement.NewHost("r2h4", "r2", "z1", 1)) h4.AddShard(2) h4.AddShard(4) h4.AddShard(6) - h5 := placement.NewEmptyHostShards("r3h5", "r3", "z1") + h5 := placement.NewHostShards(placement.NewHost("r3h5", "r3", "z1", 1)) h5.AddShard(5) h5.AddShard(6) h5.AddShard(1) - h6 := placement.NewEmptyHostShards("r4h6", "r4", "z1") + h6 := placement.NewHostShards(placement.NewHost("r4h6", "r4", "z1", 1)) h6.AddShard(2) h6.AddShard(3) h6.AddShard(4) @@ -460,35 +530,40 @@ func TestCanAssignHost(t *testing.T) { mp := placement.NewPlacementSnapshot(hss, []uint32{1, 2, 3, 4, 5, 6}, 3) - ph := newPlacementHelperWithTargetRF(placement.NewOptions(), mp, 3).(*placementHelper) + ph := newHelper(mp, 3, placement.NewOptions()).(*placementHelper) assert.True(t, ph.canAssignHost(2, h6, h5)) assert.True(t, ph.canAssignHost(1, h1, h6)) assert.False(t, ph.canAssignHost(2, h6, h1)) // rack check assert.False(t, ph.canAssignHost(2, h6, h3)) - ph = newPlacementHelperWithTargetRF(placement.NewOptions().SetLooseRackCheck(true), mp, 3).(*placementHelper) + ph = newHelper(mp, 3, placement.NewOptions().SetLooseRackCheck(true)).(*placementHelper) assert.True(t, ph.canAssignHost(2, h6, h3)) } func validateDistribution(t *testing.T, mp placement.Snapshot, expectPeakOverAvg float64, testCase string) { - sh := NewPlacementHelper(placement.NewOptions(), mp) + ph := NewPlacementHelper(mp, placement.NewOptions()).(*placementHelper) total := 0 for _, hostShard := range mp.HostShards() { hostLoad := hostShard.ShardsLen() total += hostLoad - hostOverAvg := float64(hostLoad) / float64(getAvgLoad(mp)) - assert.True(t, hostOverAvg <= expectPeakOverAvg, fmt.Sprintf("Bad distribution in %s, peak/Avg on %s is too high: %v, expecting %v, load on host: %v, avg load: %v", - testCase, hostShard.Host().ID(), hostOverAvg, expectPeakOverAvg, hostLoad, getAvgLoad(mp))) - - target := sh.GetTargetLoadForHost(hostShard.Host().ID()) - hostOverTarget := float64(hostLoad) / float64(target) - assert.True(t, hostOverTarget <= 1.03, fmt.Sprintf("Bad distribution in %s, peak/Target is too high. %s: %v, load on host: %v, target load: %v", - testCase, hostShard.Host().ID(), hostOverTarget, hostLoad, target)) + avgLoad := getWeightedLoad(ph, hostShard.Host().Weight()) + hostOverAvg := float64(hostLoad) / float64(avgLoad) + if math.Abs(float64(hostLoad-avgLoad)) > 1 { + assert.True(t, hostOverAvg <= expectPeakOverAvg, fmt.Sprintf("Bad distribution in %s, peak/Avg on %s is too high: %v, expecting %v, load on host: %v, avg load: %v", + testCase, hostShard.Host().ID(), hostOverAvg, expectPeakOverAvg, hostLoad, avgLoad)) + } + + targetLoad := ph.GetTargetLoadForHost(hostShard.Host().ID()) + hostOverTarget := float64(hostLoad) / float64(targetLoad) + if math.Abs(float64(hostLoad-targetLoad)) > 1 { + assert.True(t, hostOverTarget <= 1.03, fmt.Sprintf("Bad distribution in %s, peak/Target on %s is too high: %v, load on host: %v, target load: %v", + testCase, hostShard.Host().ID(), hostOverTarget, hostLoad, targetLoad)) + } } assert.Equal(t, total, mp.Replicas()*mp.ShardsLen(), fmt.Sprintf("Wrong total partition: expecting %v, but got %v", mp.Replicas()*mp.ShardsLen(), total)) assert.NoError(t, mp.Validate(), "snapshot validation failed") } -func getAvgLoad(ps placement.Snapshot) int { - return ps.Replicas() * ps.ShardsLen() / ps.HostsLen() +func getWeightedLoad(ph *placementHelper, weight uint32) int { + return ph.rf * len(ph.shardToHostMap) * int(weight) / int(ph.totalWeight) } diff --git a/src/cluster/placement/algo/helper.go b/src/cluster/placement/algo/helper.go index 1043c42d28..fa0b800b79 100644 --- a/src/cluster/placement/algo/helper.go +++ b/src/cluster/placement/algo/helper.go @@ -41,19 +41,20 @@ type PlacementHelper interface { MoveShard(shard uint32, from, to placement.HostShards) bool // HasNoRackConflict checks if the rack constraint is violated if the given shard is moved to the target rack HasNoRackConflict(shard uint32, from placement.HostShards, toRack string) bool - // GetHostHeap returns a host heap that sort the hosts based on their capacity - GetHostHeap() heap.Interface + // BuildHostHeap returns heap of HostShards sorted by available capacity + BuildHostHeap(hostShards []placement.HostShards, availableCapacityAscending bool) heap.Interface } type placementHelper struct { - hostHeap heap.Interface - targetLoad map[string]int - shardToHostMap map[uint32]map[placement.HostShards]struct{} - rackToHostsMap map[string]map[placement.HostShards]struct{} - rf int - uniqueShards []uint32 - hostShards []placement.HostShards - options placement.Options + targetLoad map[string]int + shardToHostMap map[uint32]map[placement.HostShards]struct{} + rackToHostsMap map[string]map[placement.HostShards]struct{} + rackToWeightMap map[string]uint32 + totalWeight uint32 + rf int + uniqueShards []uint32 + hostShards []placement.HostShards + options placement.Options } func (ph *placementHelper) GetTargetLoadForHost(hostID string) int { @@ -61,6 +62,9 @@ func (ph *placementHelper) GetTargetLoadForHost(hostID string) int { } func (ph *placementHelper) MoveOneShard(from, to placement.HostShards) bool { + if from == nil { + return false + } for _, shard := range from.Shards() { if ph.MoveShard(shard, from, to) { return true @@ -78,192 +82,213 @@ func (ph *placementHelper) MoveShard(shard uint32, from, to placement.HostShards return false } +func (ph placementHelper) HasNoRackConflict(shard uint32, from placement.HostShards, toRack string) bool { + if from != nil { + if from.Host().Rack() == toRack { + return true + } + } + for host := range ph.shardToHostMap[shard] { + if host.Host().Rack() == toRack { + return false + } + } + return true +} + +func (ph *placementHelper) BuildHostHeap(hostShards []placement.HostShards, availableCapacityAscending bool) heap.Interface { + return newHostHeap(hostShards, availableCapacityAscending, ph.targetLoad, ph.rackToWeightMap) +} + +func (ph *placementHelper) GenerateSnapshot() placement.Snapshot { + return placement.NewPlacementSnapshot(ph.hostShards, ph.uniqueShards, ph.rf) +} + func (ph placementHelper) PlaceShards(shards []uint32, from placement.HostShards) error { shardSet := placement.ConvertShardSliceToMap(shards) - var tried []placement.HostShards - if from != nil { // prefer to distribute "some" of the load to other racks first // because the load from a leaving host can always get assigned to a host on the same rack ph.placeToRacksOtherThanOrigin(shardSet, from) } + hostHeap := ph.BuildHostHeap(ph.hostShards, true) // if there are shards left to be assigned, distribute them evenly + var triedHosts []placement.HostShards for shard := range shardSet { - tried = tried[:0] - for ph.hostHeap.Len() > 0 { - tryHost := heap.Pop(ph.hostHeap).(placement.HostShards) - tried = append(tried, tryHost) - if ph.canAssignHost(shard, from, tryHost) { - ph.assignShardToHost(shard, tryHost) - for _, triedHost := range tried { - heap.Push(ph.hostHeap, triedHost) - } + moved := false + for hostHeap.Len() > 0 { + tryHost := heap.Pop(hostHeap).(placement.HostShards) + triedHosts = append(triedHosts, tryHost) + if ph.MoveShard(shard, from, tryHost) { + moved = true break } } - if ph.hostHeap.Len() == 0 { + if !moved { // this should only happen when RF > number of racks return errNotEnoughRacks } + for _, triedHost := range triedHosts { + heap.Push(hostHeap, triedHost) + } + triedHosts = triedHosts[:0] } return nil } -func (ph *placementHelper) GetHostHeap() heap.Interface { - return ph.hostHeap -} +// placeToRacksOtherThanOrigin move shards from a host to the rest of the cluster +// the goal of this function is to assign "some" of the shards to the hosts in other racks +func (ph placementHelper) placeToRacksOtherThanOrigin(shards map[uint32]int, from placement.HostShards) { + var otherRack []placement.HostShards + for rack, hostShards := range ph.rackToHostsMap { + if rack == from.Host().Rack() { + continue + } + for hostShard := range hostShards { + otherRack = append(otherRack, hostShard) + } + } -func (ph *placementHelper) GenerateSnapshot() placement.Snapshot { - return placement.NewPlacementSnapshot(ph.hostShards, ph.uniqueShards, ph.rf) + hostHeap := ph.BuildHostHeap(otherRack, true) + + var triedHosts []placement.HostShards + for shard := range shards { + for hostHeap.Len() > 0 { + tryHost := heap.Pop(hostHeap).(placement.HostShards) + if ph.GetTargetLoadForHost(tryHost.Host().ID())-tryHost.ShardsLen() <= 0 { + // this is where "some" is, at this point the best host option in the cluster + // from a different rack has reached its target load, time to break out of the loop + return + } + triedHosts = append(triedHosts, tryHost) + if ph.MoveShard(shard, from, tryHost) { + delete(shards, shard) + break + } + } + + for _, triedHost := range triedHosts { + heap.Push(hostHeap, triedHost) + } + triedHosts = triedHosts[:0] + } } // NewPlacementHelper returns a placement helper -func NewPlacementHelper(opt placement.Options, s placement.Snapshot) PlacementHelper { - return newPlacementHelperWithTargetRF(opt, s, s.Replicas()) +func NewPlacementHelper(ps placement.Snapshot, opt placement.Options) PlacementHelper { + return newHelper(ps, ps.Replicas(), opt) } -func newInitPlacementHelper(opt placement.Options, hosts []placement.Host, ids []uint32) PlacementHelper { + +func newInitHelper(hosts []placement.Host, ids []uint32, opt placement.Options) PlacementHelper { emptyPlacement := placement.NewEmptyPlacementSnapshot(hosts, ids) - return newPlaceShardingHelper(opt, emptyPlacement, emptyPlacement.Replicas()+1, true) + return newHelper(emptyPlacement, emptyPlacement.Replicas()+1, opt) } -func newPlacementHelperWithTargetRF(opt placement.Options, s placement.Snapshot, targetRF int) PlacementHelper { - return newPlaceShardingHelper(opt, s, targetRF, true) +func newAddReplicaHelper(ps placement.Snapshot, opt placement.Options) PlacementHelper { + return newHelper(ps, ps.Replicas()+1, opt) } -func newAddHostShardsPlacementHelper(opt placement.Options, s placement.Snapshot, hs placement.HostShards) PlacementHelper { - var hss []placement.HostShards +func newAddHostShardsHelper(ps placement.Snapshot, hostShards placement.HostShards, opt placement.Options) PlacementHelper { + ps = placement.NewPlacementSnapshot(append(ps.HostShards(), hostShards), ps.Shards(), ps.Replicas()) + return newHelper(ps, ps.Replicas(), opt) +} - for _, phs := range s.HostShards() { - hss = append(hss, phs) +func newRemoveHostHelper(ps placement.Snapshot, leavingHost placement.Host, opt placement.Options) (PlacementHelper, placement.HostShards, error) { + ps, leavingHostShards, err := removeHostFromPlacement(ps, leavingHost) + if err != nil { + return nil, nil, err } - - hss = append(hss, hs) - - ps := placement.NewPlacementSnapshot(hss, s.Shards(), s.Replicas()) - return newPlaceShardingHelper(opt, ps, s.Replicas(), false) + return newHelper(ps, ps.Replicas(), opt), leavingHostShards, nil } -func newReplaceHostPlacementHelper( +func newReplaceHostHelper( + ps placement.Snapshot, + leavingHost placement.Host, + addingHosts []placement.Host, opt placement.Options, - s placement.Snapshot, - leavingHost, addingHost placement.Host, -) (PlacementHelper, placement.HostShards, placement.HostShards, error) { - var err error - var addingHostShards placement.HostShards - if addingHostShards, err = getNewHostShardsForSnapshot(s, addingHost); err != nil { +) (PlacementHelper, placement.HostShards, []placement.HostShards, error) { + ps, leavingHostShards, err := removeHostFromPlacement(ps, leavingHost) + if err != nil { return nil, nil, nil, err } - var ph PlacementHelper - var leavingHostShards placement.HostShards - if ph, leavingHostShards, err = newRemoveHostPlacementHelper(opt, s, leavingHost); err != nil { - return nil, nil, nil, err - } - return ph, leavingHostShards, addingHostShards, nil -} - -func newRemoveHostPlacementHelper(opt placement.Options, s placement.Snapshot, leavingHost placement.Host) (PlacementHelper, placement.HostShards, error) { - if s.HostShard(leavingHost.ID()) == nil { - return nil, nil, errHostAbsent - } - var leavingHostShards placement.HostShards - var hosts []placement.HostShards - for _, phs := range s.HostShards() { - if phs.Host().ID() == leavingHost.ID() { - leavingHostShards = phs - continue + addingHostShards := make([]placement.HostShards, len(addingHosts)) + for i, host := range addingHosts { + ps, addingHostShards[i], err = addHostToPlacement(ps, host) + if err != nil { + return nil, nil, nil, err } - hosts = append(hosts, phs) } - ps := placement.NewPlacementSnapshot(hosts, s.Shards(), s.Replicas()) - return newPlaceShardingHelper(opt, ps, s.Replicas(), true), leavingHostShards, nil + return newHelper(ps, ps.Replicas(), opt), leavingHostShards, addingHostShards, nil } -func newPlaceShardingHelper(opt placement.Options, ps placement.Snapshot, targetRF int, hostCapacityAscending bool) PlacementHelper { +func newHelper(ps placement.Snapshot, targetRF int, opt placement.Options) PlacementHelper { ph := &placementHelper{ - shardToHostMap: make(map[uint32]map[placement.HostShards]struct{}), - rackToHostsMap: make(map[string]map[placement.HostShards]struct{}), - rf: targetRF, - hostShards: ps.HostShards(), - uniqueShards: ps.Shards(), - options: opt, + rf: targetRF, + hostShards: ps.HostShards(), + uniqueShards: ps.Shards(), + options: opt, } - // build rackToHost map - ph.buildRackToHostMap() - - ph.buildHostHeap(hostCapacityAscending) + ph.scanCurrentLoad() + ph.buildTargetLoad() return ph } -func (ph *placementHelper) buildRackToHostMap() { +func (ph *placementHelper) scanCurrentLoad() { + ph.shardToHostMap = make(map[uint32]map[placement.HostShards]struct{}, len(ph.uniqueShards)) + ph.rackToHostsMap = make(map[string]map[placement.HostShards]struct{}) + ph.rackToWeightMap = make(map[string]uint32) + totalWeight := uint32(0) for _, h := range ph.hostShards { if _, exist := ph.rackToHostsMap[h.Host().Rack()]; !exist { ph.rackToHostsMap[h.Host().Rack()] = make(map[placement.HostShards]struct{}) } ph.rackToHostsMap[h.Host().Rack()][h] = struct{}{} + + ph.rackToWeightMap[h.Host().Rack()] = ph.rackToWeightMap[h.Host().Rack()] + h.Host().Weight() + totalWeight += h.Host().Weight() + for _, shard := range h.Shards() { ph.assignShardToHost(shard, h) } } + ph.totalWeight = totalWeight } -func (ph *placementHelper) buildHostHeap(hostCapacityAscending bool) { - overSizedRack := 0 - overSizedHosts := 0 - rackSizeMap := make(map[string]int) - for rack := range ph.rackToHostsMap { - rackHostNumber := len(ph.rackToHostsMap[rack]) - if float64(rackHostNumber)/float64(ph.getHostLen()) >= 1.0/float64(ph.rf) { - overSizedRack++ - overSizedHosts += rackHostNumber +func (ph *placementHelper) buildTargetLoad() { + overWeightedRack := 0 + overWeight := uint32(0) + for _, weight := range ph.rackToWeightMap { + if isRackOverWeight(weight, ph.totalWeight, ph.rf) { + overWeightedRack++ + overWeight += weight } - rackSizeMap[rack] = rackHostNumber } - targetLoadMap := ph.buildTargetLoadMap(rackSizeMap, overSizedRack, overSizedHosts) - ph.targetLoad = targetLoadMap - ph.hostHeap = newHostHeap(ph.hostShards, hostCapacityAscending, targetLoadMap, ph.rackToHostsMap) -} - -func (ph *placementHelper) buildTargetLoadMap(rackSizeMap map[string]int, overSizedRackLen, overSizedHostLen int) map[string]int { targetLoad := make(map[string]int) for _, host := range ph.hostShards { - rackSize := rackSizeMap[host.Host().Rack()] - if float64(rackSize)/float64(ph.getHostLen()) >= 1.0/float64(ph.rf) { + rackWeight := ph.rackToWeightMap[host.Host().Rack()] + if isRackOverWeight(rackWeight, ph.totalWeight, ph.rf) { // if the host is on a over-sized rack, the target load is topped at shardLen / rackSize - targetLoad[host.Host().ID()] = int(math.Ceil(float64(ph.getShardLen()) / float64(rackSize))) + targetLoad[host.Host().ID()] = int(math.Ceil(float64(ph.getShardLen()) * float64(host.Host().Weight()) / float64(rackWeight))) } else { // if the host is on a normal rack, get the target load with aware of other over-sized rack - targetLoad[host.Host().ID()] = ph.getShardLen() * (ph.rf - overSizedRackLen) / (ph.getHostLen() - overSizedHostLen) + targetLoad[host.Host().ID()] = ph.getShardLen() * (ph.rf - overWeightedRack) * int(host.Host().Weight()) / int(ph.totalWeight-overWeight) } } - return targetLoad + ph.targetLoad = targetLoad } -func (ph placementHelper) getHostLen() int { - return len(ph.hostShards) +func isRackOverWeight(rackWeight, totalWeight uint32, rf int) bool { + return float64(rackWeight)/float64(totalWeight) >= 1.0/float64(rf) } func (ph placementHelper) getShardLen() int { return len(ph.uniqueShards) } -func (ph placementHelper) HasNoRackConflict(shard uint32, from placement.HostShards, toRack string) bool { - if from != nil { - if from.Host().Rack() == toRack { - return true - } - } - for host := range ph.shardToHostMap[shard] { - if host.Host().Rack() == toRack { - return false - } - } - return true -} - func (ph placementHelper) canAssignHost(shard uint32, from, to placement.HostShards) bool { if to.ContainsShard(shard) { return false @@ -272,6 +297,10 @@ func (ph placementHelper) canAssignHost(shard uint32, from, to placement.HostSha } func (ph placementHelper) assignShardToHost(shard uint32, to placement.HostShards) { + if to == nil { + return + } + to.AddShard(shard) if _, exist := ph.shardToHostMap[shard]; !exist { @@ -281,72 +310,24 @@ func (ph placementHelper) assignShardToHost(shard uint32, to placement.HostShard } func (ph placementHelper) removeShardFromHost(shard uint32, from placement.HostShards) { - from.RemoveShard(shard) - - delete(ph.shardToHostMap[shard], from) -} - -// placeToRacksOtherThanOrigin move shards from a host to the rest of the cluster -// the goal of this function is to assign "some" of the shards to the hosts in other racks -func (ph placementHelper) placeToRacksOtherThanOrigin(shards map[uint32]int, from placement.HostShards) { - var sameRack []placement.HostShards - var triedHosts []placement.HostShards - for ph.hostHeap.Len() > 0 { - tryHost := heap.Pop(ph.hostHeap).(placement.HostShards) - if from != nil && tryHost.Host().Rack() == from.Host().Rack() { - // do not place to same rack for now - sameRack = append(sameRack, tryHost) - } else { - triedHosts = append(triedHosts, tryHost) - } + if from == nil { + return } - for _, h := range triedHosts { - heap.Push(ph.hostHeap, h) - } - - triedHosts = triedHosts[:0] -outer: - for shard := range shards { - for ph.hostHeap.Len() > 0 { - tryHost := heap.Pop(ph.hostHeap).(placement.HostShards) - triedHosts = append(triedHosts, tryHost) - if ph.GetTargetLoadForHost(tryHost.Host().ID())-tryHost.ShardsLen() <= 0 { - // this is where "some" is, at this point the best host option in the cluster - // from a different rack has reached its target load, time to break out of the loop - break outer - } - if ph.canAssignHost(shard, from, tryHost) { - ph.assignShardToHost(shard, tryHost) - delete(shards, shard) - break - } - } - - for _, triedHost := range triedHosts { - heap.Push(ph.hostHeap, triedHost) - } - triedHosts = triedHosts[:0] - } - - for _, host := range sameRack { - heap.Push(ph.hostHeap, host) - } - for _, triedHost := range triedHosts { - heap.Push(ph.hostHeap, triedHost) - } + from.RemoveShard(shard) + delete(ph.shardToHostMap[shard], from) } // hostHeap provides an easy way to get best candidate host to assign/steal a shard type hostHeap struct { hosts []placement.HostShards - rackToHostsMap map[string]map[placement.HostShards]struct{} + rackToWeightMap map[string]uint32 targetLoad map[string]int hostCapacityAscending bool } -func newHostHeap(hosts []placement.HostShards, hostCapacityAscending bool, targetLoad map[string]int, rackToHostMap map[string]map[placement.HostShards]struct{}) *hostHeap { - hHeap := &hostHeap{hostCapacityAscending: hostCapacityAscending, hosts: hosts, targetLoad: targetLoad, rackToHostsMap: rackToHostMap} +func newHostHeap(hosts []placement.HostShards, hostCapacityAscending bool, targetLoad map[string]int, rackToWeightMap map[string]uint32) *hostHeap { + hHeap := &hostHeap{hostCapacityAscending: hostCapacityAscending, hosts: hosts, targetLoad: targetLoad, rackToWeightMap: rackToWeightMap} heap.Init(hHeap) return hHeap } @@ -367,7 +348,7 @@ func (hh hostHeap) Less(i, j int) bool { // since it tends to be more picky in accepting shards if leftLoadOnI > 0 && leftLoadOnJ > 0 { if hostI.Host().Rack() != hostJ.Host().Rack() { - return len(hh.rackToHostsMap[hostI.Host().Rack()]) > len(hh.rackToHostsMap[hostJ.Host().Rack()]) + return hh.rackToWeightMap[hostI.Host().Rack()] > hh.rackToWeightMap[hostJ.Host().Rack()] } } // compare left capacity on both hosts @@ -392,3 +373,27 @@ func (hh *hostHeap) Pop() interface{} { hh.hosts = hh.hosts[0 : n-1] return host } + +func addHostToPlacement(ps placement.Snapshot, addingHost placement.Host) (placement.Snapshot, placement.HostShards, error) { + if ps.HostShard(addingHost.ID()) != nil { + return nil, nil, errAddingHostAlreadyExist + } + hss := placement.NewHostShards(addingHost) + return placement.NewPlacementSnapshot(append(ps.HostShards(), hss), ps.Shards(), ps.Replicas()), hss, nil +} + +func removeHostFromPlacement(ps placement.Snapshot, leavingHost placement.Host) (placement.Snapshot, placement.HostShards, error) { + leavingHostShards := ps.HostShard(leavingHost.ID()) + if leavingHostShards == nil { + return nil, nil, errHostAbsent + } + + var hsArr []placement.HostShards + for i, phs := range ps.HostShards() { + if phs.Host().ID() == leavingHost.ID() { + hsArr = append(ps.HostShards()[:i], ps.HostShards()[i+1:]...) + break + } + } + return placement.NewPlacementSnapshot(hsArr, ps.Shards(), ps.Replicas()), leavingHostShards, nil +} diff --git a/src/cluster/placement/placement.go b/src/cluster/placement/placement.go index b99401fdb9..a62eba65c1 100644 --- a/src/cluster/placement/placement.go +++ b/src/cluster/placement/placement.go @@ -46,13 +46,13 @@ type snapshot struct { func NewEmptyPlacementSnapshot(hosts []Host, ids []uint32) Snapshot { hostShards := make([]HostShards, len(hosts), len(hosts)) for i, ph := range hosts { - hostShards[i] = NewEmptyHostShardsFromHost(ph) + hostShards[i] = NewHostShards(ph) } return snapshot{hostShards: hostShards, shards: ids, rf: 0} } -// NewPlacementSnapshot returns a placement +// NewPlacementSnapshot returns a placement snapshot func NewPlacementSnapshot(hss []HostShards, shards []uint32, rf int) Snapshot { return snapshot{hostShards: hss, rf: rf, shards: shards} } @@ -122,6 +122,19 @@ func (ps snapshot) Validate() error { return nil } +// Copy copies a snapshot +func (ps snapshot) Copy() Snapshot { + return snapshot{hostShards: copyHostShards(ps.HostShards()), rf: ps.Replicas(), shards: ps.Shards()} +} + +func copyHostShards(hss []HostShards) []HostShards { + copied := make([]HostShards, len(hss)) + for i, hs := range hss { + copied[i] = newHostShards(hs.Host(), hs.Shards()) + } + return copied +} + // NewPlacementFromJSON creates a Snapshot from JSON func NewPlacementFromJSON(data []byte) (Snapshot, error) { var ps snapshot @@ -163,6 +176,7 @@ func newHostShardsJSON(hs HostShards) hostShardsJSON { ID: hs.Host().ID(), Rack: hs.Host().Rack(), Zone: hs.Host().Zone(), + Weight: hs.Host().Weight(), Shards: shards, } } @@ -214,11 +228,12 @@ type hostShardsJSON struct { ID string `json:"id"` Rack string `json:"rack"` Zone string `json:"zone"` + Weight uint32 `json:"weight"` Shards []uint32 `json:"shards"` } func hostShardsFromJSON(hsj hostShardsJSON) (HostShards, error) { - hs := NewEmptyHostShards(hsj.ID, hsj.Rack, hsj.Zone) + hs := NewHostShards(NewHost(hsj.ID, hsj.Rack, hsj.Zone, hsj.Weight)) for _, shard := range hsj.Shards { hs.AddShard(shard) } @@ -234,15 +249,18 @@ type hostShards struct { shardsSet map[uint32]struct{} } -// NewEmptyHostShardsFromHost returns a HostShards with no shards assigned -func NewEmptyHostShardsFromHost(host Host) HostShards { - m := make(map[uint32]struct{}) - return &hostShards{host: host, shardsSet: m} +// NewHostShards returns a HostShards with no shards assigned +func NewHostShards(host Host) HostShards { + return newHostShards(host, nil) } -// NewEmptyHostShards returns a HostShards with no shards assigned -func NewEmptyHostShards(id, rack, zone string) HostShards { - return NewEmptyHostShardsFromHost(NewHost(id, rack, zone)) +// newHostShards returns a HostShards with shards +func newHostShards(host Host, shards []uint32) HostShards { + m := make(map[uint32]struct{}, len(shards)) + for _, s := range shards { + m[s] = struct{}{} + } + return &hostShards{host: host, shardsSet: m} } func (h hostShards) Host() Host { @@ -286,14 +304,15 @@ func ConvertShardSliceToMap(ids []uint32) map[uint32]int { } // NewHost returns a Host -func NewHost(id, rack, zone string) Host { - return host{id: id, rack: rack, zone: zone} +func NewHost(id, rack, zone string, weight uint32) Host { + return host{id: id, rack: rack, zone: zone, weight: weight} } type host struct { - id string - rack string - zone string + id string + rack string + zone string + weight uint32 } func (h host) ID() string { @@ -308,8 +327,12 @@ func (h host) Zone() string { return h.zone } +func (h host) Weight() uint32 { + return h.weight +} + func (h host) String() string { - return fmt.Sprintf("[id:%s, rack:%s, zone:%s]", h.id, h.rack, h.zone) + return fmt.Sprintf("[id:%s, rack:%s, zone:%s, weight:%v]", h.id, h.rack, h.zone, h.weight) } // NewOptions returns an Options instance @@ -318,8 +341,9 @@ func NewOptions() Options { } type options struct { - looseRackCheck bool - acrossZones bool + looseRackCheck bool + acrossZones bool + allowPartialReplace bool } func (o options) LooseRackCheck() bool { @@ -339,3 +363,12 @@ func (o options) SetAcrossZones(acrossZones bool) Options { o.acrossZones = acrossZones return o } + +func (o options) AllowPartialReplace() bool { + return o.allowPartialReplace +} + +func (o options) SetAllowPartialReplace(allowPartialReplace bool) Options { + o.allowPartialReplace = allowPartialReplace + return o +} diff --git a/src/cluster/placement/placement_test.go b/src/cluster/placement/placement_test.go index 443657b923..0c33298ec4 100644 --- a/src/cluster/placement/placement_test.go +++ b/src/cluster/placement/placement_test.go @@ -28,32 +28,32 @@ import ( ) func TestSnapshot(t *testing.T) { - h1 := NewEmptyHostShards("r1h1", "r1", "z1") + h1 := NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h2 := NewEmptyHostShards("r2h2", "r2", "z1") + h2 := NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(4) h2.AddShard(5) h2.AddShard(6) - h3 := NewEmptyHostShards("r3h3", "r3", "z1") + h3 := NewHostShards(NewHost("r3h3", "r3", "z1", 1)) h3.AddShard(1) h3.AddShard(3) h3.AddShard(5) - h4 := NewEmptyHostShards("r4h4", "r4", "z1") + h4 := NewHostShards(NewHost("r4h4", "r4", "z1", 1)) h4.AddShard(2) h4.AddShard(4) h4.AddShard(6) - h5 := NewEmptyHostShards("r5h5", "r5", "z1") + h5 := NewHostShards(NewHost("r5h5", "r5", "z1", 1)) h5.AddShard(5) h5.AddShard(6) h5.AddShard(1) - h6 := NewEmptyHostShards("r6h6", "r6", "z1") + h6 := NewHostShards(NewHost("r6h6", "r6", "z1", 1)) h6.AddShard(2) h6.AddShard(3) h6.AddShard(4) @@ -75,7 +75,7 @@ func TestSnapshot(t *testing.T) { assert.Equal(t, ids, s.Shards()) assert.Equal(t, hss, s.HostShards()) - s = NewEmptyPlacementSnapshot([]Host{NewHost("h1", "r1", "z1"), NewHost("h2", "r2", "z1")}, ids) + s = NewEmptyPlacementSnapshot([]Host{NewHost("h1", "r1", "z1", 1), NewHost("h2", "r2", "z1", 1)}, ids) assert.Equal(t, 0, s.Replicas()) assert.Equal(t, ids, s.Shards()) assert.NoError(t, s.Validate()) @@ -84,12 +84,12 @@ func TestSnapshot(t *testing.T) { func TestValidate(t *testing.T) { ids := []uint32{1, 2, 3, 4, 5, 6} - h1 := NewEmptyHostShards("r1h1", "r1", "z1") + h1 := NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h2 := NewEmptyHostShards("r2h2", "r2", "z1") + h2 := NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(4) h2.AddShard(5) h2.AddShard(6) @@ -104,7 +104,7 @@ func TestValidate(t *testing.T) { assert.Error(t, s.Validate()) // host missing a shard - h1 = NewEmptyHostShards("r1h1", "r1", "z1") + h1 = NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) @@ -112,7 +112,7 @@ func TestValidate(t *testing.T) { h1.AddShard(5) h1.AddShard(6) - h2 = NewEmptyHostShards("r2h2", "r2", "z1") + h2 = NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(2) h2.AddShard(3) h2.AddShard(4) @@ -125,7 +125,7 @@ func TestValidate(t *testing.T) { assert.Equal(t, errTotalShardsMismatch, s.Validate()) // host contains shard that's unexpected to be in snapshot - h1 = NewEmptyHostShards("r1h1", "r1", "z1") + h1 = NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) @@ -134,7 +134,7 @@ func TestValidate(t *testing.T) { h1.AddShard(6) h1.AddShard(7) - h2 = NewEmptyHostShards("r2h2", "r2", "z1") + h2 = NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(2) h2.AddShard(3) h2.AddShard(4) @@ -147,12 +147,12 @@ func TestValidate(t *testing.T) { assert.Equal(t, errUnexpectedShards, s.Validate()) // duplicated shards - h1 = NewEmptyHostShards("r1h1", "r1", "z1") + h1 = NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(2) h1.AddShard(3) h1.AddShard(4) - h2 = NewEmptyHostShards("r2h2", "r2", "z1") + h2 = NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(4) h2.AddShard(5) h2.AddShard(6) @@ -163,17 +163,17 @@ func TestValidate(t *testing.T) { assert.Equal(t, errDuplicatedShards, s.Validate()) // three shard 2 and only one shard 4 - h1 = NewEmptyHostShards("r1h1", "r1", "z1") + h1 = NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h2 = NewEmptyHostShards("r2h2", "r2", "z1") + h2 = NewHostShards(NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(2) h2.AddShard(3) h2.AddShard(4) - h3 := NewEmptyHostShards("r3h3", "r3", "z1") + h3 := NewHostShards(NewHost("r3h3", "r3", "z1", 1)) h3.AddShard(1) h3.AddShard(2) @@ -184,21 +184,30 @@ func TestValidate(t *testing.T) { func TestSnapshotMarshalling(t *testing.T) { invalidJSON := `{ - "abc":{"ID":123,"Rack":"r1","Shards":[0,7,11]} + "abc":{"ID":123,"Rack":"r1","Zone":"z1","Weight":50,"Shards":[0,7,11]} }` data := []byte(invalidJSON) ps, err := NewPlacementFromJSON(data) assert.Nil(t, ps) assert.Error(t, err) + ps, err = NewPlacementFromJSON([]byte(`{ + "h1":{"ID":"h1","Rack":"r1","Zone":"z1","Weight":50,"Shards":[0,7,11]} + }`)) + hs := ps.HostShard("h1") + assert.NotNil(t, hs) + assert.Equal(t, "[id:h1, rack:r1, zone:z1, weight:50]", hs.Host().String()) + assert.Equal(t, 3, hs.ShardsLen()) + assert.Equal(t, map[uint32]struct{}{0: struct{}{}, 7: struct{}{}, 11: struct{}{}}, hs.(*hostShards).shardsSet) + validJSON := `{ - "r2h4": {"ID":"r2h4","Rack":"r2","Shards":[6,13,15]}, - "r3h5": {"ID":"r3h5","Rack":"r3","Shards":[2,8,19]}, - "r4h6": {"ID":"r4h6","Rack":"r4","Shards":[3,9,18]}, - "r1h1": {"ID":"r1h1","Rack":"r1","Shards":[0,7,11]}, - "r2h3": {"ID":"r2h3","Rack":"r2","Shards":[1,4,12]}, - "r5h7": {"ID":"r5h7","Rack":"r5","Shards":[10,14]}, - "r6h9": {"ID":"r6h9","Rack":"r6","Shards":[5,16,17]} + "r2h4": {"ID":"r2h4","Rack":"r2","Zone":"z1","Weight":50,"Shards":[6,13,15]}, + "r3h5": {"ID":"r3h5","Rack":"r3","Zone":"z1","Weight":50,"Shards":[2,8,19]}, + "r4h6": {"ID":"r4h6","Rack":"r4","Zone":"z1","Weight":50,"Shards":[3,9,18]}, + "r1h1": {"ID":"r1h1","Rack":"r1","Zone":"z1","Weight":50,"Shards":[0,7,11]}, + "r2h3": {"ID":"r2h3","Rack":"r2","Zone":"z1","Weight":50,"Shards":[1,4,12]}, + "r5h7": {"ID":"r5h7","Rack":"r5","Zone":"z1","Weight":50,"Shards":[10,14]}, + "r6h9": {"ID":"r6h9","Rack":"r6","Zone":"z1","Weight":50,"Shards":[5,16,17]} }` data = []byte(validJSON) ps, err = NewPlacementFromJSON(data) @@ -211,13 +220,13 @@ func TestSnapshotMarshalling(t *testing.T) { // an extra replica for shard 1 invalidPlacementJSON := `{ - "r1h1": {"ID":"r1h1","Rack":"r1","Shards":[0,1,7,11]}, - "r2h3": {"ID":"r2h3","Rack":"r2","Shards":[1,4,12]}, - "r2h4": {"ID":"r2h4","Rack":"r2","Shards":[6,13,15]}, - "r3h5": {"ID":"r3h5","Rack":"r3","Shards":[2,8,19]}, - "r4h6": {"ID":"r4h6","Rack":"r4","Shards":[3,9,18]}, - "r5h7": {"ID":"r5h7","Rack":"r5","Shards":[10,14]}, - "r6h9": {"ID":"r6h9","Rack":"r6","Shards":[5,16,17]} + "r1h1": {"ID":"r1h1","Rack":"r1","Zone":"z1","Weight":50,"Shards":[0,1,7,11]}, + "r2h3": {"ID":"r2h3","Rack":"r2","Zone":"z1","Weight":50,"Shards":[1,4,12]}, + "r2h4": {"ID":"r2h4","Rack":"r2","Zone":"z1","Weight":50,"Shards":[6,13,15]}, + "r3h5": {"ID":"r3h5","Rack":"r3","Zone":"z1","Weight":50,"Shards":[2,8,19]}, + "r4h6": {"ID":"r4h6","Rack":"r4","Zone":"z1","Weight":50,"Shards":[3,9,18]}, + "r5h7": {"ID":"r5h7","Rack":"r5","Zone":"z1","Weight":50,"Shards":[10,14]}, + "r6h9": {"ID":"r6h9","Rack":"r6","Zone":"z1","Weight":50,"Shards":[5,16,17]} }` data = []byte(invalidPlacementJSON) ps, err = NewPlacementFromJSON(data) @@ -226,13 +235,13 @@ func TestSnapshotMarshalling(t *testing.T) { // an extra replica for shard 0 on r1h1 invalidPlacementJSON = `{ - "r1h1": {"ID":"r1h1","Rack":"r1","Shards":[0,0,7,11]}, - "r2h3": {"ID":"r2h3","Rack":"r2","Shards":[1,4,12]}, - "r2h4": {"ID":"r2h4","Rack":"r2","Shards":[6,13,15]}, - "r3h5": {"ID":"r3h5","Rack":"r3","Shards":[2,8,19]}, - "r4h6": {"ID":"r4h6","Rack":"r4","Shards":[3,9,18]}, - "r5h7": {"ID":"r5h7","Rack":"r5","Shards":[10,14]}, - "r6h9": {"ID":"r6h9","Rack":"r6","Shards":[5,16,17]} + "r1h1": {"ID":"r1h1","Rack":"r1","Zone":"z1","Weight":50,"Shards":[0,0,7,11]}, + "r2h3": {"ID":"r2h3","Rack":"r2","Zone":"z1","Weight":50,"Shards":[1,4,12]}, + "r2h4": {"ID":"r2h4","Rack":"r2","Zone":"z1","Weight":50,"Shards":[6,13,15]}, + "r3h5": {"ID":"r3h5","Rack":"r3","Zone":"z1","Weight":50,"Shards":[2,8,19]}, + "r4h6": {"ID":"r4h6","Rack":"r4","Zone":"z1","Weight":50,"Shards":[3,9,18]}, + "r5h7": {"ID":"r5h7","Rack":"r5","Zone":"z1","Weight":50,"Shards":[10,14]}, + "r6h9": {"ID":"r6h9","Rack":"r6","Zone":"z1","Weight":50,"Shards":[5,16,17]} }` data = []byte(invalidPlacementJSON) ps, err = NewPlacementFromJSON(data) @@ -241,12 +250,12 @@ func TestSnapshotMarshalling(t *testing.T) { } func TestHostShards(t *testing.T) { - h1 := NewEmptyHostShards("r1h1", "r1", "z1") + h1 := NewHostShards(NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - assert.Equal(t, "[id:r1h1, rack:r1, zone:z1]", h1.Host().String()) + assert.Equal(t, "[id:r1h1, rack:r1, zone:z1, weight:1]", h1.Host().String()) assert.True(t, h1.ContainsShard(1)) assert.False(t, h1.ContainsShard(100)) @@ -262,13 +271,44 @@ func TestHostShards(t *testing.T) { assert.Equal(t, "r1", h1.Host().Rack()) } +func TestCopy(t *testing.T) { + h1 := NewHostShards(NewHost("r1h1", "r1", "z1", 1)) + h1.AddShard(1) + h1.AddShard(2) + h1.AddShard(3) + + h2 := NewHostShards(NewHost("r2h2", "r2", "z1", 1)) + h2.AddShard(4) + h2.AddShard(5) + h2.AddShard(6) + + hss := []HostShards{h1, h2} + + ids := []uint32{1, 2, 3, 4, 5, 6} + s := NewPlacementSnapshot(hss, ids, 1) + copy := s.Copy() + assert.Equal(t, s.HostsLen(), copy.HostsLen()) + assert.Equal(t, s.Shards(), copy.Shards()) + assert.Equal(t, s.Replicas(), copy.Replicas()) + for _, hs := range s.HostShards() { + assert.Equal(t, copy.HostShard(hs.Host().ID()), hs) + // make sure they are different objects, updating one won't update the other + hs.AddShard(100) + assert.NotEqual(t, copy.HostShard(hs.Host().ID()), hs) + } +} + func TestOptions(t *testing.T) { o := NewOptions() assert.False(t, o.LooseRackCheck()) + assert.False(t, o.AcrossZones()) + assert.False(t, o.AllowPartialReplace()) o = o.SetLooseRackCheck(true) assert.True(t, o.LooseRackCheck()) o = o.SetAcrossZones(true) assert.True(t, o.AcrossZones()) + o = o.SetAllowPartialReplace(true) + assert.True(t, o.AllowPartialReplace()) } func testSnapshotJSONRoundTrip(t *testing.T, s Snapshot) { diff --git a/src/cluster/placement/planner/planner_test.go b/src/cluster/placement/planner/planner_test.go index cc68f39a3f..ca97b51b90 100644 --- a/src/cluster/placement/planner/planner_test.go +++ b/src/cluster/placement/planner/planner_test.go @@ -30,32 +30,32 @@ import ( ) func TestDeployment(t *testing.T) { - h1 := placement.NewEmptyHostShards("r1h1", "r1", "z1") + h1 := placement.NewHostShards(placement.NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h2 := placement.NewEmptyHostShards("r2h2", "r2", "z1") + h2 := placement.NewHostShards(placement.NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(4) h2.AddShard(5) h2.AddShard(6) - h3 := placement.NewEmptyHostShards("r3h3", "r3", "z1") + h3 := placement.NewHostShards(placement.NewHost("r3h3", "r3", "z1", 1)) h3.AddShard(1) h3.AddShard(3) h3.AddShard(5) - h4 := placement.NewEmptyHostShards("r4h4", "r4", "z1") + h4 := placement.NewHostShards(placement.NewHost("r4h4", "r4", "z1", 1)) h4.AddShard(2) h4.AddShard(4) h4.AddShard(6) - h5 := placement.NewEmptyHostShards("r5h5", "r5", "z1") + h5 := placement.NewHostShards(placement.NewHost("r5h5", "r5", "z1", 1)) h5.AddShard(5) h5.AddShard(6) h5.AddShard(1) - h6 := placement.NewEmptyHostShards("r6h6", "r6", "z1") + h6 := placement.NewHostShards(placement.NewHost("r6h6", "r6", "z1", 1)) h6.AddShard(2) h6.AddShard(3) h6.AddShard(4) @@ -75,39 +75,39 @@ func TestDeployment(t *testing.T) { } func TestDeploymentWithThreeReplica(t *testing.T) { - h1 := placement.NewEmptyHostShards("r1h1", "r1", "z1") + h1 := placement.NewHostShards(placement.NewHost("r1h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) - h2 := placement.NewEmptyHostShards("r2h2", "r2", "z1") + h2 := placement.NewHostShards(placement.NewHost("r2h2", "r2", "z1", 1)) h2.AddShard(3) h2.AddShard(4) - h3 := placement.NewEmptyHostShards("r3h3", "r3", "z1") + h3 := placement.NewHostShards(placement.NewHost("r3h3", "r3", "z1", 1)) h3.AddShard(5) h3.AddShard(6) - h4 := placement.NewEmptyHostShards("r4h4", "r4", "z1") + h4 := placement.NewHostShards(placement.NewHost("r4h4", "r4", "z1", 1)) h4.AddShard(1) h4.AddShard(3) - h5 := placement.NewEmptyHostShards("r5h5", "r5", "z1") + h5 := placement.NewHostShards(placement.NewHost("r5h5", "r5", "z1", 1)) h5.AddShard(4) h5.AddShard(6) - h6 := placement.NewEmptyHostShards("r6h6", "r6", "z1") + h6 := placement.NewHostShards(placement.NewHost("r6h6", "r6", "z1", 1)) h6.AddShard(2) h6.AddShard(5) - h7 := placement.NewEmptyHostShards("r7h7", "r7", "z1") + h7 := placement.NewHostShards(placement.NewHost("r7h7", "r7", "z1", 1)) h7.AddShard(2) h7.AddShard(3) - h8 := placement.NewEmptyHostShards("r8h8", "r8", "z1") + h8 := placement.NewHostShards(placement.NewHost("r8h8", "r8", "z1", 1)) h8.AddShard(4) h8.AddShard(5) - h9 := placement.NewEmptyHostShards("r9h9", "r9", "z1") + h9 := placement.NewHostShards(placement.NewHost("r9h9", "r9", "z1", 1)) h9.AddShard(6) h9.AddShard(1) @@ -126,25 +126,31 @@ func TestDeploymentWithThreeReplica(t *testing.T) { } func TestRemoveHostShards(t *testing.T) { - h1 := placement.NewEmptyHostShards("r1h1", "r1", "z1") - h2 := placement.NewEmptyHostShards("r2h2", "r2", "z1") - h3 := placement.NewEmptyHostShards("r3h3", "r3", "z1") - h4 := placement.NewEmptyHostShards("r4h4", "r4", "z1") + h1 := placement.NewHostShards(placement.NewHost("r1h1", "r1", "z1", 1)) + h2 := placement.NewHostShards(placement.NewHost("r2h2", "r2", "z1", 1)) + h3 := placement.NewHostShards(placement.NewHost("r3h3", "r3", "z1", 1)) + h4 := placement.NewHostShards(placement.NewHost("r4h4", "r4", "z1", 1)) hss := []placement.HostShards{h1, h2, h3, h4} left := removeHostShards(hss, h4) assert.Equal(t, 3, len(left)) - left = removeHostShards(hss, placement.NewEmptyHostShards("r5h5", "r5", "z1")) + left = removeHostShards(hss, placement.NewHostShards(placement.NewHost("r5h5", "r5", "z1", 1))) assert.Equal(t, 4, len(left)) } func TestSort(t *testing.T) { var steps sortableSteps - steps = append(steps, []placement.HostShards{placement.NewEmptyHostShards("", "", ""), placement.NewEmptyHostShards("", "", "")}) - steps = append(steps, []placement.HostShards{placement.NewEmptyHostShards("", "", ""), placement.NewEmptyHostShards("", "", ""), placement.NewEmptyHostShards("", "", "")}) - steps = append(steps, []placement.HostShards{placement.NewEmptyHostShards("", "", "")}) + steps = append(steps, []placement.HostShards{ + placement.NewHostShards(placement.NewHost("", "", "", 1)), + placement.NewHostShards(placement.NewHost("", "", "", 1))}) + steps = append(steps, []placement.HostShards{ + placement.NewHostShards(placement.NewHost("", "", "", 1)), + placement.NewHostShards(placement.NewHost("", "", "", 1)), + placement.NewHostShards(placement.NewHost("", "", "", 1))}) + steps = append(steps, []placement.HostShards{ + placement.NewHostShards(placement.NewHost("", "", "", 1))}) sort.Sort(steps) assert.Equal(t, 3, len(steps)) diff --git a/src/cluster/placement/service/placementservice.go b/src/cluster/placement/service/placementservice.go index aece41e237..95a7565d9e 100644 --- a/src/cluster/placement/service/placementservice.go +++ b/src/cluster/placement/service/placementservice.go @@ -22,6 +22,7 @@ package service import ( "errors" + "fmt" "sort" "github.com/m3db/m3cluster/placement" @@ -137,12 +138,12 @@ func (ps placementService) ReplaceHost(service string, leavingHost placement.Hos return errHostAbsent } - var addingHost placement.Host - if addingHost, err = ps.findReplaceHost(s, candidateHosts, leavingHostShard); err != nil { + addingHosts, err := ps.findReplaceHost(s, candidateHosts, leavingHostShard) + if err != nil { return err } - if s, err = ps.algo.ReplaceHost(s, leavingHost, addingHost); err != nil { + if s, err = ps.algo.ReplaceHost(s, leavingHost, addingHosts); err != nil { return err } return ps.ss.SaveSnapshotForService(service, s) @@ -162,25 +163,6 @@ func getNewHostsToPlacement(s placement.Snapshot, hosts []placement.Host) []plac return hs } -type rackLen struct { - rack string - len int -} - -type rackLens []rackLen - -func (rls rackLens) Len() int { - return len(rls) -} - -func (rls rackLens) Less(i, j int) bool { - return rls[i].len < rls[j].len -} - -func (rls rackLens) Swap(i, j int) { - rls[i], rls[j] = rls[j], rls[i] -} - func (ps placementService) findAddingHost(s placement.Snapshot, candidateHosts []placement.Host) (placement.Host, error) { // filter out already existing hosts candidateHosts = getNewHostsToPlacement(s, candidateHosts) @@ -203,14 +185,18 @@ func (ps placementService) findAddingHost(s placement.Snapshot, candidateHosts [ } // otherwise sort the racks in the current placement by capacity and find a host from least sized rack - rackLens := make(rackLens, 0, len(placementRackHostMap)) + racks := make(sortableThings, 0, len(placementRackHostMap)) for rack, hss := range placementRackHostMap { - rackLens = append(rackLens, rackLen{rack: rack, len: len(hss)}) + weight := 0 + for _, hs := range hss { + weight += int(hs.Weight()) + } + racks = append(racks, sortableValue{value: rack, weight: weight}) } - sort.Sort(rackLens) + sort.Sort(racks) - for _, rackLen := range rackLens { - if hs, exist := candidateRackHostMap[rackLen.rack]; exist { + for _, rackLen := range racks { + if hs, exist := candidateRackHostMap[rackLen.value.(string)]; exist { for _, host := range hs { return host, nil } @@ -220,10 +206,13 @@ func (ps placementService) findAddingHost(s placement.Snapshot, candidateHosts [ return nil, errNoValidHost } -func (ps placementService) findReplaceHost(s placement.Snapshot, candidateHosts []placement.Host, leaving placement.HostShards) (placement.Host, error) { +func (ps placementService) findReplaceHost( + s placement.Snapshot, + candidateHosts []placement.Host, + leaving placement.HostShards, +) ([]placement.Host, error) { // filter out already existing hosts candidateHosts = getNewHostsToPlacement(s, candidateHosts) - candidateHosts, err := filterZones(s, ps.options, candidateHosts) if err != nil { return nil, err @@ -233,39 +222,114 @@ func (ps placementService) findReplaceHost(s placement.Snapshot, candidateHosts return nil, errNoValidHost } // build rackHostMap from candidate hosts - candidateRackHostMap := buildRackHostMap(candidateHosts) + rackHostMap := buildRackHostMap(candidateHosts) - // if there is a host from the same rack can be added, return it. - if hs, exist := candidateRackHostMap[leaving.Host().Rack()]; exist { - return hs[0], nil + // otherwise sort the candidate hosts by the number of conflicts + ph := algo.NewPlacementHelper(s, ps.options) + hosts := make([]sortableValue, 0, len(rackHostMap)) + for rack, hostsInRack := range rackHostMap { + conflicts := 0 + for _, shard := range leaving.Shards() { + if !ph.HasNoRackConflict(shard, leaving, rack) { + conflicts++ + } + } + for _, host := range hostsInRack { + hosts = append(hosts, sortableValue{value: host, weight: conflicts}) + } + } + + groups := groupHostsByConflict(hosts, ps.options.LooseRackCheck()) + if len(groups) == 0 { + return nil, errNoValidHost } - return ps.findHostWithRackCheck(s, candidateRackHostMap, leaving) + result, leftWeight := fillWeight(groups, int(leaving.Host().Weight())) + + if leftWeight > 0 { + return nil, fmt.Errorf("could not find enough host to replace %s, %v weight could not be replaced", + leaving.Host().String(), leftWeight) + } + return result, nil } -func (ps placementService) findHostWithRackCheck(p placement.Snapshot, rackHostMap map[string][]placement.Host, leaving placement.HostShards) (placement.Host, error) { - // otherwise sort the candidate hosts by the number of conflicts - ph := algo.NewPlacementHelper(ps.options, p) +func groupHostsByConflict(hostsSortedByConflicts []sortableValue, allowConflict bool) [][]placement.Host { + sort.Sort(sortableThings(hostsSortedByConflicts)) + var groups [][]placement.Host + lastSeenConflict := -1 + for _, host := range hostsSortedByConflicts { + if !allowConflict && host.weight > 0 { + break + } + if host.weight > lastSeenConflict { + lastSeenConflict = host.weight + groups = append(groups, []placement.Host{}) + } + if lastSeenConflict == host.weight { + groups[len(groups)-1] = append(groups[len(groups)-1], host.value.(placement.Host)) + } + } + return groups +} - rackLens := make(rackLens, 0, len(rackHostMap)) - for rack := range rackHostMap { - rackConflicts := 0 - for _, shard := range leaving.Shards() { - if !ph.HasNoRackConflict(shard, leaving, rack) { - rackConflicts++ - } +func fillWeight(groups [][]placement.Host, targetWeight int) ([]placement.Host, int) { + var ( + result []placement.Host + hostsInGroup []placement.Host + ) + for _, group := range groups { + hostsInGroup, targetWeight = knapsack(group, targetWeight) + result = append(result, hostsInGroup...) + if targetWeight <= 0 { + break } - rackLens = append(rackLens, rackLen{rack: rack, len: rackConflicts}) } - sort.Sort(rackLens) - if !ps.options.LooseRackCheck() { - rackLens = filterConflictRacks(rackLens) + return result, targetWeight +} + +func knapsack(hosts []placement.Host, targetWeight int) ([]placement.Host, int) { + totalWeight := 0 + for _, host := range hosts { + totalWeight += int(host.Weight()) + } + if totalWeight <= targetWeight { + return hosts[:], targetWeight - totalWeight + } + // totalWeight > targetWeight, there is a combination of hosts to meet targetWeight for sure + // we do dp until totalWeight rather than targetWeight here because we need to cover the targetWeight + // which is a little bit different than the knapsack problem that goes + weights := make([]int, totalWeight+1) + combination := make([][]placement.Host, totalWeight+1) + + // dp: weights[i][j] = max(weights[i-1][j], weights[i-1][j-host.Weight] + host.Weight) + // when there are multiple combination to reach a target weight, we prefer the one with less hosts + for i := range hosts { + // this loop needs to go from len to 1 because updating weights[] is being updated in place + for j := totalWeight; j >= 1; j-- { + weight := int(hosts[i].Weight()) + if j-weight < 0 { + continue + } + newWeight := weights[j-weight] + weight + if newWeight > weights[j] { + weights[j] = weights[j-weight] + weight + combination[j] = append(combination[j-weight], hosts[i]) + } else if newWeight == weights[j] { + // if can reach same weight, find a combination with less hosts + if len(combination[j-weight])+1 < len(combination[j]) { + combination[j] = append(combination[j-weight], hosts[i]) + } + } + } } - if len(rackLens) > 0 { - return rackHostMap[rackLens[0].rack][0], nil + for i := targetWeight; i <= totalWeight; i++ { + if weights[i] >= targetWeight { + return combination[i], targetWeight - weights[i] + } } - return nil, errNoValidHost + + panic("should never reach here") } func filterZones(p placement.Snapshot, opts placement.Options, candidateHosts []placement.Host) ([]placement.Host, error) { @@ -293,15 +357,6 @@ func filterZones(p placement.Snapshot, opts placement.Options, candidateHosts [] return validHosts, nil } -func filterConflictRacks(rls rackLens) rackLens { - for i, r := range rls { - if r.len > 0 { - return rls[:i] - } - } - return rls -} - func buildRackHostMap(candidateHosts []placement.Host) map[string][]placement.Host { result := make(map[string][]placement.Host, len(candidateHosts)) for _, host := range candidateHosts { @@ -338,3 +393,22 @@ func (ps placementService) validateInitHosts(hosts []placement.Host) error { } return nil } + +type sortableValue struct { + value interface{} + weight int +} + +type sortableThings []sortableValue + +func (things sortableThings) Len() int { + return len(things) +} + +func (things sortableThings) Less(i, j int) bool { + return things[i].weight < things[j].weight +} + +func (things sortableThings) Swap(i, j int) { + things[i], things[j] = things[j], things[i] +} diff --git a/src/cluster/placement/service/placementservice_test.go b/src/cluster/placement/service/placementservice_test.go index 1db58c523c..06d2d4c0b7 100644 --- a/src/cluster/placement/service/placementservice_test.go +++ b/src/cluster/placement/service/placementservice_test.go @@ -25,9 +25,8 @@ import ( "errors" "io/ioutil" "os" - "testing" - "sort" + "testing" "github.com/m3db/m3cluster/placement" "github.com/stretchr/testify/assert" @@ -35,7 +34,7 @@ import ( func TestGoodWorkflow(t *testing.T) { ms := NewMockStorage() - ps := NewPlacementService(placement.NewOptions().SetLooseRackCheck(false), ms) + ps := NewPlacementService(placement.NewOptions(), ms) testGoodWorkflow(t, ps, ms) ps = NewPlacementService(placement.NewOptions().SetLooseRackCheck(true), ms) @@ -43,50 +42,61 @@ func TestGoodWorkflow(t *testing.T) { } func testGoodWorkflow(t *testing.T, ps placement.Service, ms placement.SnapshotStorage) { - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r2h2", "r2", "z1")}, 10, 1) + h1 := placement.NewHost("r1h1", "r1", "z1", 2) + h2 := placement.NewHost("r2h2", "r2", "z1", 2) + h3 := placement.NewHost("r3h3", "r3", "z1", 2) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{h1, h2}, 10, 1) assert.NoError(t, err) err = ps.AddReplica("serviceA") assert.NoError(t, err) - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r3h3", "r3", "z1")}) + err = ps.AddHost("serviceA", []placement.Host{h3}) assert.NoError(t, err) - err = ps.RemoveHost("serviceA", placement.NewHost("r1h1", "r1", "z1")) - assert.NoError(t, err) - - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}) + err = ps.RemoveHost("serviceA", h1) assert.NoError(t, err) err = ps.ReplaceHost("serviceA", - placement.NewHost("r2h2", "r2", "z1"), - []placement.Host{placement.NewHost("r2h3", "r2", "z1"), placement.NewHost("r4h4", "r4", "z1"), placement.NewHost("r5h5", "r5", "z1")}, + h2, + []placement.Host{ + placement.NewHost("h21", "r2", "z1", 1), + placement.NewHost("h4", "r4", "z1", 1), + h3, // already in placement + placement.NewHost("h31", "r3", "z1", 1), // conflict + }, ) assert.NoError(t, err) s, err := ms.ReadSnapshotForService("serviceA") assert.NoError(t, err) - assert.NotNil(t, s.HostShard("r2h3")) // host added from preferred rack + assert.Equal(t, 3, s.HostsLen()) + assert.NotNil(t, s.HostShard("h21")) + assert.NotNil(t, s.HostShard("h4")) - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r2h4", "r2", "z1")}) + err = ps.AddHost("serviceA", []placement.Host{h1}) assert.NoError(t, err) - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r3h4", "r3", "z1")}) + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r2h4", "r2", "z1", 1)}) assert.NoError(t, err) - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r3h5", "r3", "z1")}) + + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r3h4", "r3", "z1", 1)}) + assert.NoError(t, err) + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r3h5", "r3", "z1", 1)}) assert.NoError(t, err) hosts := []placement.Host{ - placement.NewHost("r1h5", "r1", "z1"), - placement.NewHost("r3h4", "r3", "z1"), - placement.NewHost("r3h5", "r3", "z1"), - placement.NewHost("r3h6", "r3", "z1"), - placement.NewHost("r2h3", "r2", "z1"), + placement.NewHost("r1h5", "r1", "z1", 1), + placement.NewHost("r3h4", "r3", "z1", 1), + placement.NewHost("r3h5", "r3", "z1", 1), + placement.NewHost("r3h6", "r3", "z1", 1), + placement.NewHost("r2h3", "r2", "z1", 1), + placement.NewHost("r4h41", "r4", "z1", 1), } err = ps.AddHost("serviceA", hosts) assert.NoError(t, err) s, err = ms.ReadSnapshotForService("serviceA") assert.NoError(t, err) - assert.NotNil(t, s.HostShard("r1h5")) // host added from most needed rack + assert.NotNil(t, s.HostShard("r4h41")) // host added from least weighted rack cleanUpTestFiles(t, "serviceA") } @@ -94,11 +104,17 @@ func testGoodWorkflow(t *testing.T, ps placement.Service, ms placement.SnapshotS func TestBadInitialPlacement(t *testing.T) { ps := NewPlacementService(placement.NewOptions(), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r2h2", "r2", "z1")}, 100, 2) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r2h2", "r2", "z1", 1), + }, 100, 2) assert.NoError(t, err) // no shards - err = ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r2h2", "r2", "z1")}, 0, 1) + err = ps.BuildInitialPlacement("serviceA", []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r2h2", "r2", "z1", 1), + }, 0, 1) assert.Error(t, err) // not enough hosts @@ -106,11 +122,17 @@ func TestBadInitialPlacement(t *testing.T) { assert.Error(t, err) // not enough racks - err = ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r1h2", "r1", "z1")}, 100, 2) + err = ps.BuildInitialPlacement("serviceA", []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r1h2", "r1", "z1", 1), + }, 100, 2) assert.Error(t, err) // too many zones - err = ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r2h2", "r2", "z2")}, 100, 2) + err = ps.BuildInitialPlacement("serviceA", []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r2h2", "r2", "z2", 1), + }, 100, 2) assert.Error(t, err) assert.Equal(t, errHostsAcrossZones, err) } @@ -118,7 +140,7 @@ func TestBadInitialPlacement(t *testing.T) { func TestBadAddReplica(t *testing.T) { ps := NewPlacementService(placement.NewOptions(), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}, 10, 1) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1", 1)}, 10, 1) assert.NoError(t, err) // not enough racks/hosts @@ -135,89 +157,112 @@ func TestBadAddReplica(t *testing.T) { func TestBadAddHost(t *testing.T) { ps := NewPlacementService(placement.NewOptions(), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}, 10, 1) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1", 1)}, 10, 1) assert.NoError(t, err) // adding host already exist - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}) + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1", 1)}) assert.Error(t, err) // too many zones - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r2h2", "r2", "z2")}) + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r2h2", "r2", "z2", 1)}) assert.Error(t, err) assert.Equal(t, errNoValidHost, err) // algo error - psWithErrorAlgo := placementService{algo: errorAlgorithm{}, ss: NewMockStorage(), options: placement.NewOptions().SetLooseRackCheck(false)} - err = psWithErrorAlgo.AddHost("serviceA", []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + psWithErrorAlgo := placementService{algo: errorAlgorithm{}, ss: NewMockStorage(), options: placement.NewOptions()} + err = psWithErrorAlgo.AddHost("serviceA", []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}) assert.Error(t, err) // could not find snapshot for service - err = ps.AddHost("badService", []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + err = ps.AddHost("badService", []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}) assert.Error(t, err) ps = NewPlacementService(placement.NewOptions().SetAcrossZones(true), NewMockStorage()) err = ps.BuildInitialPlacement("serviceA", - []placement.Host{placement.NewHost("h1", "r1", "z1"), placement.NewHost("h2", "r2", "z2")}, + []placement.Host{placement.NewHost("h1", "r1", "z1", 1), placement.NewHost("h2", "r2", "z2", 1)}, 10, 1, ) assert.NoError(t, err) ps = NewPlacementService(placement.NewOptions().SetAcrossZones(false), NewMockStorage()) - err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}) + err = ps.AddHost("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1", 1)}) assert.Error(t, err) assert.Equal(t, errDisableAcrossZones, err) cleanUpTestFiles(t, "serviceA") } func TestBadRemoveHost(t *testing.T) { - ps := NewPlacementService(placement.NewOptions().SetLooseRackCheck(false), NewMockStorage()) + ps := NewPlacementService(placement.NewOptions(), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1")}, 10, 1) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1", 1)}, 10, 1) assert.NoError(t, err) // leaving host not exist - err = ps.RemoveHost("serviceA", placement.NewHost("r2h2", "r2", "z1")) + err = ps.RemoveHost("serviceA", placement.NewHost("r2h2", "r2", "z1", 1)) assert.Error(t, err) // not enough racks/hosts after removal - err = ps.RemoveHost("serviceA", placement.NewHost("r1h1", "r1", "z1")) + err = ps.RemoveHost("serviceA", placement.NewHost("r1h1", "r1", "z1", 1)) assert.Error(t, err) // could not find snapshot for service - err = ps.RemoveHost("bad service", placement.NewHost("r1h1", "r1", "z1")) + err = ps.RemoveHost("bad service", placement.NewHost("r1h1", "r1", "z1", 1)) assert.Error(t, err) cleanUpTestFiles(t, "serviceA") } func TestBadReplaceHost(t *testing.T) { - ps := NewPlacementService(placement.NewOptions().SetLooseRackCheck(false), NewMockStorage()) + ps := NewPlacementService(placement.NewOptions(), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r4h4", "r4", "z1")}, 10, 1) + err := ps.BuildInitialPlacement("serviceA", []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r4h4", "r4", "z1", 1), + }, 10, 1) assert.NoError(t, err) // leaving host not exist - err = ps.ReplaceHost("serviceA", placement.NewHost("r1h2", "r1", "z1"), []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r1h2", "r1", "z1", 1), + []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}, + ) assert.Error(t, err) // adding host already exist - err = ps.ReplaceHost("serviceA", placement.NewHost("r1h1", "r1", "z1"), []placement.Host{placement.NewHost("r4h4", "r4", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r1h1", "r1", "z1", 1), + []placement.Host{placement.NewHost("r4h4", "r4", "z1", 1)}, + ) assert.Error(t, err) // not enough rack after replace err = ps.AddReplica("serviceA") assert.NoError(t, err) - err = ps.ReplaceHost("serviceA", placement.NewHost("r4h4", "r4", "z1"), []placement.Host{placement.NewHost("r1h2", "r1", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r4h4", "r4", "z1", 1), + []placement.Host{placement.NewHost("r1h2", "r1", "z1", 1)}, + ) assert.Error(t, err) // could not find snapshot for service - err = ps.ReplaceHost("badService", placement.NewHost("r1h1", "r1", "z1"), []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + err = ps.ReplaceHost( + "badService", + placement.NewHost("r1h1", "r1", "z1", 1), + []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}, + ) assert.Error(t, err) // catch algo errors - psWithErrorAlgo := placementService{algo: errorAlgorithm{}, ss: NewMockStorage(), options: placement.NewOptions().SetLooseRackCheck(false)} - err = psWithErrorAlgo.ReplaceHost("serviceA", placement.NewHost("r1h1", "r1", "z1"), []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + psWithErrorAlgo := placementService{algo: errorAlgorithm{}, ss: NewMockStorage(), options: placement.NewOptions()} + err = psWithErrorAlgo.ReplaceHost( + "serviceA", + placement.NewHost("r1h1", "r1", "z1", 1), + []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}, + ) assert.Error(t, err) cleanUpTestFiles(t, "serviceA") @@ -226,54 +271,75 @@ func TestBadReplaceHost(t *testing.T) { func TestReplaceHostWithLooseRackCheck(t *testing.T) { ps := NewPlacementService(placement.NewOptions().SetLooseRackCheck(true), NewMockStorage()) - err := ps.BuildInitialPlacement("serviceA", []placement.Host{placement.NewHost("r1h1", "r1", "z1"), placement.NewHost("r4h4", "r4", "z1")}, 10, 1) + err := ps.BuildInitialPlacement( + "serviceA", + []placement.Host{ + placement.NewHost("r1h1", "r1", "z1", 1), + placement.NewHost("r4h4", "r4", "z1", 1), + }, 10, 1) assert.NoError(t, err) // leaving host not exist - err = ps.ReplaceHost("serviceA", placement.NewHost("r1h2", "r1", "z1"), []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r1h2", "r1", "z1", 1), + []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}, + ) assert.Error(t, err) // adding host already exist - err = ps.ReplaceHost("serviceA", placement.NewHost("r1h1", "r1", "z1"), []placement.Host{placement.NewHost("r4h4", "r4", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r1h1", "r1", "z1", 1), + []placement.Host{placement.NewHost("r4h4", "r4", "z1", 1)}, + ) assert.Error(t, err) // could not find snapshot for service - err = ps.ReplaceHost("badService", placement.NewHost("r1h1", "r1", "z1"), []placement.Host{placement.NewHost("r2h2", "r2", "z1")}) + err = ps.ReplaceHost( + "badService", + placement.NewHost("r1h1", "r1", "z1", 1), + []placement.Host{placement.NewHost("r2h2", "r2", "z1", 1)}, + ) assert.Error(t, err) // NO ERROR when not enough rack after replace err = ps.AddReplica("serviceA") assert.NoError(t, err) - err = ps.ReplaceHost("serviceA", placement.NewHost("r4h4", "r4", "z1"), []placement.Host{placement.NewHost("r1h2", "r1", "z1")}) + err = ps.ReplaceHost( + "serviceA", + placement.NewHost("r4h4", "r4", "z1", 1), + []placement.Host{placement.NewHost("r1h2", "r1", "z1", 1)}, + ) assert.NoError(t, err) cleanUpTestFiles(t, "serviceA") } func TestFindReplaceHost(t *testing.T) { - h1 := placement.NewEmptyHostShards("r1h1", "r11", "z1") + h1 := placement.NewHostShards(placement.NewHost("r1h1", "r11", "z1", 1)) h1.AddShard(1) h1.AddShard(2) h1.AddShard(3) - h10 := placement.NewEmptyHostShards("r1h10", "r11", "z1") + h10 := placement.NewHostShards(placement.NewHost("r1h10", "r11", "z1", 1)) h10.AddShard(4) h10.AddShard(5) - h2 := placement.NewEmptyHostShards("r2h2", "r12", "z1") + h2 := placement.NewHostShards(placement.NewHost("r2h2", "r12", "z1", 1)) h2.AddShard(6) h2.AddShard(7) h2.AddShard(8) h2.AddShard(9) - h3 := placement.NewEmptyHostShards("r3h3", "r13", "z1") + h3 := placement.NewHostShards(placement.NewHost("r3h3", "r13", "z1", 3)) h3.AddShard(1) h3.AddShard(3) h3.AddShard(4) h3.AddShard(5) h3.AddShard(6) - h4 := placement.NewEmptyHostShards("r4h4", "r14", "z1") + h4 := placement.NewHostShards(placement.NewHost("r4h4", "r14", "z1", 1)) h4.AddShard(2) h4.AddShard(7) h4.AddShard(8) @@ -285,8 +351,8 @@ func TestFindReplaceHost(t *testing.T) { s := placement.NewPlacementSnapshot(hss, ids, 2) candidates := []placement.Host{ - placement.NewHost("h11", "r11", "z1"), - placement.NewHost("h22", "r22", "z2"), + placement.NewHost("h11", "r11", "z1", 1), + placement.NewHost("h22", "r22", "z2", 1), // bad zone } ps := NewPlacementService(placement.NewOptions(), NewMockStorage()).(placementService) @@ -294,23 +360,34 @@ func TestFindReplaceHost(t *testing.T) { assert.Error(t, err) assert.Nil(t, hs) + noConflictCandidates := []placement.Host{ + placement.NewHost("h11", "r0", "z1", 1), + placement.NewHost("h22", "r0", "z2", 1), + } + hs, err = ps.findReplaceHost(s, noConflictCandidates, h3) + assert.Error(t, err) + assert.Contains(t, err.Error(), "could not find enough host to replace") + assert.Nil(t, hs) + ps = NewPlacementService(placement.NewOptions().SetLooseRackCheck(true), NewMockStorage()).(placementService) hs, err = ps.findReplaceHost(s, candidates, h4) assert.NoError(t, err) // gonna prefer r1 because r1 would only conflict shard 2, r2 would conflict 7,8,9 - assert.Equal(t, "r11", hs.Rack()) + assert.Equal(t, 1, len(hs)) + assert.Equal(t, "r11", hs[0].Rack()) ps = NewPlacementService(placement.NewOptions().SetAcrossZones(true), NewMockStorage()).(placementService) hs, err = ps.findReplaceHost(s, candidates, h4) assert.NoError(t, err) // gonna prefer r2 because across zone is allowed and r2 has no conflict - assert.Equal(t, "r22", hs.Rack()) + assert.Equal(t, 1, len(hs)) + assert.Equal(t, "r22", hs[0].Rack()) - h1 = placement.NewEmptyHostShards("h1", "r1", "z1") + h1 = placement.NewHostShards(placement.NewHost("h1", "r1", "z1", 1)) h1.AddShard(1) h1.AddShard(2) - h2 = placement.NewEmptyHostShards("h2", "r2", "z2") + h2 = placement.NewHostShards(placement.NewHost("h2", "r2", "z2", 1)) h2.AddShard(3) h2.AddShard(4) @@ -323,31 +400,149 @@ func TestFindReplaceHost(t *testing.T) { assert.Nil(t, hs) } +func TestGroupHostsByConflict(t *testing.T) { + h1 := placement.NewHost("h1", "", "", 1) + h2 := placement.NewHost("h2", "", "", 1) + h3 := placement.NewHost("h3", "", "", 1) + h4 := placement.NewHost("h4", "", "", 2) + hostConflicts := []sortableValue{ + sortableValue{value: h1, weight: 1}, + sortableValue{value: h2, weight: 0}, + sortableValue{value: h3, weight: 3}, + sortableValue{value: h4, weight: 2}, + } + + groups := groupHostsByConflict(hostConflicts, true) + assert.Equal(t, 4, len(groups)) + assert.Equal(t, h2, groups[0][0]) + assert.Equal(t, h1, groups[1][0]) + assert.Equal(t, h4, groups[2][0]) + assert.Equal(t, h3, groups[3][0]) + + groups = groupHostsByConflict(hostConflicts, false) + assert.Equal(t, 1, len(groups)) + assert.Equal(t, h2, groups[0][0]) +} + +func TestKnapSack(t *testing.T) { + h1 := placement.NewHost("h1", "", "", 40000) + h2 := placement.NewHost("h2", "", "", 20000) + h3 := placement.NewHost("h3", "", "", 80000) + h4 := placement.NewHost("h4", "", "", 50000) + h5 := placement.NewHost("h5", "", "", 190000) + hosts := []placement.Host{h1, h2, h3, h4, h5} + + res, leftWeight := knapsack(hosts, 10000) + assert.Equal(t, -10000, leftWeight) + assert.Equal(t, []placement.Host{h2}, res) + + res, leftWeight = knapsack(hosts, 20000) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h2}, res) + + res, leftWeight = knapsack(hosts, 30000) + assert.Equal(t, -10000, leftWeight) + assert.Equal(t, []placement.Host{h1}, res) + + res, leftWeight = knapsack(hosts, 60000) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h2}, res) + + res, leftWeight = knapsack(hosts, 120000) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h3}, res) + + res, leftWeight = knapsack(hosts, 170000) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h3, h4}, res) + + res, leftWeight = knapsack(hosts, 190000) + assert.Equal(t, 0, leftWeight) + // will prefer h5 than h1+h2+h3+h4 + assert.Equal(t, []placement.Host{h5}, res) + + res, leftWeight = knapsack(hosts, 200000) + assert.Equal(t, -10000, leftWeight) + assert.Equal(t, []placement.Host{h2, h5}, res) + + res, leftWeight = knapsack(hosts, 210000) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h2, h5}, res) + + res, leftWeight = knapsack(hosts, 400000) + assert.Equal(t, 20000, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5}, res) +} + +func TestFillWeight(t *testing.T) { + h1 := placement.NewHost("h1", "", "", 4) + h2 := placement.NewHost("h2", "", "", 2) + h3 := placement.NewHost("h3", "", "", 8) + h4 := placement.NewHost("h4", "", "", 5) + h5 := placement.NewHost("h5", "", "", 19) + + h6 := placement.NewHost("h6", "", "", 3) + h7 := placement.NewHost("h7", "", "", 7) + groups := [][]placement.Host{ + []placement.Host{h1, h2, h3, h4, h5}, + []placement.Host{h6, h7}, + } + + // When targetWeight is smaller than 38, the first group will satisfy + res, leftWeight := fillWeight(groups, 1) + assert.Equal(t, -1, leftWeight) + assert.Equal(t, []placement.Host{h2}, res) + + res, leftWeight = fillWeight(groups, 2) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h2}, res) + + res, leftWeight = fillWeight(groups, 17) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h3, h4}, res) + + res, leftWeight = fillWeight(groups, 20) + assert.Equal(t, -1, leftWeight) + assert.Equal(t, []placement.Host{h2, h5}, res) + + // When targetWeight is bigger than 38, need to get host from group 2 + res, leftWeight = fillWeight(groups, 40) + assert.Equal(t, -1, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5, h6}, res) + + res, leftWeight = fillWeight(groups, 41) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5, h6}, res) + + res, leftWeight = fillWeight(groups, 47) + assert.Equal(t, -1, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5, h6, h7}, res) + + res, leftWeight = fillWeight(groups, 48) + assert.Equal(t, 0, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5, h6, h7}, res) + + res, leftWeight = fillWeight(groups, 50) + assert.Equal(t, 2, leftWeight) + assert.Equal(t, []placement.Host{h1, h2, h3, h4, h5, h6, h7}, res) +} + func TestRackLenSort(t *testing.T) { - r1 := rackLen{rack: "r1", len: 1} - r2 := rackLen{rack: "r2", len: 2} - r3 := rackLen{rack: "r3", len: 3} - r4 := rackLen{rack: "r4", len: 2} - r5 := rackLen{rack: "r5", len: 1} - r6 := rackLen{rack: "r6", len: 2} - r7 := rackLen{rack: "r7", len: 3} - rs := rackLens{r1, r2, r3, r4, r5, r6, r7} + r1 := sortableValue{value: "r1", weight: 1} + r2 := sortableValue{value: "r2", weight: 2} + r3 := sortableValue{value: "r3", weight: 3} + r4 := sortableValue{value: "r4", weight: 2} + r5 := sortableValue{value: "r5", weight: 1} + r6 := sortableValue{value: "r6", weight: 2} + r7 := sortableValue{value: "r7", weight: 3} + rs := sortableThings{r1, r2, r3, r4, r5, r6, r7} sort.Sort(rs) seen := 0 for _, rl := range rs { - assert.True(t, seen <= rl.len) - seen = rl.len + assert.True(t, seen <= rl.weight) + seen = rl.weight } - - filtered := filterConflictRacks(rs) - assert.Equal(t, rackLens{}, filtered) - - filtered = filterConflictRacks(rackLens{rackLen{rack: "r0", len: 0}, rackLen{rack: "r1", len: 0}}) - assert.Equal(t, rackLens{rackLen{rack: "r0", len: 0}, rackLen{rack: "r1", len: 0}}, filtered) - - filtered = filterConflictRacks(rackLens{rackLen{rack: "r0", len: 0}, rackLen{rack: "r1", len: 1}}) - assert.Equal(t, rackLens{rackLen{rack: "r0", len: 0}}, filtered) } func cleanUpTestFiles(t *testing.T, service string) { @@ -375,7 +570,7 @@ func (errorAlgorithm) RemoveHost(p placement.Snapshot, h placement.Host) (placem return nil, errors.New("error in errorAlgorithm") } -func (errorAlgorithm) ReplaceHost(p placement.Snapshot, leavingHost, addingHost placement.Host) (placement.Snapshot, error) { +func (errorAlgorithm) ReplaceHost(p placement.Snapshot, leavingHost placement.Host, addingHost []placement.Host) (placement.Snapshot, error) { return nil, errors.New("error in errorAlgorithm") } diff --git a/src/cluster/placement/types.go b/src/cluster/placement/types.go index ae680c9678..fcade8cc45 100644 --- a/src/cluster/placement/types.go +++ b/src/cluster/placement/types.go @@ -36,8 +36,8 @@ type Algorithm interface { // RemoveHost removes a host from the cluster RemoveHost(p Snapshot, h Host) (Snapshot, error) - // ReplaceHost replace a host with a new host - ReplaceHost(p Snapshot, leavingHost, addingHost Host) (Snapshot, error) + // ReplaceHost replace a host with new hosts + ReplaceHost(p Snapshot, leavingHost Host, addingHosts []Host) (Snapshot, error) } // DeploymentPlanner generates deployment steps for a placement @@ -68,6 +68,9 @@ type Snapshot interface { // Validate checks if the snapshot is valid Validate() error + + // Copy copies the Snapshot + Copy() Snapshot } // HostShards represents a host and its assigned shards @@ -80,12 +83,13 @@ type HostShards interface { ContainsShard(shard uint32) bool } -// Host contains the information needed for placement +// Host represents a weighted host type Host interface { fmt.Stringer ID() string Rack() string Zone() string + Weight() uint32 } // Service handles the placement related operations for registered services @@ -117,4 +121,9 @@ type Options interface { // AcrossZone enables the placement have hosts across zones AcrossZones() bool SetAcrossZones(acrossZones bool) Options + + // AllowPartialReplace allows shards from the leaving host to be + // placed on hosts other than the new hosts in a replace operation + AllowPartialReplace() bool + SetAllowPartialReplace(allowPartialReplace bool) Options }