Skip to content

Commit

Permalink
[tests] Improved embedded coordinator support (#3779)
Browse files Browse the repository at this point in the history
This commit fixes a couple small things re: embedded coordinators:

1) Fails fast if dbnode is not started when creating an embedded
coordinator.

2) Adds support for port and filepath replacement in embedded
config.
  • Loading branch information
nbroyles authored and Antanukas committed Oct 4, 2021
1 parent 820880d commit 902d244
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 11 deletions.
25 changes: 18 additions & 7 deletions src/integration/resources/inprocess/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ const (
// coordinator is an in-process implementation of resources.Coordinator for use
// in integration tests.
type coordinator struct {
cfg config.Configuration
client common.CoordinatorClient
logger *zap.Logger
tmpDirs []string
cfg config.Configuration
client common.CoordinatorClient
logger *zap.Logger
tmpDirs []string
embedded bool

interruptCh chan<- error
shutdownCh <-chan struct{}
Expand Down Expand Up @@ -170,7 +171,11 @@ func NewCoordinator(cfg config.Configuration, opts CoordinatorOptions) (resource
// NewEmbeddedCoordinator creates a coordinator from one embedded within an existing
// db node. This method expects that the DB node has already been started before
// being called.
func NewEmbeddedCoordinator(d dbNode) (resources.Coordinator, error) {
func NewEmbeddedCoordinator(d *dbNode) (resources.Coordinator, error) {
if !d.started {
return nil, errors.New("dbnode must be started to create the embedded coordinator")
}

_, p, err := net.SplitHostPort(d.cfg.Coordinator.ListenAddressOrDefault())
if err != nil {
return nil, err
Expand All @@ -189,6 +194,7 @@ func NewEmbeddedCoordinator(d dbNode) (resources.Coordinator, error) {
Logger: d.logger,
RetryFunc: retry,
}),
embedded: true,
logger: d.logger,
interruptCh: d.interruptCh,
shutdownCh: d.shutdownCh,
Expand Down Expand Up @@ -248,6 +254,12 @@ func (c *coordinator) WaitForShardsReady() error {
}

func (c *coordinator) Close() error {
if c.embedded {
// NB(nate): for embedded coordinators, close is handled by the dbnode that
// it is spun up inside of.
return nil
}

defer func() {
for _, dir := range c.tmpDirs {
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -256,7 +268,6 @@ func (c *coordinator) Close() error {
}
}()

// TODO: confirm this works correctly when using an embedded coordinator
select {
case c.interruptCh <- xos.NewInterruptError("in-process coordinator being shut down"):
case <-time.After(interruptTimeout):
Expand All @@ -266,7 +277,7 @@ func (c *coordinator) Close() error {
select {
case <-c.shutdownCh:
case <-time.After(shutdownTimeout):
return errors.New("timeout waiting for shutdown notification. server closing may" +
return errors.New("timeout waiting for shutdown notification. coordinator closing may" +
" not be completely graceful")
}

Expand Down
45 changes: 45 additions & 0 deletions src/integration/resources/inprocess/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@ func TestCreateAnotherCoordinatorInProcess(t *testing.T) {
require.NoError(t, coord.Close())
}

func TestNewEmbeddedCoordinator(t *testing.T) {
dbnode, err := NewDBNodeFromYAML(embeddedCoordConfig, DBNodeOptions{})
require.NoError(t, err)

d, ok := dbnode.(*dbNode)
require.True(t, ok)
require.True(t, d.started)

_, err = NewEmbeddedCoordinator(d)
require.NoError(t, err)

require.NoError(t, dbnode.Close())
}

func TestNewEmbeddedCoordinatorNotStarted(t *testing.T) {
var dbnode dbNode
_, err := NewEmbeddedCoordinator(&dbnode)
require.Error(t, err)
}

// TODO(nate): add more tests exercising other endpoints once dbnode impl is landed

const defaultCoordConfig = `
Expand All @@ -59,3 +79,28 @@ clusters:
endpoints:
- 127.0.0.1:2379
`

const embeddedCoordConfig = `
coordinator:
clusters:
- namespaces:
- namespace: default
type: unaggregated
retention: 1h
client:
config:
service:
env: default_env
zone: embedded
service: m3db
cacheDir: "*"
etcdClusters:
- zone: embedded
endpoints:
- 127.0.0.1:2379
db:
filesystem:
filePathPrefix: "*"
writeNewSeriesAsync: false
`
21 changes: 17 additions & 4 deletions src/integration/resources/inprocess/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type dbNode struct {
cfg config.Configuration
logger *zap.Logger
tmpDirs []string
started bool

interruptCh chan<- error
shutdownCh <-chan struct{}
Expand Down Expand Up @@ -160,6 +161,7 @@ func (d *dbNode) start() {

d.interruptCh = interruptCh
d.shutdownCh = shutdownCh
d.started = true
}

func (d *dbNode) HostDetails(port int) (*admin.Host, error) {
Expand Down Expand Up @@ -287,6 +289,7 @@ func (d *dbNode) Close() error {
" not be completely graceful")
}
}
d.started = false

return nil
}
Expand All @@ -301,13 +304,13 @@ func updateDBNodePorts(cfg config.Configuration) (config.Configuration, error) {
cfg.DB.ListenAddress = &addr
}

if cfg.Coordinator != nil && cfg.Coordinator.ListenAddress != nil {
addr, _, _, err := nettest.MaybeGeneratePort(*cfg.Coordinator.ListenAddress)
if cfg.Coordinator != nil {
coordCfg, err := updateCoordinatorPorts(*cfg.Coordinator)
if err != nil {
return cfg, err
}

cfg.Coordinator.ListenAddress = &addr
cfg.Coordinator = &coordCfg
}

return cfg, nil
Expand All @@ -320,12 +323,22 @@ func updateDBNodeFilepaths(cfg config.Configuration) (config.Configuration, []st
if prefix != nil && *prefix == "*" {
dir, err := ioutil.TempDir("", "m3db-*")
if err != nil {
return cfg, tmpDirs, err
return cfg, nil, err
}

tmpDirs = append(tmpDirs, dir)
cfg.DB.Filesystem.FilePathPrefix = &dir
}

if cfg.Coordinator != nil {
coordCfg, coordDirs, err := updateCoordinatorFilepaths(*cfg.Coordinator)
if err != nil {
return cfg, nil, err
}
tmpDirs = append(tmpDirs, coordDirs...)

cfg.Coordinator = &coordCfg
}

return cfg, tmpDirs, nil
}

0 comments on commit 902d244

Please sign in to comment.