Skip to content

Commit

Permalink
[coordinator] Validate placement on set placement endpoint unless for…
Browse files Browse the repository at this point in the history
…ce set (#2922)
  • Loading branch information
robskillington authored Nov 18, 2020
1 parent b79f5f5 commit 052fd40
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 36 deletions.
55 changes: 54 additions & 1 deletion src/cluster/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/shard"
xerrors "github.com/m3db/m3/src/x/errors"
)

const (
Expand Down Expand Up @@ -297,6 +298,13 @@ func (placements Placements) ActiveIndex(timeNanos int64) int {
// - There is one Initializing shard for each Leaving shard.
// - The instances with same shard_set_id owns the same shards.
func Validate(p Placement) error {
if err := validate(p); err != nil {
return xerrors.NewInvalidParamsError(err)
}
return nil
}

func validate(p Placement) error {
if p.IsMirrored() && !p.IsSharded() {
return errMirrorNotSharded
}
Expand All @@ -311,6 +319,7 @@ func Validate(p Placement) error {
totalLeaving := 0
totalInit := 0
totalInitWithSourceID := 0
instancesLeavingShardsWithMatchingInitShards := make(map[string]map[uint32]string)
maxShardSetID := p.MaxShardSetID()
instancesByShardSetID := make(map[uint32]Instance, p.NumInstances())
for _, instance := range p.Instances() {
Expand Down Expand Up @@ -340,8 +349,52 @@ func Validate(p Placement) error {
totalInit++
shardCountMap[s.ID()] = count + 1
totalCapacity++
if s.SourceID() != "" {
if sourceID := s.SourceID(); sourceID != "" {
totalInitWithSourceID++

// Check the instance.
leaving, ok := p.Instance(sourceID)
if !ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but no such instance in placement",
instance.ID(), s.ID(), sourceID)
}

// Check has leaving shard.
leavingShard, ok := leaving.Shards().Shard(s.ID())
if !ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has no such shard",
instance.ID(), s.ID(), sourceID)
}

// Check the shard is leaving.
if state := leavingShard.State(); state != shard.Leaving {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has shard with state %s",
instance.ID(), s.ID(), sourceID, state.String())
}

// Make sure does not get double matched.
matches, ok := instancesLeavingShardsWithMatchingInitShards[sourceID]
if !ok {
matches = make(map[uint32]string)
instancesLeavingShardsWithMatchingInitShards[sourceID] = matches
}

match, ok := matches[s.ID()]
if ok {
return fmt.Errorf(
"instance %s has initializing shard %d with "+
"source ID %s but leaving instance has shard already matched by %s",
instance.ID(), s.ID(), sourceID, match)
}

// Track that it's matched.
matches[s.ID()] = instance.ID()
}
case shard.Leaving:
totalLeaving++
Expand Down
100 changes: 92 additions & 8 deletions src/cluster/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/cluster/shard"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPlacement(t *testing.T) {
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestMismatchShards(t *testing.T) {

// mismatch shards
p := NewPlacement().SetInstances([]Instance{i1, i2}).SetShards([]uint32{1, 2, 3}).SetReplicaFactor(1)
assert.Error(t, Validate(p))
require.Error(t, Validate(p))
}

func TestNonSharded(t *testing.T) {
Expand All @@ -155,8 +156,8 @@ func TestNonSharded(t *testing.T) {
func TestValidateMirrorButNotSharded(t *testing.T) {
p := NewPlacement().SetIsMirrored(true)
err := Validate(p)
assert.Error(t, err)
assert.Equal(t, errMirrorNotSharded, err)
require.Error(t, err)
assert.Equal(t, errMirrorNotSharded.Error(), err.Error())
}

func TestValidateMissingShard(t *testing.T) {
Expand All @@ -170,7 +171,7 @@ func TestValidateMissingShard(t *testing.T) {
ids := []uint32{1, 2}
p := NewPlacement().SetInstances([]Instance{i1, i2}).SetShards(ids).SetReplicaFactor(2).SetIsSharded(true)
err := Validate(p)
assert.Error(t, err)
require.Error(t, err)
assert.Equal(t, "invalid placement, the total available shards in the placement is 3, expecting 4", err.Error())
}

Expand All @@ -189,8 +190,9 @@ func TestValidateUnexpectedShard(t *testing.T) {
SetReplicaFactor(2).
SetIsSharded(true)

assert.Error(t, Validate(p))
assert.Equal(t, errUnexpectedShards, Validate(p))
err := Validate(p)
require.Error(t, err)
assert.Equal(t, errUnexpectedShards.Error(), err.Error())
}

func TestValidateDuplicatedShards(t *testing.T) {
Expand All @@ -208,8 +210,9 @@ func TestValidateDuplicatedShards(t *testing.T) {
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{2, 3, 4, 4, 5, 6}).
SetReplicaFactor(1)
assert.Error(t, Validate(p))
assert.Equal(t, errDuplicatedShards, Validate(p))
err := Validate(p)
require.Error(t, err)
assert.Equal(t, errDuplicatedShards.Error(), err.Error())
}

func TestValidateWrongReplicaForSomeShards(t *testing.T) {
Expand Down Expand Up @@ -276,6 +279,87 @@ func TestValidateLeavingNotMatchInitializingWithSourceID(t *testing.T) {
assert.Equal(t, err.Error(), "invalid placement, 2 shards in Leaving state, not equal 1 in Initializing state with source id")
}

func TestValidateLeavingAndInitializingWithSourceIDMissing(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(1).SetState(shard.Initializing).SetSourceID("unknown"))
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 1 with source ID unknown but no such instance in placement")
}

func TestValidateLeavingAndInitializingWithSourceIDNoSuchShard(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))
i2.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 3 with source ID i1 but leaving instance has no such shard")
}

func TestValidateLeavingAndInitializingWithSourceIDShardNotLeaving(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(1).SetState(shard.Initializing).SetSourceID("i1"))
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i2 has initializing shard 1 with source ID i1 but leaving instance has shard with state Available")
}

func TestValidateLeavingAndInitializingWithSourceIDDoubleMatched(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Leaving))
i1.Shards().Add(shard.NewShard(3).SetState(shard.Available))

i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)
i2.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

