Skip to content

Commit

Permalink
[coordinator] Extract new namespace validation logics (#2919)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Nov 25, 2020
1 parent 5f8ca2c commit 9f5d293
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 120 deletions.
27 changes: 20 additions & 7 deletions src/dbnode/namespace/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
}

validNamespaceOpts = []nsproto.NamespaceOptions{
nsproto.NamespaceOptions{
{
BootstrapEnabled: true,
FlushEnabled: true,
WritesToCommitLog: true,
Expand All @@ -78,7 +78,7 @@ var (
ExtendedOptions: validExtendedOpts,
StagingState: &nsproto.StagingState{Status: nsproto.StagingStatus_INITIALIZING},
},
nsproto.NamespaceOptions{
{
BootstrapEnabled: true,
FlushEnabled: true,
WritesToCommitLog: true,
Expand All @@ -92,15 +92,15 @@ var (
}

validNamespaceSchemaOpts = []nsproto.NamespaceOptions{
nsproto.NamespaceOptions{
{
RetentionOptions: &validRetentionOpts,
SchemaOptions: testSchemaOptions,
},
}

invalidRetentionOpts = []nsproto.RetentionOptions{
// block size < buffer past
nsproto.RetentionOptions{
{
RetentionPeriodNanos: toNanos(1200), // 20h
BlockSizeNanos: toNanos(2), // 2m
BufferFutureNanos: toNanos(12), // 12m
Expand All @@ -109,7 +109,7 @@ var (
BlockDataExpiryAfterNotAccessPeriodNanos: toNanos(30), // 30m
},
// block size > retention
nsproto.RetentionOptions{
{
RetentionPeriodNanos: toNanos(1200), // 20h
BlockSizeNanos: toNanos(1260), // 21h
BufferFutureNanos: toNanos(12), // 12m
Expand Down Expand Up @@ -150,15 +150,28 @@ func TestNamespaceToRetentionInvalid(t *testing.T) {
}
}

func TestToNamespaceValid(t *testing.T) {
func TestToMetadataValid(t *testing.T) {
for _, nsopts := range validNamespaceOpts {
nsOpts, err := namespace.ToMetadata("abc", &nsopts)
require.NoError(t, err)
assertEqualMetadata(t, "abc", nsopts, nsOpts)
}
}

func TestToNamespaceInvalid(t *testing.T) {
func TestToMetadataNilIndexOpts(t *testing.T) {
nsopts := validNamespaceOpts[0]

nsopts.RetentionOptions.BlockSizeNanos = 7200000000000 / 2
nsopts.IndexOptions = nil

nsOpts, err := namespace.ToMetadata("id", &nsopts)
require.NoError(t, err)
assert.Equal(t,
time.Duration(nsopts.RetentionOptions.BlockSizeNanos),
nsOpts.Options().IndexOptions().BlockSize())
}

func TestToMetadataInvalid(t *testing.T) {
for _, nsopts := range validNamespaceOpts {
_, err := namespace.ToMetadata("", &nsopts)
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ type NewTileAggregatorFn func(iOpts instrument.Options) TileAggregator

// NamespaceHooks allows dynamic plugging into the namespace lifecycle.
type NamespaceHooks interface {
// OnCreatedNamespace gets invoked after each namespace is created.
// OnCreatedNamespace gets invoked after each namespace is initialized.
OnCreatedNamespace(Namespace, GetNamespaceFn) error
}

Expand Down
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/database/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/util/queryhttp"
"github.com/m3db/m3/src/x/instrument"
)
Expand All @@ -45,9 +46,10 @@ func RegisterRoutes(
embeddedDbCfg *dbconfig.DBConfiguration,
defaults []handleroptions.ServiceOptionsDefault,
instrumentOpts instrument.Options,
namespaceValidator options.NamespaceValidator,
) error {
createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg,
defaults, instrumentOpts)
defaults, instrumentOpts, namespaceValidator)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions src/query/api/v1/handler/database/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/namespace"
"github.com/m3db/m3/src/query/api/v1/handler/placement"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewCreateHandler(
embeddedDbCfg *dbconfig.DBConfiguration,
defaults []handleroptions.ServiceOptionsDefault,
instrumentOpts instrument.Options,
namespaceValidator options.NamespaceValidator,
) (http.Handler, error) {
placementHandlerOptions, err := placement.NewHandlerOptions(client,
cfg, nil, instrumentOpts)
Expand All @@ -157,7 +159,7 @@ func NewCreateHandler(
return &createHandler{
placementInitHandler: placement.NewInitHandler(placementHandlerOptions),
placementGetHandler: placement.NewGetHandler(placementHandlerOptions),
namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts),
namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts, namespaceValidator),
namespaceGetHandler: namespace.NewGetHandler(client, instrumentOpts),
namespaceDeleteHandler: namespace.NewDeleteHandler(client, instrumentOpts),
embeddedDbCfg: embeddedDbCfg,
Expand Down Expand Up @@ -316,7 +318,7 @@ func (h *createHandler) parseAndValidateRequest(
) (*admin.DatabaseCreateRequest, []*admin.NamespaceAddRequest, *admin.PlacementInitRequest, error) {
requirePlacement := existingPlacement == nil

defer r.Body.Close()
defer r.Body.Close() //nolint:errcheck
rBody, err := xhttp.DurationToNanosBytes(r.Body)
if err != nil {
wrapped := fmt.Errorf("error converting duration to nano bytes: %s", err.Error())
Expand Down Expand Up @@ -582,7 +584,7 @@ func defaultedPlacementInitRequest(
numShards = shardMultiplier
replicationFactor = 1
instances = []*placementpb.Instance{
&placementpb.Instance{
{
Id: DefaultLocalHostID,
IsolationGroup: DefaultLocalIsolationGroup,
Zone: DefaultLocalZone,
Expand Down
25 changes: 13 additions & 12 deletions src/query/api/v1/handler/database/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/namespace"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/validators"
"github.com/m3db/m3/src/x/instrument"
xjson "github.com/m3db/m3/src/x/json"
xtest "github.com/m3db/m3/src/x/test"
Expand Down Expand Up @@ -101,7 +102,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -234,7 +235,7 @@ func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) {

mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -280,7 +281,7 @@ func TestLocalTypeWithNumShards(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)

w := httptest.NewRecorder()
Expand Down Expand Up @@ -410,7 +411,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -539,7 +540,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -682,7 +683,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) {
mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -737,7 +738,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) {

mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -783,7 +784,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -953,7 +954,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) {
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1109,7 +1110,7 @@ func TestClusterTypeMissingHostnames(t *testing.T) {
mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound)

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1145,7 +1146,7 @@ func TestBadType(t *testing.T) {
mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound)

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
nil, svcDefaultOptions, instrument.NewOptions())
nil, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1180,7 +1181,7 @@ func TestLocalTypeWithAggregatedNamespace(t *testing.T) {
fakeKV := fake.NewStore()
mockClient.EXPECT().Store(gomock.Any()).Return(fakeKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)

w := httptest.NewRecorder()
Expand Down
55 changes: 20 additions & 35 deletions src/query/api/v1/handler/namespace/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package namespace

import (
"bytes"
"errors"
"fmt"
"net/http"
"path"
Expand All @@ -32,6 +31,8 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/api/v1/validators"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
Expand All @@ -48,21 +49,27 @@ var (

// AddHTTPMethod is the HTTP method used with this resource.
AddHTTPMethod = http.MethodPost

errNamespaceExists = xerrors.NewInvalidParamsError(errors.New("namespace with same ID already exists"))
)

// AddHandler is the handler for namespace adds.
type AddHandler Handler
type AddHandler struct {
Handler

validator options.NamespaceValidator
}

// NewAddHandler returns a new instance of AddHandler.
func NewAddHandler(
client clusterclient.Client,
instrumentOpts instrument.Options,
validator options.NamespaceValidator,
) *AddHandler {
return &AddHandler{
client: client,
instrumentOpts: instrumentOpts,
Handler: Handler{
client: client,
instrumentOpts: instrumentOpts,
},
validator: validator,
}
}

Expand All @@ -84,13 +91,13 @@ func (h *AddHandler) ServeHTTP(
opts := handleroptions.NewServiceOptions(svc, r.Header, nil)
nsRegistry, err := h.Add(md, opts)
if err != nil {
if err == errNamespaceExists {
if err == validators.ErrNamespaceExists {
logger.Error("namespace already exists", zap.Error(err))
xhttp.WriteError(w, xhttp.NewError(err, http.StatusConflict))
return
}

logger.Error("unable to get namespace", zap.Error(err))
logger.Error("unable to add namespace", zap.Error(err))
xhttp.WriteError(w, err)
return
}
Expand All @@ -103,7 +110,7 @@ func (h *AddHandler) ServeHTTP(
}

func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, error) {
defer r.Body.Close()
defer r.Body.Close() // nolint:errcheck
rBody, err := xhttp.DurationToNanosBytes(r.Body)
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
Expand All @@ -129,10 +136,6 @@ func (h *AddHandler) Add(
return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("bad namespace metadata: %v", err))
}

if err := validateNewMetadata(md); err != nil {
return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("invalid new namespace metadata: %v", err))
}

store, err := h.client.Store(opts.KVOverrideOptions())
if err != nil {
return emptyReg, err
Expand All @@ -143,15 +146,11 @@ func (h *AddHandler) Add(
return emptyReg, err
}

// Since this endpoint is `/add` and not in-place update, return an error if
// the NS already exists. NewMap will return an error if there's duplicate
// entries with the same name, but it's abstracted away behind a MultiError so
// we can't easily check that it's a conflict in the handler.
for _, ns := range currentMetadata {
if ns.ID().Equal(md.ID()) {
// NB: errNamespaceExists already an invalid params error.
return emptyReg, errNamespaceExists
if err := h.validator.ValidateNewNamespace(md, currentMetadata); err != nil {
if err == validators.ErrNamespaceExists {
return emptyReg, err
}
return emptyReg, xerrors.NewInvalidParamsError(err)
}

newMDs := append(currentMetadata, md)
Expand All @@ -176,17 +175,3 @@ func (h *AddHandler) Add(

return *protoRegistry, nil
}

// Validate new namespace inputs only. Validation that applies to namespaces regardless of create/update/etc
// belongs in the option-specific Validate functions which are invoked on every change operation.
func validateNewMetadata(m namespace.Metadata) error {
indexBlockSize := m.Options().RetentionOptions().BlockSize()
retentionBlockSize := m.Options().IndexOptions().BlockSize()
if indexBlockSize != retentionBlockSize {
return fmt.Errorf("index and retention block size must match (%v, %v)",
indexBlockSize,
retentionBlockSize,
)
}
return nil
}
Loading

0 comments on commit 9f5d293

Please sign in to comment.