diff --git a/src/dbnode/client/client_test.go b/src/dbnode/client/client_test.go index cc03431e5a..39b9780ed3 100644 --- a/src/dbnode/client/client_test.go +++ b/src/dbnode/client/client_test.go @@ -53,7 +53,7 @@ func TestClientNewClientValidatesOptions(t *testing.T) { multiOpts := NewMockMultiClusterOptions(ctrl) opts := NewMockOptions(ctrl) multiOpts.EXPECT().Options().Return(opts) - opts.EXPECT().Validate().Return(nil) + opts.EXPECT().Validate().Return(anError) _, err := NewClient(multiOpts) assert.Error(t, err) diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go index de35f84625..b64d510a13 100644 --- a/src/dbnode/client/config.go +++ b/src/dbnode/client/config.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/topology" xtchannel "github.com/m3db/m3/src/dbnode/x/tchannel" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/retry" ) @@ -249,6 +250,7 @@ func (c Configuration) NewAdminClient( cfgParams.HashingSeed = c.HashingConfiguration.Seed } + var syncEnvCfg environment.ConfigureResult topoInit := params.TopologyInitializer asyncTopoInits := []topology.Initializer{} @@ -267,6 +269,7 @@ func (c Configuration) NewAdminClient( if envCfg.Async { asyncTopoInits = append(asyncTopoInits, envCfg.TopologyInitializer) } else { + syncEnvCfg = envCfg topoInit = envCfg.TopologyInitializer } } @@ -318,35 +321,35 @@ func (c Configuration) NewAdminClient( return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts) }) - // if c.Proto != nil && c.Proto.Enabled { - // v = v.SetEncodingProto(encodingOpts) - - // schemaRegistry := namespace.NewSchemaRegistry(true, nil) - // if c.Proto.TestOnly { - // // Load schema registry from file. - // deployID := "fromfile" - // for nsID, protoConfig := range c.Proto.SchemaRegistry { - // err = namespace.LoadSchemaRegistryFromFile(schemaRegistry, ident.StringID(nsID), deployID, protoConfig.SchemaFilePath, protoConfig.MessageName) - // if err != nil { - // return nil, xerrors.Wrapf(err, "could not load schema registry from file %s for namespace %s", protoConfig.SchemaFilePath, nsID) - // } - // } - // } else { - // // Load schema registry from m3db metadata store. - // err := loadSchemaRegistryFromKVStore(schemaRegistry, envCfg.KVStore) - // if err != nil { - // return nil, xerrors.Wrap(err, "could not load schema registry from m3db metadata store") - // } - // // Validate the schema deploy ID. - // for nsID, protoConfig := range c.Proto.SchemaRegistry { - // _, err := schemaRegistry.GetSchema(ident.StringID(nsID), protoConfig.SchemaDeployID) - // if err != nil { - // return nil, xerrors.Wrapf(err, "could not find schema for namespace: %s with schema deploy ID: %s", nsID, protoConfig.SchemaDeployID) - // } - // } - // } - // v = v.SetSchemaRegistry(schemaRegistry) - // } + if c.Proto != nil && c.Proto.Enabled { + v = v.SetEncodingProto(encodingOpts) + + schemaRegistry := namespace.NewSchemaRegistry(true, nil) + if c.Proto.TestOnly { + // Load schema registry from file. + deployID := "fromfile" + for nsID, protoConfig := range c.Proto.SchemaRegistry { + err = namespace.LoadSchemaRegistryFromFile(schemaRegistry, ident.StringID(nsID), deployID, protoConfig.SchemaFilePath, protoConfig.MessageName) + if err != nil { + return nil, xerrors.Wrapf(err, "could not load schema registry from file %s for namespace %s", protoConfig.SchemaFilePath, nsID) + } + } + } else { + // Load schema registry from m3db metadata store. + err := loadSchemaRegistryFromKVStore(schemaRegistry, syncEnvCfg.KVStore) + if err != nil { + return nil, xerrors.Wrap(err, "could not load schema registry from m3db metadata store") + } + // Validate the schema deploy ID. + for nsID, protoConfig := range c.Proto.SchemaRegistry { + _, err := schemaRegistry.GetSchema(ident.StringID(nsID), protoConfig.SchemaDeployID) + if err != nil { + return nil, xerrors.Wrapf(err, "could not find schema for namespace: %s with schema deploy ID: %s", nsID, protoConfig.SchemaDeployID) + } + } + } + v = v.SetSchemaRegistry(schemaRegistry) + } u := NewAdminMultiClusterOptions(). SetOptions(v).