Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Extract new namespace validation logics #2919

Merged
merged 14 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/dbnode/namespace/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
}

validNamespaceOpts = []nsproto.NamespaceOptions{
nsproto.NamespaceOptions{
{
BootstrapEnabled: true,
FlushEnabled: true,
WritesToCommitLog: true,
Expand All @@ -78,7 +78,7 @@ var (
ExtendedOptions: validExtendedOpts,
StagingState: &nsproto.StagingState{Status: nsproto.StagingStatus_INITIALIZING},
},
nsproto.NamespaceOptions{
{
BootstrapEnabled: true,
FlushEnabled: true,
WritesToCommitLog: true,
Expand All @@ -92,15 +92,15 @@ var (
}

validNamespaceSchemaOpts = []nsproto.NamespaceOptions{
nsproto.NamespaceOptions{
{
RetentionOptions: &validRetentionOpts,
SchemaOptions: testSchemaOptions,
},
}

invalidRetentionOpts = []nsproto.RetentionOptions{
// block size < buffer past
nsproto.RetentionOptions{
{
RetentionPeriodNanos: toNanos(1200), // 20h
BlockSizeNanos: toNanos(2), // 2m
BufferFutureNanos: toNanos(12), // 12m
Expand All @@ -109,7 +109,7 @@ var (
BlockDataExpiryAfterNotAccessPeriodNanos: toNanos(30), // 30m
},
// block size > retention
nsproto.RetentionOptions{
{
RetentionPeriodNanos: toNanos(1200), // 20h
BlockSizeNanos: toNanos(1260), // 21h
BufferFutureNanos: toNanos(12), // 12m
Expand Down Expand Up @@ -150,15 +150,28 @@ func TestNamespaceToRetentionInvalid(t *testing.T) {
}
}

func TestToNamespaceValid(t *testing.T) {
func TestToMetadataValid(t *testing.T) {
for _, nsopts := range validNamespaceOpts {
nsOpts, err := namespace.ToMetadata("abc", &nsopts)
require.NoError(t, err)
assertEqualMetadata(t, "abc", nsopts, nsOpts)
}
}

func TestToNamespaceInvalid(t *testing.T) {
func TestToMetadataNilIndexOpts(t *testing.T) {
nsopts := validNamespaceOpts[0]

nsopts.RetentionOptions.BlockSizeNanos = 7200000000000 / 2
nsopts.IndexOptions = nil

nsOpts, err := namespace.ToMetadata("id", &nsopts)
require.NoError(t, err)
assert.Equal(t,
time.Duration(nsopts.RetentionOptions.BlockSizeNanos),
nsOpts.Options().IndexOptions().BlockSize())
}

func TestToMetadataInvalid(t *testing.T) {
for _, nsopts := range validNamespaceOpts {
_, err := namespace.ToMetadata("", &nsopts)
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ type NewTileAggregatorFn func(iOpts instrument.Options) TileAggregator

// NamespaceHooks allows dynamic plugging into the namespace lifecycle.
type NamespaceHooks interface {
// OnCreatedNamespace gets invoked after each namespace is created.
// OnCreatedNamespace gets invoked after each namespace is initialized.
OnCreatedNamespace(Namespace, GetNamespaceFn) error
}

Expand Down
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/database/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config"
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/util/queryhttp"
"github.com/m3db/m3/src/x/instrument"
)
Expand All @@ -45,9 +46,10 @@ func RegisterRoutes(
embeddedDbCfg *dbconfig.DBConfiguration,
defaults []handleroptions.ServiceOptionsDefault,
instrumentOpts instrument.Options,
namespaceValidator options.NamespaceValidator,
) error {
createHandler, err := NewCreateHandler(client, cfg, embeddedDbCfg,
defaults, instrumentOpts)
defaults, instrumentOpts, namespaceValidator)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions src/query/api/v1/handler/database/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/namespace"
"github.com/m3db/m3/src/query/api/v1/handler/placement"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewCreateHandler(
embeddedDbCfg *dbconfig.DBConfiguration,
defaults []handleroptions.ServiceOptionsDefault,
instrumentOpts instrument.Options,
namespaceValidator options.NamespaceValidator,
) (http.Handler, error) {
placementHandlerOptions, err := placement.NewHandlerOptions(client,
cfg, nil, instrumentOpts)
Expand All @@ -157,7 +159,7 @@ func NewCreateHandler(
return &createHandler{
placementInitHandler: placement.NewInitHandler(placementHandlerOptions),
placementGetHandler: placement.NewGetHandler(placementHandlerOptions),
namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts),
namespaceAddHandler: namespace.NewAddHandler(client, instrumentOpts, namespaceValidator),
namespaceGetHandler: namespace.NewGetHandler(client, instrumentOpts),
namespaceDeleteHandler: namespace.NewDeleteHandler(client, instrumentOpts),
embeddedDbCfg: embeddedDbCfg,
Expand Down Expand Up @@ -316,7 +318,7 @@ func (h *createHandler) parseAndValidateRequest(
) (*admin.DatabaseCreateRequest, []*admin.NamespaceAddRequest, *admin.PlacementInitRequest, error) {
requirePlacement := existingPlacement == nil

defer r.Body.Close()
defer r.Body.Close() //nolint:errcheck
rBody, err := xhttp.DurationToNanosBytes(r.Body)
if err != nil {
wrapped := fmt.Errorf("error converting duration to nano bytes: %s", err.Error())
Expand Down Expand Up @@ -582,7 +584,7 @@ func defaultedPlacementInitRequest(
numShards = shardMultiplier
replicationFactor = 1
instances = []*placementpb.Instance{
&placementpb.Instance{
{
Id: DefaultLocalHostID,
IsolationGroup: DefaultLocalIsolationGroup,
Zone: DefaultLocalZone,
Expand Down
25 changes: 13 additions & 12 deletions src/query/api/v1/handler/database/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler/namespace"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/validators"
"github.com/m3db/m3/src/x/instrument"
xjson "github.com/m3db/m3/src/x/json"
xtest "github.com/m3db/m3/src/x/test"
Expand Down Expand Up @@ -101,7 +102,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -234,7 +235,7 @@ func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) {

mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -280,7 +281,7 @@ func TestLocalTypeWithNumShards(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)

w := httptest.NewRecorder()
Expand Down Expand Up @@ -410,7 +411,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -539,7 +540,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -682,7 +683,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) {
mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(nil, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -737,7 +738,7 @@ func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) {

mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl)
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -783,7 +784,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) {
mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl)
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -953,7 +954,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) {
mockClient.EXPECT().Store(gomock.Any()).Return(mockKV, nil).AnyTimes()

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1109,7 +1110,7 @@ func TestClusterTypeMissingHostnames(t *testing.T) {
mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound)

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1145,7 +1146,7 @@ func TestBadType(t *testing.T) {
mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound)

createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
nil, svcDefaultOptions, instrument.NewOptions())
nil, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)
w := httptest.NewRecorder()

Expand Down Expand Up @@ -1180,7 +1181,7 @@ func TestLocalTypeWithAggregatedNamespace(t *testing.T) {
fakeKV := fake.NewStore()
mockClient.EXPECT().Store(gomock.Any()).Return(fakeKV, nil).AnyTimes()
createHandler, err := NewCreateHandler(mockClient, config.Configuration{},
testDBCfg, svcDefaultOptions, instrument.NewOptions())
testDBCfg, svcDefaultOptions, instrument.NewOptions(), validators.NamespaceValidator)
require.NoError(t, err)

w := httptest.NewRecorder()
Expand Down
55 changes: 20 additions & 35 deletions src/query/api/v1/handler/namespace/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package namespace

import (
"bytes"
"errors"
"fmt"
"net/http"
"path"
Expand All @@ -32,6 +31,8 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/api/v1/validators"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
Expand All @@ -48,21 +49,27 @@ var (

// AddHTTPMethod is the HTTP method used with this resource.
AddHTTPMethod = http.MethodPost

errNamespaceExists = xerrors.NewInvalidParamsError(errors.New("namespace with same ID already exists"))
)

// AddHandler is the handler for namespace adds.
type AddHandler Handler
type AddHandler struct {
Handler

validator options.NamespaceValidator
}

// NewAddHandler returns a new instance of AddHandler.
func NewAddHandler(
client clusterclient.Client,
instrumentOpts instrument.Options,
validator options.NamespaceValidator,
) *AddHandler {
return &AddHandler{
client: client,
instrumentOpts: instrumentOpts,
Handler: Handler{
client: client,
instrumentOpts: instrumentOpts,
},
validator: validator,
}
}

Expand All @@ -84,13 +91,13 @@ func (h *AddHandler) ServeHTTP(
opts := handleroptions.NewServiceOptions(svc, r.Header, nil)
nsRegistry, err := h.Add(md, opts)
if err != nil {
if err == errNamespaceExists {
if err == validators.ErrNamespaceExists {
logger.Error("namespace already exists", zap.Error(err))
xhttp.WriteError(w, xhttp.NewError(err, http.StatusConflict))
return
}

logger.Error("unable to get namespace", zap.Error(err))
logger.Error("unable to add namespace", zap.Error(err))
xhttp.WriteError(w, err)
return
}
Expand All @@ -103,7 +110,7 @@ func (h *AddHandler) ServeHTTP(
}

func (h *AddHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, error) {
defer r.Body.Close()
defer r.Body.Close() // nolint:errcheck
rBody, err := xhttp.DurationToNanosBytes(r.Body)
if err != nil {
return nil, xerrors.NewInvalidParamsError(err)
Expand All @@ -129,10 +136,6 @@ func (h *AddHandler) Add(
return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("bad namespace metadata: %v", err))
}

if err := validateNewMetadata(md); err != nil {
return emptyReg, xerrors.NewInvalidParamsError(fmt.Errorf("invalid new namespace metadata: %v", err))
}

store, err := h.client.Store(opts.KVOverrideOptions())
if err != nil {
return emptyReg, err
Expand All @@ -143,15 +146,11 @@ func (h *AddHandler) Add(
return emptyReg, err
}

// Since this endpoint is `/add` and not in-place update, return an error if
// the NS already exists. NewMap will return an error if there's duplicate
// entries with the same name, but it's abstracted away behind a MultiError so
// we can't easily check that it's a conflict in the handler.
for _, ns := range currentMetadata {
if ns.ID().Equal(md.ID()) {
// NB: errNamespaceExists already an invalid params error.
return emptyReg, errNamespaceExists
if err := h.validator.ValidateNewNamespace(md, currentMetadata); err != nil {
if err == validators.ErrNamespaceExists {
return emptyReg, err
}
return emptyReg, xerrors.NewInvalidParamsError(err)
}

newMDs := append(currentMetadata, md)
Expand All @@ -176,17 +175,3 @@ func (h *AddHandler) Add(

return *protoRegistry, nil
}

// Validate new namespace inputs only. Validation that applies to namespaces regardless of create/update/etc
// belongs in the option-specific Validate functions which are invoked on every change operation.
func validateNewMetadata(m namespace.Metadata) error {
indexBlockSize := m.Options().RetentionOptions().BlockSize()
retentionBlockSize := m.Options().IndexOptions().BlockSize()
if indexBlockSize != retentionBlockSize {
return fmt.Errorf("index and retention block size must match (%v, %v)",
indexBlockSize,
retentionBlockSize,
)
}
return nil
}
Loading