Skip to content

Commit

Permalink
[m3cluster] Expose placement algorithm in placement service (#2858)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Nov 9, 2020
1 parent 6536b7c commit ce720e8
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 54 deletions.
5 changes: 2 additions & 3 deletions src/cluster/placement/service/mirrored_custom_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ const (
instG3I1 = "g3_i1"
instG3I2 = "g3_i2"
instG3I3 = "g3_i3"

)

var (
Expand Down Expand Up @@ -205,7 +204,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector
tctx.Groups = testGroups

opts := placement.NewOptions().
SetValidZone(zone).
SetValidZone(zone).
SetIsMirrored(true)

tctx.Selector = selector.NewMirroredCustomGroupSelector(
Expand All @@ -217,7 +216,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector

tctx.KVStore = mem.NewStore()
tctx.Storage = placementstorage.NewPlacementStorage(tctx.KVStore, "placement", tctx.Opts)
tctx.Service = NewPlacementService(tctx.Storage, tctx.Opts)
tctx.Service = NewPlacementService(tctx.Storage, WithPlacementOptions(tctx.Opts))
return tctx
}

Expand Down
5 changes: 2 additions & 3 deletions src/cluster/placement/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
// given placement.
// If initialPlacement is nil, BuildInitialPlacement must be called before any operations on the
// placement.
func NewPlacementOperator(initialPlacement placement.Placement, opts placement.Options) placement.Operator {
func NewPlacementOperator(initialPlacement placement.Placement, opts ...Option) placement.Operator {
store := newDummyStore(initialPlacement)
return &placementOperator{
placementServiceImpl: newPlacementServiceImpl(opts, store),
placementServiceImpl: newPlacementServiceImpl(store, opts...),
store: store,
}
}
Expand Down Expand Up @@ -97,4 +97,3 @@ func (d *dummyStore) Placement() (placement.Placement, error) {
}
return d.curPlacement, nil
}

15 changes: 7 additions & 8 deletions src/cluster/placement/service/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ import (
"testing"

"github.com/m3db/m3/src/cluster/placement"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOperator(t *testing.T) {
type testDeps struct {
options placement.Options
op placement.Operator
op placement.Operator
}
setup := func(t *testing.T) testDeps {
options := placement.NewOptions().SetAllowAllZones(true)
return testDeps{
options: options,
op: NewPlacementOperator(nil, options),
op: NewPlacementOperator(nil, WithPlacementOptions(options)),
}
}

Expand All @@ -60,7 +59,7 @@ func TestOperator(t *testing.T) {

t.Run("end-to-end flow", func(t *testing.T) {
tdeps := setup(t)
op := NewPlacementOperator(nil, tdeps.options)
op := NewPlacementOperator(nil, WithPlacementOptions(tdeps.options))
store := newMockStorage()

pl, err := op.BuildInitialPlacement([]placement.Instance{newTestInstance()}, 10, 1)
Expand All @@ -81,7 +80,7 @@ func TestOperator(t *testing.T) {
require.NoError(t, err)

// expect exactly one version increment, from store.SetIfNotExist
assert.Equal(t, initialVersion + 1, pl.Version())
assert.Equal(t, initialVersion+1, pl.Version())

// spot check the results
allAvailable := true
Expand All @@ -94,15 +93,15 @@ func TestOperator(t *testing.T) {
})
}

type dummyStoreTestDeps struct{
type dummyStoreTestDeps struct {
store *dummyStore
pl placement.Placement
pl placement.Placement
}

func dummyStoreSetup(t *testing.T) dummyStoreTestDeps {
return dummyStoreTestDeps{
store: newDummyStore(nil),
pl: placement.NewPlacement(),
pl: placement.NewPlacement(),
}
}

Expand Down
66 changes: 54 additions & 12 deletions src/cluster/placement/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/m3db/m3/src/cluster/placement/algo"
"github.com/m3db/m3/src/cluster/placement/selector"
"github.com/m3db/m3/src/cluster/shard"

"go.uber.org/zap"
)

Expand All @@ -37,36 +36,79 @@ type placementService struct {
}

// NewPlacementService returns an instance of placement service.
func NewPlacementService(s placement.Storage, opts placement.Options) placement.Service {
func NewPlacementService(s placement.Storage, opts ...Option) placement.Service {
return &placementService{
Storage: s,
placementServiceImpl: newPlacementServiceImpl(
opts,
s,

opts...,
),
}
}

type options struct {
placementAlgorithm placement.Algorithm
placementOpts placement.Options
}

// Option is an interface for PlacementService options.
type Option interface {
apply(*options)
}

// WithAlgorithm sets the algorithm implementation that will be used by PlacementService.
func WithAlgorithm(algo placement.Algorithm) Option {
return &algorithmOption{placementAlgorithm: algo}
}

type algorithmOption struct {
placementAlgorithm placement.Algorithm
}

func (a *algorithmOption) apply(opts *options) {
opts.placementAlgorithm = a.placementAlgorithm
}

type placementOptionsOption struct {
opts placement.Options
}

func (a *placementOptionsOption) apply(opts *options) {
opts.placementOpts = a.opts
}

// WithPlacementOptions sets the placement options for PlacementService.
func WithPlacementOptions(opts placement.Options) Option {
return &placementOptionsOption{opts: opts}
}

func newPlacementServiceImpl(
opts placement.Options,
storage minimalPlacementStorage,
opts ...Option,
) *placementServiceImpl {
if opts == nil {
opts = placement.NewOptions()
o := options{
placementOpts: placement.NewOptions(),
}

for _, opt := range opts {
opt.apply(&o)
}

if o.placementAlgorithm == nil {
o.placementAlgorithm = algo.NewAlgorithm(o.placementOpts)
}

instanceSelector := opts.InstanceSelector()
instanceSelector := o.placementOpts.InstanceSelector()
if instanceSelector == nil {
instanceSelector = selector.NewInstanceSelector(opts)
instanceSelector = selector.NewInstanceSelector(o.placementOpts)
}

return &placementServiceImpl{
store: storage,
opts: opts,
algo: algo.NewAlgorithm(opts),
opts: o.placementOpts,
algo: o.placementAlgorithm,
selector: instanceSelector,
logger: opts.InstrumentOptions().Logger(),
logger: o.placementOpts.InstrumentOptions().Logger(),
}
}

Expand Down
Loading

0 comments on commit ce720e8

Please sign in to comment.