Skip to content

Commit

Permalink
Consider weights in placement algorithm (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored and Chao Wang committed Oct 24, 2016
1 parent ac40c2c commit 9240080
Show file tree
Hide file tree
Showing 9 changed files with 985 additions and 532 deletions.
84 changes: 50 additions & 34 deletions src/cluster/placement/algo/algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package algo
import (
"container/heap"
"errors"
"fmt"

"github.com/m3db/m3cluster/placement"
)
Expand All @@ -44,7 +45,7 @@ func NewRackAwarePlacementAlgorithm(opt placement.Options) placement.Algorithm {
}

func (a rackAwarePlacementAlgorithm) BuildInitialPlacement(hosts []placement.Host, shards []uint32) (placement.Snapshot, error) {
ph := newInitPlacementHelper(a.options, hosts, shards)
ph := newInitHelper(hosts, shards, a.options)

if err := ph.PlaceShards(shards, nil); err != nil {
return nil, err
Expand All @@ -53,18 +54,18 @@ func (a rackAwarePlacementAlgorithm) BuildInitialPlacement(hosts []placement.Hos
}

func (a rackAwarePlacementAlgorithm) AddReplica(ps placement.Snapshot) (placement.Snapshot, error) {
ph := newPlacementHelperWithTargetRF(a.options, ps, ps.Replicas()+1)
ps = ps.Copy()
ph := newAddReplicaHelper(ps, a.options)
if err := ph.PlaceShards(ps.Shards(), nil); err != nil {
return nil, err
}
return ph.GenerateSnapshot(), nil
}

func (a rackAwarePlacementAlgorithm) RemoveHost(ps placement.Snapshot, leavingHost placement.Host) (placement.Snapshot, error) {
var ph PlacementHelper
var leavingHostShards placement.HostShards
var err error
if ph, leavingHostShards, err = newRemoveHostPlacementHelper(a.options, ps, leavingHost); err != nil {
ps = ps.Copy()
ph, leavingHostShards, err := newRemoveHostHelper(ps, leavingHost, a.options)
if err != nil {
return nil, err
}
// place the shards from the leaving host to the rest of the cluster
Expand All @@ -75,45 +76,67 @@ func (a rackAwarePlacementAlgorithm) RemoveHost(ps placement.Snapshot, leavingHo
}

func (a rackAwarePlacementAlgorithm) AddHost(ps placement.Snapshot, addingHost placement.Host) (placement.Snapshot, error) {
var addingHostShards placement.HostShards
var err error
if addingHostShards, err = getNewHostShardsForSnapshot(ps, addingHost); err != nil {
return nil, err
}
return a.addHostShards(ps, addingHostShards)
ps = ps.Copy()
return a.addHostShards(ps, placement.NewHostShards(addingHost))
}

func (a rackAwarePlacementAlgorithm) ReplaceHost(ps placement.Snapshot, leavingHost, addingHost placement.Host) (placement.Snapshot, error) {
ph, leavingHostShards, addingHostShards, err := newReplaceHostPlacementHelper(a.options, ps, leavingHost, addingHost)
func (a rackAwarePlacementAlgorithm) ReplaceHost(ps placement.Snapshot, leavingHost placement.Host, addingHosts []placement.Host) (placement.Snapshot, error) {
ps = ps.Copy()
ph, leavingHostShards, addingHostShards, err := newReplaceHostHelper(ps, leavingHost, addingHosts, a.options)
if err != nil {
return nil, err
}

var shardsUnassigned []uint32
// move shards from leaving host to adding host
for _, shard := range leavingHostShards.Shards() {
if moved := ph.MoveShard(shard, leavingHostShards, addingHostShards); !moved {
shardsUnassigned = append(shardsUnassigned, shard)
hh := ph.BuildHostHeap(addingHostShards, true)
for leavingHostShards.ShardsLen() > 0 {
if hh.Len() == 0 {
break
}
tryHost := heap.Pop(hh).(placement.HostShards)
if moved := ph.MoveOneShard(leavingHostShards, tryHost); moved {
heap.Push(hh, tryHost)
}
}

// if there are shards that can not be moved to adding host
// distribute them to the cluster
if err := ph.PlaceShards(shardsUnassigned, leavingHostShards); err != nil {
return nil, err
if !a.options.AllowPartialReplace() {
if leavingHostShards.ShardsLen() > 0 {
return nil, fmt.Errorf("could not fully replace all shards from %s, %v shards left unassigned", leavingHost.ID(), leavingHostShards.ShardsLen())
}
return ph.GenerateSnapshot(), nil
}

// add the adding host to the cluster and bring its load up to target load
cl := ph.GenerateSnapshot()
if leavingHostShards.ShardsLen() == 0 {
return ph.GenerateSnapshot(), nil
}
// place the shards from the leaving host to the rest of the cluster
if err := ph.PlaceShards(leavingHostShards.Shards(), leavingHostShards); err != nil {
return nil, err
}
// fill up to the target load for added hosts if have not already done so
newPS := ph.GenerateSnapshot()
for _, addingHost := range addingHosts {
newPS, leavingHostShards, err = removeHostFromPlacement(newPS, addingHost)
if err != nil {
return nil, err
}
newPS, err = a.addHostShards(newPS, leavingHostShards)
if err != nil {
return nil, err
}
}

return a.addHostShards(cl, addingHostShards)
return newPS, nil
}

func (a rackAwarePlacementAlgorithm) addHostShards(ps placement.Snapshot, addingHostShard placement.HostShards) (placement.Snapshot, error) {
ph := newAddHostShardsPlacementHelper(a.options, ps, addingHostShard)
if ps.HostShard(addingHostShard.Host().ID()) != nil {
return nil, errAddingHostAlreadyExist
}
ph := newAddHostShardsHelper(ps, addingHostShard, a.options)
targetLoad := ph.GetTargetLoadForHost(addingHostShard.Host().ID())
// try to take shards from the most loaded hosts until the adding host reaches target load
hh := ph.GetHostHeap()
hh := ph.BuildHostHeap(ps.HostShards(), false)
for addingHostShard.ShardsLen() < targetLoad {
if hh.Len() == 0 {
return nil, errCouldNotReachTargetLoad
Expand All @@ -126,10 +149,3 @@ func (a rackAwarePlacementAlgorithm) addHostShards(ps placement.Snapshot, adding

return ph.GenerateSnapshot(), nil
}

func getNewHostShardsForSnapshot(ps placement.Snapshot, addingHost placement.Host) (placement.HostShards, error) {
if ps.HostShard(addingHost.ID()) != nil {
return nil, errAddingHostAlreadyExist
}
return placement.NewEmptyHostShardsFromHost(addingHost), nil
}
Loading

0 comments on commit 9240080

Please sign in to comment.