i3 := NewEmptyInstance("i3", "r2", "z1", "endpoint", 1)
i3.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i1"))

p := NewPlacement().
SetInstances([]Instance{i1, i2, i3}).
SetShards([]uint32{1, 2, 3}).
SetReplicaFactor(1).
SetIsSharded(true)
err := Validate(p)
require.Error(t, err)
assert.Equal(t, err.Error(), "instance i3 has initializing shard 2 with source ID i1 but leaving instance has shard already matched by i2")
}

func TestValidateNoEndpoint(t *testing.T) {
i1 := NewEmptyInstance("i1", "r1", "z1", "", 1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
Expand Down
11 changes: 11 additions & 0 deletions src/query/api/v1/handler/placement/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package placement

import (
"fmt"
"net/http"
"path"
"time"
Expand Down Expand Up @@ -116,6 +117,16 @@ func (h *SetHandler) ServeHTTP(
return
}

if err := placement.Validate(newPlacement); err != nil {
if !req.Force {
logger.Error("unable to validate new placement", zap.Error(err))
xhttp.WriteError(w,
xerrors.NewRenamedError(err, fmt.Errorf("unable to validate new placement: %w", err)))
return
}
logger.Warn("unable to validate new placement, continuing with force", zap.Error(err))
}

var (
placementProto = req.Placement
dryRun = !req.Confirm
Expand Down
115 changes: 115 additions & 0 deletions src/query/api/v1/handler/placement/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,118 @@ func TestPlacementSetHandler_NewPlacement(t *testing.T) {
assert.Equal(t, 0, newPlacement.Version())
})
}

func TestPlacementSetHandler_ValidatePlacementWithoutForce(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient, mockPlacementService := SetupPlacementTest(t, ctrl)
handlerOpts, err := NewHandlerOptions(
mockClient, config.Configuration{}, nil, instrument.NewOptions())
require.NoError(t, err)
handler := NewSetHandler(handlerOpts)

badReqProto := &admin.PlacementSetRequest{
Placement: &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host1:1234",
Hostname: "host1",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_AVAILABLE,
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_AVAILABLE,
},
},
},
"host2": {
Id: "host2",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host2:1234",
Hostname: "host2",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_INITIALIZING,
SourceId: "host1",
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_INITIALIZING,
SourceId: "host1",
},
},
},
},
IsSharded: true,
NumShards: 2,
ReplicaFactor: 2,
},
Version: 0,
Confirm: true,
}

reqBody, err := (&jsonpb.Marshaler{}).MarshalToString(badReqProto)
require.NoError(t, err)

req := httptest.NewRequest(SetHTTPMethod, M3DBSetURL, strings.NewReader(reqBody))
require.NotNil(t, req)

existingPlacementProto := &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Weight: 1,
Endpoint: "http://host1:1234",
Hostname: "host1",
Port: 1234,
Shards: []*placementpb.Shard{
&placementpb.Shard{
Id: 0,
State: placementpb.ShardState_AVAILABLE,
},
&placementpb.Shard{
Id: 1,
State: placementpb.ShardState_AVAILABLE,
},
},
},
},
IsSharded: true,
NumShards: 2,
ReplicaFactor: 1,
}

existingPlacement, err := placement.NewPlacementFromProto(existingPlacementProto)
require.NoError(t, err)

mockPlacementService.EXPECT().
Placement().
Return(existingPlacement, nil)

svcDefaults := handleroptions.ServiceNameAndDefaults{
ServiceName: handleroptions.M3DBServiceName,
}

w := httptest.NewRecorder()
handler.ServeHTTP(svcDefaults, w, req)
resp := w.Result()
body := w.Body.String()
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.True(t, strings.Contains(body, "unable to validate new placement"))
assert.True(t, strings.Contains(body, "instance host2 has initializing shard 0 with source ID host1 but leaving instance has shard with state Available"))
}
Loading

0 comments on commit 052fd40

Please sign in to comment.