Skip to content

Commit

Permalink
[tests] Adds in-process implementation of resources.M3Resources (#3792)
Browse files Browse the repository at this point in the history
This commit adds an implementation of resources.M3Resources that
is backed by in-process components. Implementation is able
of being configured using resources.SetupCluster which takes
those components and turns them into a cluster ready to
receive reads and writes.
  • Loading branch information
nbroyles authored Oct 4, 2021
1 parent 250cdae commit 6b1a24d
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 29 deletions.
47 changes: 47 additions & 0 deletions src/integration/resources/inprocess/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"time"

"github.com/cenkalti/backoff/v3"
"go.uber.org/zap"
)

const (
retryMaxInterval = 5 * time.Second
retryMaxTime = time.Minute
)

func retry(op func() error) error {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = retryMaxInterval
bo.MaxElapsedTime = retryMaxTime
return backoff.Retry(op, bo)
}

func newLogger() (*zap.Logger, error) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true

return logCfg.Build()
}
13 changes: 1 addition & 12 deletions src/integration/resources/inprocess/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"strconv"
"time"

"github.com/cenkalti/backoff/v3"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

Expand All @@ -50,9 +49,6 @@ import (
const (
interruptTimeout = 5 * time.Second
shutdownTimeout = time.Minute

retryMaxInterval = 5 * time.Second
retryMaxTime = time.Minute
)

// coordinator is an in-process implementation of resources.Coordinator for use
Expand Down Expand Up @@ -134,7 +130,7 @@ func NewCoordinator(cfg config.Configuration, opts CoordinatorOptions) (resource

// Configure logger
if opts.Logger == nil {
opts.Logger, err = zap.NewDevelopment()
opts.Logger, err = newLogger()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -368,10 +364,3 @@ func updateCoordinatorFilepaths(cfg config.Configuration) (config.Configuration,

return cfg, tmpDirs, nil
}

func retry(op func() error) error {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = retryMaxInterval
bo.MaxElapsedTime = retryMaxTime
return backoff.Retry(op, bo)
}
50 changes: 33 additions & 17 deletions src/integration/resources/inprocess/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"strconv"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -130,7 +132,7 @@ func NewDBNode(cfg config.Configuration, opts DBNodeOptions) (resources.Node, er

// Configure logger
if opts.Logger == nil {
opts.Logger, err = zap.NewDevelopment()
opts.Logger, err = newLogger()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,24 +166,38 @@ func (d *dbNode) start() {
d.started = true
}

func (d *dbNode) HostDetails(port int) (*admin.Host, error) {
// TODO(nate): implement once working on helpers for spinning up
// a multi-node cluster since that's what it's currently being
// used for based on the docker-based implementation.
// NOTES:
// - id we can generate
// - isolation_group set a constant?
// - weight just use 1024?
// - address is 0.0.0.0
// - port is the listen address (get from config)
func (d *dbNode) HostDetails(_ int) (*admin.Host, error) {
_, p, err := net.SplitHostPort(d.cfg.DB.ListenAddressOrDefault())
if err != nil {
return nil, err
}

port, err := strconv.Atoi(p)
if err != nil {
return nil, err
}

hostID, err := d.cfg.DB.HostIDOrDefault().Resolve()
if err != nil {
return nil, err
}

discoverCfg := d.cfg.DB.DiscoveryOrDefault()
envConfig, err := discoverCfg.EnvironmentConfig(hostID)
if err != nil {
return nil, err
}

return &admin.Host{
Id: "foo",
IsolationGroup: "foo-a",
Zone: "embedded",
Weight: 1024,
Address: "0.0.0.0",
Port: uint32(port),
Id: hostID,
// TODO(nate): add support for multiple etcd services. Practically, this
// is very rare so using the zero-indexed value here will almost always be
// correct.
Zone: envConfig.Services[0].Service.Zone,
// TODO(nate): weight should be configurable
Weight: 1024,
Address: "0.0.0.0",
Port: uint32(port),
}, nil
}

Expand Down
68 changes: 68 additions & 0 deletions src/integration/resources/inprocess/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/x/errors"
)

type inprocessM3Resources struct {
coordinator resources.Coordinator
dbNodes resources.Nodes
}

// ResourceOptions are the options for creating new
// resources.M3Resources.
type ResourceOptions struct {
Coordinator resources.Coordinator
DBNodes resources.Nodes
}

// NewM3Resources returns an implementation of resources.M3Resources
// backed by in-process implementations of the M3 components.
func NewM3Resources(options ResourceOptions) resources.M3Resources {
return &inprocessM3Resources{
coordinator: options.Coordinator,
dbNodes: options.DBNodes,
}
}

func (i *inprocessM3Resources) Cleanup() error {
err := errors.NewMultiError()
if i.coordinator != nil {
err = err.Add(i.coordinator.Close())
}

for _, d := range i.dbNodes {
err = err.Add(d.Close())
}

return err.FinalError()
}

func (i *inprocessM3Resources) Nodes() resources.Nodes {
return i.dbNodes
}

func (i *inprocessM3Resources) Coordinator() resources.Coordinator {
return i.coordinator
}
46 changes: 46 additions & 0 deletions src/integration/resources/inprocess/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// +build integration_v2
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"testing"

"github.com/m3db/m3/src/integration/resources"

"github.com/stretchr/testify/require"
)

func TestSetupInprocessCluster(t *testing.T) {
dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{})
require.NoError(t, err)

coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{})
require.NoError(t, err)

cluster := NewM3Resources(ResourceOptions{
Coordinator: coord,
DBNodes: resources.Nodes{dbnode},
})

require.NoError(t, resources.SetupCluster(cluster, nil))
require.NoError(t, cluster.Cleanup())
}

0 comments on commit 6b1a24d

Please sign in to comment.