diff --git a/src/dbnode/namespace/convert_test.go b/src/dbnode/namespace/convert_test.go index f1db9cad28..82b590f29b 100644 --- a/src/dbnode/namespace/convert_test.go +++ b/src/dbnode/namespace/convert_test.go @@ -66,7 +66,7 @@ var ( } validNamespaceOpts = []nsproto.NamespaceOptions{ - nsproto.NamespaceOptions{ + { BootstrapEnabled: true, FlushEnabled: true, WritesToCommitLog: true, @@ -78,7 +78,7 @@ var ( ExtendedOptions: validExtendedOpts, StagingState: &nsproto.StagingState{Status: nsproto.StagingStatus_INITIALIZING}, }, - nsproto.NamespaceOptions{ + { BootstrapEnabled: true, FlushEnabled: true, WritesToCommitLog: true, @@ -92,7 +92,7 @@ var ( } validNamespaceSchemaOpts = []nsproto.NamespaceOptions{ - nsproto.NamespaceOptions{ + { RetentionOptions: &validRetentionOpts, SchemaOptions: testSchemaOptions, }, @@ -100,7 +100,7 @@ var ( invalidRetentionOpts = []nsproto.RetentionOptions{ // block size < buffer past - nsproto.RetentionOptions{ + { RetentionPeriodNanos: toNanos(1200), // 20h BlockSizeNanos: toNanos(2), // 2m BufferFutureNanos: toNanos(12), // 12m @@ -109,7 +109,7 @@ var ( BlockDataExpiryAfterNotAccessPeriodNanos: toNanos(30), // 30m }, // block size > retention - nsproto.RetentionOptions{ + { RetentionPeriodNanos: toNanos(1200), // 20h BlockSizeNanos: toNanos(1260), // 21h BufferFutureNanos: toNanos(12), // 12m @@ -150,7 +150,7 @@ func TestNamespaceToRetentionInvalid(t *testing.T) { } } -func TestToNamespaceValid(t *testing.T) { +func TestToMetadataValid(t *testing.T) { for _, nsopts := range validNamespaceOpts { nsOpts, err := namespace.ToMetadata("abc", &nsopts) require.NoError(t, err) @@ -158,7 +158,20 @@ func TestToNamespaceValid(t *testing.T) { } } -func TestToNamespaceInvalid(t *testing.T) { +func TestToMetadataNilIndexOpts(t *testing.T) { + nsopts := validNamespaceOpts[0] + + nsopts.RetentionOptions.BlockSizeNanos = 7200000000000 / 2 + nsopts.IndexOptions = nil + + nsOpts, err := namespace.ToMetadata("id", &nsopts) + require.NoError(t, err) + assert.Equal(t, + time.Duration(nsopts.RetentionOptions.BlockSizeNanos), + nsOpts.Options().IndexOptions().BlockSize()) +} + +func TestToMetadataInvalid(t *testing.T) { for _, nsopts := range validNamespaceOpts { _, err := namespace.ToMetadata("", &nsopts) require.Error(t, err) diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 87aa6b3da1..8f148baad2 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -1411,7 +1411,7 @@ type NewTileAggregatorFn func(iOpts instrument.Options) TileAggregator // NamespaceHooks allows dynamic plugging into the namespace lifecycle. type NamespaceHooks interface { - // OnCreatedNamespace gets invoked after each namespace is created. + // OnCreatedNamespace gets invoked after each namespace is initialized. OnCreatedNamespace(Namespace, GetNamespaceFn) error } diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index 525268e4d8..70854ea06b 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -25,6 +25,7 @@ import ( dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/util/queryhttp" "github.com/m3db/m3/src/x/instrument" ) @@ -45,9 +46,10 @@ func RegisterRoutes( embeddedDbCfg *dbconfig.DBConfiguration, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, + namespaceValidator options.NamespaceValidator, ) error { createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg, - defaults, instrumentOpts) + defaults, instrumentOpts, namespaceValidator) if err != nil { return err } diff --git a/src/query/api/v1/handler/database/create.go b/src/query/api/v1/handler/database/create.go index de9e7054f8..6860da9bdd 100644 --- a/src/query/api/v1/handler/database/create.go +++ b/src/query/api/v1/handler/database/create.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" @@ -148,6 +149,7 @@ func NewCreateHandler( embeddedDbCfg *dbconfig.DBConfiguration, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, + namespaceValidator options.NamespaceValidator, ) (http.Handler, error) { placementHandlerOptions, err := placement.NewHandlerOptions(client, cfg, nil, instrumentOpts) @@ -157,7 +159,7 @@ func NewCreateHandler( return &createHandler{ placementInitHandler: placement.NewInitHandler(placementHandlerOptions), placementGetHandler: placement.NewGetHandler(placementHandlerOptions), - namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts), + namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts, namespaceValidator), namespaceGetHandler: namespace.NewGetHandler(client, instrumentOpts), namespaceDeleteHandler: namespace.NewDeleteHandler(client, instrumentOpts), embeddedDbCfg: embeddedDbCfg, @@ -316,7 +318,7 @@ func (h *createHandler) parseAndValidateRequest( ) (*admin.DatabaseCreateRequest, []*admin.NamespaceAddRequest, *admin.PlacementInitRequest, error) { requirePlacement := existingPlacement == nil - defer r.Body.Close() + defer r.Body.Close() //nolint:errcheck rBody, err := xhttp.DurationToNanosBytes(r.Body) if err != nil { wrapped := fmt.Errorf("error converting duration to nano bytes: %s", err.Error()) @@ -582,7 +584,7 @@ func defaultedPlacementInitRequest( numShards = shardMultiplier replicationFactor = 1 instances = []*placementpb.Instance{ - &placementpb.Instance{ + { Id: DefaultLocalHostID, IsolationGroup: DefaultLocalIsolationGroup, Zone: DefaultLocalZone, diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index 35f46b2f43..a313faf858 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/validators" "github.com/m3db/m3/src/x/instrument" xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" @@ -101,7 +102,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -234,7 +235,7 @@ func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -280,7 +281,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -410,7 +411,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -539,7 +540,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -682,7 +683,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -737,7 +738,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) { mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -783,7 +784,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -953,7 +954,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -1109,7 +1110,7 @@ func TestClusterTypeMissingHostnames(t *testing.T) { mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -1145,7 +1146,7 @@ func TestBadType(t *testing.T) { mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - nil, svcDefaultOptions, instrument.NewOptions()) + nil, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() @@ -1180,7 +1181,7 @@ func TestLocalTypeWithAggregatedNamespace(t *testing.T) { fakeKV := fake.NewStore() mockClient.EXPECT().Store(gomock.Any()).Return(fakeKV, nil).AnyTimes() createHandler, err := NewCreateHandler(mockClient, config.Configuration{}, - testDBCfg, svcDefaultOptions, instrument.NewOptions()) + testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator) require.NoError(t, err) w := httptest.NewRecorder() diff --git a/src/query/api/v1/handler/namespace/add.go b/src/query/api/v1/handler/namespace/add.go index 85ebdf108c..d0e864b012 100644 --- a/src/query/api/v1/handler/namespace/add.go +++ b/src/query/api/v1/handler/namespace/add.go @@ -22,7 +22,6 @@ package namespace import ( "bytes" - "errors" "fmt" "net/http" "path" @@ -32,6 +31,8 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/api/v1/validators" "github.com/m3db/m3/src/query/generated/proto/admin" "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" @@ -48,21 +49,27 @@ var ( // AddHTTPMethod is the HTTP method used with this resource. AddHTTPMethod = http.MethodPost - - errNamespaceExists = xerrors.NewInvalidParamsError(errors.New("namespace with same ID already exists")) ) // AddHandler is the handler for namespace adds. -type AddHandler Handler +type AddHandler struct { + Handler + + validator options.NamespaceValidator +} // NewAddHandler returns a new instance of AddHandler. func NewAddHandler( client clusterclient.Client, instrumentOpts instrument.Options, + validator options.NamespaceValidator, ) *AddHandler { return &AddHandler{ - client: client, - instrumentOpts: instrumentOpts, + Handler: Handler{ + client: client, + instrumentOpts: instrumentOpts, + }, + validator: validator, } } @@ -84,13 +91,13 @@ func (h *AddHandler) ServeHTTP( opts := handleroptions.NewServiceOptions(svc, r.Header, nil) nsRegistry, err := h.Add(md, opts) if err != nil { - if err == errNamespaceExists { + if err == validators.ErrNamespaceExists { logger.Error("namespace already exists", zap.Error(err)) xhttp.WriteError(w, xhttp.NewError(err, http.StatusConflict)) return } - logger.Error("unable to get namespace", zap.Error(err)) + logger.Error("unable to add namespace", zap.Error(err)) xhttp.WriteError(w, err) return } @@ -103,7 +110,7 @@ func (h *AddHandler) ServeHTTP( } func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, error) { - defer r.Body.Close() + defer r.Body.Close() // nolint:errcheck rBody, err := xhttp.DurationToNanosBytes(r.Body) if err != nil { return nil, xerrors.NewInvalidParamsError(err) @@ -129,10 +136,6 @@ func (h *AddHandler) Add( return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("bad namespace metadata: %v", err)) } - if err := validateNewMetadata(md); err != nil { - return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("invalid new namespace metadata: %v", err)) - } - store, err := h.client.Store(opts.KVOverrideOptions()) if err != nil { return emptyReg, err @@ -143,15 +146,11 @@ func (h *AddHandler) Add( return emptyReg, err } - // Since this endpoint is `/add` and not in-place update, return an error if - // the NS already exists. NewMap will return an error if there's duplicate - // entries with the same name, but it's abstracted away behind a MultiError so - // we can't easily check that it's a conflict in the handler. - for _, ns := range currentMetadata { - if ns.ID().Equal(md.ID()) { - // NB: errNamespaceExists already an invalid params error. - return emptyReg, errNamespaceExists + if err := h.validator.ValidateNewNamespace(md, currentMetadata); err != nil { + if err == validators.ErrNamespaceExists { + return emptyReg, err } + return emptyReg, xerrors.NewInvalidParamsError(err) } newMDs := append(currentMetadata, md) @@ -176,17 +175,3 @@ func (h *AddHandler) Add( return *protoRegistry, nil } - -// Validate new namespace inputs only. Validation that applies to namespaces regardless of create/update/etc -// belongs in the option-specific Validate functions which are invoked on every change operation. -func validateNewMetadata(m namespace.Metadata) error { - indexBlockSize := m.Options().RetentionOptions().BlockSize() - retentionBlockSize := m.Options().IndexOptions().BlockSize() - if indexBlockSize != retentionBlockSize { - return fmt.Errorf("index and retention block size must match (%v, %v)", - indexBlockSize, - retentionBlockSize, - ) - } - return nil -} diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index d0aae7c984..0f26d83596 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -21,17 +21,17 @@ package namespace import ( + "errors" "io/ioutil" "net/http" "net/http/httptest" "strings" "testing" - "github.com/gogo/protobuf/jsonpb" "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/query/generated/proto/admin" + "github.com/m3db/m3/src/query/api/v1/validators" "github.com/m3db/m3/src/x/instrument" xjson "github.com/m3db/m3/src/x/json" xtest "github.com/m3db/m3/src/x/test" @@ -79,7 +79,7 @@ func TestNamespaceAddHandler(t *testing.T) { defer ctrl.Finish() mockClient, mockKV := setupNamespaceTest(t, ctrl) - addHandler := NewAddHandler(mockClient, instrument.NewOptions()) + addHandler := NewAddHandler(mockClient, instrument.NewOptions(), validators.NamespaceValidator) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) // Error case where required fields are not set @@ -164,7 +164,7 @@ func TestNamespaceAddHandler_Conflict(t *testing.T) { defer ctrl.Finish() mockClient, mockKV := setupNamespaceTest(t, ctrl) - addHandler := NewAddHandler(mockClient, instrument.NewOptions()) + addHandler := NewAddHandler(mockClient, instrument.NewOptions(), validators.NamespaceValidator) mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) // Ensure adding an existing namespace returns 409 @@ -173,7 +173,7 @@ func TestNamespaceAddHandler_Conflict(t *testing.T) { registry := nsproto.Registry{ Namespaces: map[string]*nsproto.NamespaceOptions{ - "testNamespace": &nsproto.NamespaceOptions{ + "testNamespace": { BootstrapEnabled: true, FlushEnabled: true, SnapshotEnabled: true, @@ -203,40 +203,56 @@ func TestNamespaceAddHandler_Conflict(t *testing.T) { assert.Equal(t, http.StatusConflict, resp.StatusCode) } -func TestValidateNewMetadata(t *testing.T) { - // Valid. - addReq := new(admin.NamespaceAddRequest) - require.NoError(t, jsonpb.Unmarshal(strings.NewReader(testAddJSON), addReq)) - md, err := namespace.ToMetadata(addReq.Name, addReq.Options) - require.NoError(t, err) - require.NoError(t, validateNewMetadata(md)) - - // Valid without index options. - addReq = new(admin.NamespaceAddRequest) - require.NoError(t, jsonpb.Unmarshal(strings.NewReader(testAddJSON), addReq)) - addReq.Options.RetentionOptions.BlockSizeNanos = 7200000000000 / 2 - addReq.Options.IndexOptions = nil - md, err = namespace.ToMetadata(addReq.Name, addReq.Options) - require.NoError(t, err) - require.NoError(t, validateNewMetadata(md)) - - // Invalid without retention options. - addReq = new(admin.NamespaceAddRequest) - require.NoError(t, jsonpb.Unmarshal(strings.NewReader(testAddJSON), addReq)) - addReq.Options.RetentionOptions = nil - addReq.Options.IndexOptions.BlockSizeNanos = 7200000000000 / 2 - md, err = namespace.ToMetadata(addReq.Name, addReq.Options) - require.Error(t, err) - require.Equal(t, "retention options must be set", err.Error()) - - // Prevent mismatching block sizes. - addReq = new(admin.NamespaceAddRequest) - require.NoError(t, jsonpb.Unmarshal(strings.NewReader(testAddJSON), addReq)) - addReq.Options.RetentionOptions.BlockSizeNanos = 7200000000000 - addReq.Options.IndexOptions.BlockSizeNanos = 7200000000000 * 2 - md, err = namespace.ToMetadata(addReq.Name, addReq.Options) - require.NoError(t, err) - err = validateNewMetadata(md) - require.Error(t, err) - require.Equal(t, "index and retention block size must match (2h0m0s, 4h0m0s)", err.Error()) +func TestNamespaceAddHandler_InvokesNewNamespaceValidator(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient, mockKV := setupNamespaceTest(t, ctrl) + validator := &testNamespaceValidator{} + addHandler := NewAddHandler(mockClient, instrument.NewOptions(), validator) + mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil) + + req := httptest.NewRequest("POST", "/namespace", strings.NewReader(testAddJSON)) + require.NotNil(t, req) + + registry := nsproto.Registry{ + Namespaces: map[string]*nsproto.NamespaceOptions{ + "firstNamespace": { + BootstrapEnabled: true, + FlushEnabled: true, + SnapshotEnabled: true, + WritesToCommitLog: true, + CleanupEnabled: false, + RepairEnabled: false, + RetentionOptions: &nsproto.RetentionOptions{ + RetentionPeriodNanos: 172800000000000, + BlockSizeNanos: 7200000000000, + BufferFutureNanos: 600000000000, + BufferPastNanos: 600000000000, + BlockDataExpiry: true, + BlockDataExpiryAfterNotAccessPeriodNanos: 3600000000000, + }, + }, + }, + } + + mockValue := kv.NewMockValue(ctrl) + mockValue.EXPECT().Unmarshal(gomock.Any()).Return(nil).SetArg(0, registry) + mockValue.EXPECT().Version().Return(0) + mockKV.EXPECT().Get(M3DBNodeNamespacesKey).Return(mockValue, nil) + + w := httptest.NewRecorder() + addHandler.ServeHTTP(svcDefaults, w, req) + resp := w.Result() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) + assert.Equal(t, 1, validator.invocationCount) +} + +type testNamespaceValidator struct { + invocationCount int +} + +func (v *testNamespaceValidator) ValidateNewNamespace(namespace.Metadata, []namespace.Metadata) error { + v.invocationCount++ + return errors.New("expected validation error") } diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index f58ccf05d2..525abfae5d 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -32,6 +32,7 @@ import ( nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/storage/m3" "github.com/m3db/m3/src/query/util/queryhttp" "github.com/m3db/m3/src/x/instrument" @@ -113,6 +114,7 @@ func RegisterRoutes( clusters m3.Clusters, defaults []handleroptions.ServiceOptionsDefault, instrumentOpts instrument.Options, + namespaceValidator options.NamespaceValidator, ) error { applyMiddleware := func( f func(svc handleroptions.ServiceNameAndDefaults, @@ -140,7 +142,7 @@ func RegisterRoutes( // Add M3DB namespaces. if err := r.Register(queryhttp.RegisterOptions{ Path: M3DBAddURL, - Handler: applyMiddleware(NewAddHandler(client, instrumentOpts).ServeHTTP, defaults), + Handler: applyMiddleware(NewAddHandler(client, instrumentOpts, namespaceValidator).ServeHTTP, defaults), Methods: []string{AddHTTPMethod}, }); err != nil { return err diff --git a/src/query/api/v1/handler/namespace/update.go b/src/query/api/v1/handler/namespace/update.go index 01eb3eb3b2..5dd152a74e 100644 --- a/src/query/api/v1/handler/namespace/update.go +++ b/src/query/api/v1/handler/namespace/update.go @@ -54,7 +54,6 @@ var ( fieldNameRetentionPeriod = "RetentionPeriodNanos" fieldNameRuntimeOptions = "RuntimeOptions" fieldNameAggregationOptions = "AggregationOptions" - fieldNameExtendedOptions = "ExtendedOptions" errEmptyNamespaceName = errors.New("must specify namespace name") errEmptyNamespaceOptions = errors.New("update options cannot be empty") @@ -64,7 +63,6 @@ var ( fieldNameRetentionOptions: {}, fieldNameRuntimeOptions: {}, fieldNameAggregationOptions: {}, - fieldNameExtendedOptions: {}, } ) diff --git a/src/query/api/v1/handler/namespace/update_test.go b/src/query/api/v1/handler/namespace/update_test.go index 988c14b1ce..38ba2282ab 100644 --- a/src/query/api/v1/handler/namespace/update_test.go +++ b/src/query/api/v1/handler/namespace/update_test.go @@ -62,10 +62,6 @@ const ( } } ] - }, - "extendedOptions": { - "@type": "testm3db.io/m3.test.PingResponse", - "Value": "bar" } } } @@ -199,7 +195,7 @@ func TestNamespaceUpdateHandler(t *testing.T) { "schemaOptions": nil, "stagingState": xjson.Map{"status": "UNKNOWN"}, "coldWritesEnabled": false, - "extendedOptions": xtest.NewExtendedOptionsJson("bar"), + "extendedOptions": xtest.NewExtendedOptionsJson("foo"), }, }, }, diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 83b34ec6bf..447146caa7 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -51,7 +51,7 @@ import ( "github.com/gorilla/mux" "github.com/opentracing-contrib/go-stdlib/nethttp" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/util/httputil" "go.uber.org/zap" ) @@ -395,7 +395,7 @@ func (h *Handler) RegisterRoutes() error { if clusterClient != nil { err = database.RegisterRoutes(h.registry, clusterClient, h.options.Config(), h.options.EmbeddedDbCfg(), - serviceOptionDefaults, instrumentOpts) + serviceOptionDefaults, instrumentOpts, h.options.NamespaceValidator()) if err != nil { return err } @@ -407,7 +407,8 @@ func (h *Handler) RegisterRoutes() error { } err = namespace.RegisterRoutes(h.registry, clusterClient, - h.options.Clusters(), serviceOptionDefaults, instrumentOpts) + h.options.Clusters(), serviceOptionDefaults, instrumentOpts, + h.options.NamespaceValidator()) if err != nil { return err } diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index db69f58611..429c990531 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -30,7 +30,9 @@ import ( "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" + dbnamespace "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" + "github.com/m3db/m3/src/query/api/v1/validators" "github.com/m3db/m3/src/query/executor" graphite "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/models" @@ -206,9 +208,13 @@ type HandlerOptions interface { // SetStoreMetricsType enables/disables storing of metrics type. SetStoreMetricsType(value bool) HandlerOptions - // StoreMetricsType returns true if storing of metrics type is enabled. StoreMetricsType() bool + + // SetNamespaceValidator sets the NamespaceValidator. + SetNamespaceValidator(NamespaceValidator) HandlerOptions + // NamespaceValidator returns the NamespaceValidator. + NamespaceValidator() NamespaceValidator } // HandlerOptions represents handler options. @@ -235,6 +241,7 @@ type handlerOptions struct { instantQueryRouter QueryRouter graphiteStorageOpts graphite.M3WrappedStorageOptions m3dbOpts m3db.Options + namespaceValidator NamespaceValidator storeMetricsType bool } @@ -297,6 +304,7 @@ func NewHandlerOptions( graphiteStorageOpts: graphiteStorageOpts, m3dbOpts: m3dbOpts, storeMetricsType: storeMetricsType, + namespaceValidator: validators.NamespaceValidator, }, nil } @@ -548,3 +556,19 @@ func (o *handlerOptions) SetStoreMetricsType(value bool) HandlerOptions { func (o *handlerOptions) StoreMetricsType() bool { return o.storeMetricsType } + +func (o *handlerOptions) SetNamespaceValidator(value NamespaceValidator) HandlerOptions { + opts := *o + opts.namespaceValidator = value + return &opts +} + +func (o *handlerOptions) NamespaceValidator() NamespaceValidator { + return o.namespaceValidator +} + +// NamespaceValidator defines namespace validation logics. +type NamespaceValidator interface { + // ValidateNewNamespace gets invoked when creating a new namespace. + ValidateNewNamespace(newNs dbnamespace.Metadata, existing []dbnamespace.Metadata) error +} diff --git a/src/query/api/v1/validators/validators.go b/src/query/api/v1/validators/validators.go new file mode 100644 index 0000000000..4b521699b6 --- /dev/null +++ b/src/query/api/v1/validators/validators.go @@ -0,0 +1,69 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//package validators contains validation logics for the api. +package validators + +import ( + "errors" + "fmt" + + "github.com/m3db/m3/src/dbnode/namespace" + xerrors "github.com/m3db/m3/src/x/errors" +) + +var ( + // NamespaceValidator is an instance of namespaceValidator. + NamespaceValidator = &namespaceValidator{} + + // ErrNamespaceExists is returned when trying to create a namespace with id that already exists. + ErrNamespaceExists = errors.New("namespace with the same ID already exists") +) + +type namespaceValidator struct{} + +// Validate new namespace inputs only. Validation that applies to namespaces +// regardless of create/update/etc belongs in the option-specific Validate +// functions which are invoked on every change operation. +func (h *namespaceValidator) ValidateNewNamespace( + ns namespace.Metadata, + existing []namespace.Metadata, +) error { + var ( + id = ns.ID() + indexBlockSize = ns.Options().RetentionOptions().BlockSize() + retentionBlockSize = ns.Options().IndexOptions().BlockSize() + ) + + if indexBlockSize != retentionBlockSize { + return xerrors.NewInvalidParamsError( + fmt.Errorf("index and retention block size must match (%v, %v)", + indexBlockSize, + retentionBlockSize)) + } + + for _, existingNs := range existing { + if id.Equal(existingNs.ID()) { + return ErrNamespaceExists + } + } + + return nil +} diff --git a/src/query/api/v1/validators/validators_test.go b/src/query/api/v1/validators/validators_test.go new file mode 100644 index 0000000000..2381384113 --- /dev/null +++ b/src/query/api/v1/validators/validators_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package validators + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/x/ident" +) + +var ( + id = ident.BytesID("id") + opts = namespace.NewOptions() +) + +func TestValidateNewNamespace(t *testing.T) { + valid, err := namespace.NewMetadata(id, opts) + require.NoError(t, err) + + assert.NoError(t, NamespaceValidator.ValidateNewNamespace(valid, nil)) +} + +func TestValidateNewNamespaceFailOnBlockSize(t *testing.T) { + mismatchingBlockOpts := opts. + SetRetentionOptions(opts.RetentionOptions().SetBlockSize(7200000000000)). + SetIndexOptions(opts.IndexOptions().SetBlockSize(7200000000000 * 2)) + mismatchingBlocks, err := namespace.NewMetadata(id, mismatchingBlockOpts) + require.NoError(t, err) + + err = NamespaceValidator.ValidateNewNamespace(mismatchingBlocks, nil) + assert.EqualError(t, err, "index and retention block size must match (2h0m0s, 4h0m0s)") +} + +func TestValidateNewNamespaceFailDuplicate(t *testing.T) { + ns, err := namespace.NewMetadata(id, opts) + require.NoError(t, err) + + err = NamespaceValidator.ValidateNewNamespace(ns, []namespace.Metadata{ns}) + assert.Equal(t, ErrNamespaceExists, err) +} diff --git a/src/query/server/query.go b/src/query/server/query.go index 303ea23fd4..e82d31232d 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -72,7 +72,7 @@ import ( "github.com/go-kit/kit/log" kitlogzap "github.com/go-kit/kit/log/zap" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" extprom "github.com/prometheus/client_golang/prometheus" prometheuspromql "github.com/prometheus/prometheus/promql" diff --git a/src/query/ts/m3db/options.go b/src/query/ts/m3db/options.go index a808e62647..43867c654e 100644 --- a/src/query/ts/m3db/options.go +++ b/src/query/ts/m3db/options.go @@ -69,16 +69,10 @@ type encodedBlockOptions struct { instrumented bool } -type nextDetails struct { - peek peekValue - iter encoding.SeriesIterator - collector consolidators.StepCollector -} - // NewOptions creates a default encoded block options which dictates how // encoded blocks are generated. func NewOptions() Options { - bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{pool.Bucket{ + bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{{ Capacity: defaultCapacity, Count: defaultCount, }}, nil, func(s []pool.Bucket) pool.BytesPool {