Skip to content

Commit

Permalink
Reinstated proto schema constructor code
Browse files Browse the repository at this point in the history
  • Loading branch information
simonrobb committed Aug 20, 2019
1 parent 63c55c4 commit eb7e7ec
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestClientNewClientValidatesOptions(t *testing.T) {
multiOpts := NewMockMultiClusterOptions(ctrl)
opts := NewMockOptions(ctrl)
multiOpts.EXPECT().Options().Return(opts)
opts.EXPECT().Validate().Return(nil)
opts.EXPECT().Validate().Return(anError)

_, err := NewClient(multiOpts)
assert.Error(t, err)
Expand Down
61 changes: 32 additions & 29 deletions src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/dbnode/topology"
xtchannel "github.com/m3db/m3/src/dbnode/x/tchannel"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/retry"
)
Expand Down Expand Up @@ -249,6 +250,7 @@ func (c Configuration) NewAdminClient(
cfgParams.HashingSeed = c.HashingConfiguration.Seed
}

var syncEnvCfg environment.ConfigureResult
topoInit := params.TopologyInitializer
asyncTopoInits := []topology.Initializer{}

Expand All @@ -267,6 +269,7 @@ func (c Configuration) NewAdminClient(
if envCfg.Async {
asyncTopoInits = append(asyncTopoInits, envCfg.TopologyInitializer)
} else {
syncEnvCfg = envCfg
topoInit = envCfg.TopologyInitializer
}
}
Expand Down Expand Up @@ -318,35 +321,35 @@ func (c Configuration) NewAdminClient(
return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts)
})

// if c.Proto != nil && c.Proto.Enabled {
// v = v.SetEncodingProto(encodingOpts)

// schemaRegistry := namespace.NewSchemaRegistry(true, nil)
// if c.Proto.TestOnly {
// // Load schema registry from file.
// deployID := "fromfile"
// for nsID, protoConfig := range c.Proto.SchemaRegistry {
// err = namespace.LoadSchemaRegistryFromFile(schemaRegistry, ident.StringID(nsID), deployID, protoConfig.SchemaFilePath, protoConfig.MessageName)
// if err != nil {
// return nil, xerrors.Wrapf(err, "could not load schema registry from file %s for namespace %s", protoConfig.SchemaFilePath, nsID)
// }
// }
// } else {
// // Load schema registry from m3db metadata store.
// err := loadSchemaRegistryFromKVStore(schemaRegistry, envCfg.KVStore)
// if err != nil {
// return nil, xerrors.Wrap(err, "could not load schema registry from m3db metadata store")
// }
// // Validate the schema deploy ID.
// for nsID, protoConfig := range c.Proto.SchemaRegistry {
// _, err := schemaRegistry.GetSchema(ident.StringID(nsID), protoConfig.SchemaDeployID)
// if err != nil {
// return nil, xerrors.Wrapf(err, "could not find schema for namespace: %s with schema deploy ID: %s", nsID, protoConfig.SchemaDeployID)
// }
// }
// }
// v = v.SetSchemaRegistry(schemaRegistry)
// }
if c.Proto != nil && c.Proto.Enabled {
v = v.SetEncodingProto(encodingOpts)

schemaRegistry := namespace.NewSchemaRegistry(true, nil)
if c.Proto.TestOnly {
// Load schema registry from file.
deployID := "fromfile"
for nsID, protoConfig := range c.Proto.SchemaRegistry {
err = namespace.LoadSchemaRegistryFromFile(schemaRegistry, ident.StringID(nsID), deployID, protoConfig.SchemaFilePath, protoConfig.MessageName)
if err != nil {
return nil, xerrors.Wrapf(err, "could not load schema registry from file %s for namespace %s", protoConfig.SchemaFilePath, nsID)
}
}
} else {
// Load schema registry from m3db metadata store.
err := loadSchemaRegistryFromKVStore(schemaRegistry, syncEnvCfg.KVStore)
if err != nil {
return nil, xerrors.Wrap(err, "could not load schema registry from m3db metadata store")
}
// Validate the schema deploy ID.
for nsID, protoConfig := range c.Proto.SchemaRegistry {
_, err := schemaRegistry.GetSchema(ident.StringID(nsID), protoConfig.SchemaDeployID)
if err != nil {
return nil, xerrors.Wrapf(err, "could not find schema for namespace: %s with schema deploy ID: %s", nsID, protoConfig.SchemaDeployID)
}
}
}
v = v.SetSchemaRegistry(schemaRegistry)
}

u := NewAdminMultiClusterOptions().
SetOptions(v).
Expand Down

0 comments on commit eb7e7ec

Please sign in to comment.