Skip to content

Commit

Permalink
[m3admin] Allow /set for new placements (#2625)
Browse files Browse the repository at this point in the history
Previously `/set` required that a placement already exists in order to
set it. This hinders operations such as preemptively creating a
placement or replacing one that had been deleted via safe APIs.

This PR allows using `/set` even if a placement didn't previously exist
at that key.
  • Loading branch information
schallert authored Sep 14, 2020
1 parent 07c034d commit d38ff54
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 23 deletions.
56 changes: 36 additions & 20 deletions src/query/api/v1/handler/placement/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ var (
M3CoordinatorServicePlacementPathName, setPathName)
)

// SetHandler is the type for placement replaces.
// SetHandler is the type for manually setting a placement value. If none
// currently exists, this will set the initial placement. Otherwise it will
// override the existing placement.
type SetHandler Handler

// NewSetHandler returns a new SetHandler.
Expand Down Expand Up @@ -93,41 +95,55 @@ func (h *SetHandler) ServeHTTP(
return
}

var isNewPlacement bool
curPlacement, err := service.Placement()
if err == kv.ErrNotFound {
logger.Error("placement not found", zap.Any("req", req), zap.Error(err))
xhttp.Error(w, err, http.StatusNotFound)
return
}
if err != nil {
logger.Error("unable to get current placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
if err != kv.ErrNotFound {
logger.Error("unable to get current placement", zap.Error(err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

isNewPlacement = true
logger.Info("no placement found, creating new placement")
}

var (
placementProto = req.Placement
placementVersion int
)
newPlacement, err := placement.NewPlacementFromProto(req.Placement)
if err != nil {
logger.Error("unable to create new placement from proto", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

dryRun := !req.Confirm
var (
placementProto = req.Placement
dryRun = !req.Confirm

updatedPlacement placement.Placement
placementVersion int
)

if dryRun {
logger.Info("performing dry run for set placement, not confirmed")
placementVersion = curPlacement.Version() + 1
if isNewPlacement {
placementVersion = 0
} else {
placementVersion = curPlacement.Version() + 1
}
} else {
logger.Info("performing live run for set placement, confirmed")
// Ensure the placement we're updating is still the one on which we validated
// all shards are available.
updatedPlacement, err := service.CheckAndSet(newPlacement,
curPlacement.Version())

if isNewPlacement {
updatedPlacement, err = service.SetIfNotExist(newPlacement)
} else {
// Ensure the placement we're updating is still the one on which we validated
// all shards are available.
updatedPlacement, err = service.CheckAndSet(newPlacement,
curPlacement.Version())
}

if err != nil {
logger.Error("unable to update placement", zap.Error(err))
logger.Error("unable to update placement", zap.Error(err), zap.Bool("isNewPlacement", isNewPlacement))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}
Expand Down
74 changes: 71 additions & 3 deletions src/query/api/v1/handler/placement/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
Expand All @@ -43,7 +44,7 @@ import (
var (
setExistingTestPlacementProto = &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": &placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Expand All @@ -56,7 +57,7 @@ var (
}
setNewTestPlacementProto = &placementpb.Placement{
Instances: map[string]*placementpb.Instance{
"host1": &placementpb.Instance{
"host1": {
Id: "host1",
IsolationGroup: "rack1",
Zone: "test",
Expand All @@ -65,7 +66,7 @@ var (
Hostname: "host1",
Port: 1234,
},
"host2": &placementpb.Instance{
"host2": {
Id: "host2",
IsolationGroup: "rack1",
Zone: "test",
Expand Down Expand Up @@ -151,3 +152,70 @@ func TestPlacementSetHandler(t *testing.T) {
assert.Equal(t, expected, actual, xtest.Diff(expected, actual))
})
}

func TestPlacementSetHandler_NewPlacement(t *testing.T) {
runForAllAllowedServices(func(serviceName string) {
var url string
switch serviceName {
case handleroptions.M3DBServiceName:
url = M3DBSetURL
case handleroptions.M3AggregatorServiceName:
url = M3AggSetURL
case handleroptions.M3CoordinatorServiceName:
url = M3CoordinatorSetURL
default:
require.FailNow(t, "unexpected service name")
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient, mockPlacementService := SetupPlacementTest(t, ctrl)
handlerOpts, err := NewHandlerOptions(
mockClient, config.Configuration{}, nil, instrument.NewOptions())
require.NoError(t, err)
handler := NewSetHandler(handlerOpts)

// Test placement init success
reqBody, err := (&jsonpb.Marshaler{}).MarshalToString(setTestPlacementReqProto)
require.NoError(t, err)

req := httptest.NewRequest(SetHTTPMethod, url, strings.NewReader(reqBody))
require.NotNil(t, req)

mockPlacementService.EXPECT().
Placement().
Return(nil, kv.ErrNotFound)

newPlacement, err := placement.NewPlacementFromProto(setNewTestPlacementProto)
require.NoError(t, err)

mockPlacementService.EXPECT().
SetIfNotExist(gomock.Any()).
Return(newPlacement, nil)

svcDefaults := handleroptions.ServiceNameAndDefaults{
ServiceName: serviceName,
}

w := httptest.NewRecorder()
handler.ServeHTTP(svcDefaults, w, req)
resp := w.Result()
body := w.Body.String()
assert.Equal(t, http.StatusOK, resp.StatusCode)

expectedBody, err := (&jsonpb.Marshaler{
EmitDefaults: true,
}).MarshalToString(&admin.PlacementSetResponse{
Placement: setNewTestPlacementProto,
DryRun: !setTestPlacementReqProto.Confirm,
})
require.NoError(t, err)

expected := xtest.MustPrettyJSONString(t, expectedBody)
actual := xtest.MustPrettyJSONString(t, body)

assert.Equal(t, expected, actual, xtest.Diff(expected, actual))
assert.Equal(t, 0, newPlacement.Version())
})
}

0 comments on commit d38ff54

Please sign in to comment.