Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests] Adds in-process implementation of resources.M3Resources #3792

Merged
merged 2 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/integration/resources/inprocess/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"time"

"github.com/cenkalti/backoff/v3"
"go.uber.org/zap"
)

const (
retryMaxInterval = 5 * time.Second
retryMaxTime = time.Minute
)

func retry(op func() error) error {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = retryMaxInterval
bo.MaxElapsedTime = retryMaxTime
return backoff.Retry(op, bo)
}

func newLogger() (*zap.Logger, error) {
logCfg := zap.NewDevelopmentConfig()
logCfg.DisableStacktrace = true

return logCfg.Build()
}
13 changes: 1 addition & 12 deletions src/integration/resources/inprocess/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"strconv"
"time"

"github.com/cenkalti/backoff/v3"
"go.uber.org/zap"
"gopkg.in/yaml.v2"

Expand All @@ -50,9 +49,6 @@ import (
const (
interruptTimeout = 5 * time.Second
shutdownTimeout = time.Minute

retryMaxInterval = 5 * time.Second
retryMaxTime = time.Minute
)

// coordinator is an in-process implementation of resources.Coordinator for use
Expand Down Expand Up @@ -134,7 +130,7 @@ func NewCoordinator(cfg config.Configuration, opts CoordinatorOptions) (resource

// Configure logger
if opts.Logger == nil {
opts.Logger, err = zap.NewDevelopment()
opts.Logger, err = newLogger()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -368,10 +364,3 @@ func updateCoordinatorFilepaths(cfg config.Configuration) (config.Configuration,

return cfg, tmpDirs, nil
}

func retry(op func() error) error {
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = retryMaxInterval
bo.MaxElapsedTime = retryMaxTime
return backoff.Retry(op, bo)
}
50 changes: 33 additions & 17 deletions src/integration/resources/inprocess/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"strconv"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -130,7 +132,7 @@ func NewDBNode(cfg config.Configuration, opts DBNodeOptions) (resources.Node, er

// Configure logger
if opts.Logger == nil {
opts.Logger, err = zap.NewDevelopment()
opts.Logger, err = newLogger()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,24 +166,38 @@ func (d *dbNode) start() {
d.started = true
}

func (d *dbNode) HostDetails(port int) (*admin.Host, error) {
// TODO(nate): implement once working on helpers for spinning up
// a multi-node cluster since that's what it's currently being
// used for based on the docker-based implementation.
// NOTES:
// - id we can generate
// - isolation_group set a constant?
// - weight just use 1024?
// - address is 0.0.0.0
// - port is the listen address (get from config)
func (d *dbNode) HostDetails(_ int) (*admin.Host, error) {
_, p, err := net.SplitHostPort(d.cfg.DB.ListenAddressOrDefault())
if err != nil {
return nil, err
}

port, err := strconv.Atoi(p)
if err != nil {
return nil, err
}

hostID, err := d.cfg.DB.HostIDOrDefault().Resolve()
if err != nil {
return nil, err
}

discoverCfg := d.cfg.DB.DiscoveryOrDefault()
envConfig, err := discoverCfg.EnvironmentConfig(hostID)
if err != nil {
return nil, err
}

return &admin.Host{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsolationGroup is empty? Will that affect the sharding algorithm?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, I've got a change to support in an upcoming PR. For now, it's OK to be left blank.

Id: "foo",
IsolationGroup: "foo-a",
Zone: "embedded",
Weight: 1024,
Address: "0.0.0.0",
Port: uint32(port),
Id: hostID,
// TODO(nate): add support for multiple etcd services. Practically, this
// is very rare so using the zero-indexed value here will almost always be
// correct.
Zone: envConfig.Services[0].Service.Zone,
// TODO(nate): weight should be configurable
Weight: 1024,
Address: "0.0.0.0",
Port: uint32(port),
}, nil
}

Expand Down
68 changes: 68 additions & 0 deletions src/integration/resources/inprocess/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/x/errors"
)

type inprocessM3Resources struct {
coordinator resources.Coordinator
dbNodes resources.Nodes
}

// ResourceOptions are the options for creating new
// resources.M3Resources.
type ResourceOptions struct {
Coordinator resources.Coordinator
DBNodes resources.Nodes
}

// NewM3Resources returns an implementation of resources.M3Resources
// backed by in-process implementations of the M3 components.
func NewM3Resources(options ResourceOptions) resources.M3Resources {
return &inprocessM3Resources{
coordinator: options.Coordinator,
dbNodes: options.DBNodes,
}
}

func (i *inprocessM3Resources) Cleanup() error {
err := errors.NewMultiError()
if i.coordinator != nil {
err = err.Add(i.coordinator.Close())
}

for _, d := range i.dbNodes {
err = err.Add(d.Close())
}

return err.FinalError()
}

func (i *inprocessM3Resources) Nodes() resources.Nodes {
return i.dbNodes
}

func (i *inprocessM3Resources) Coordinator() resources.Coordinator {
return i.coordinator
}
46 changes: 46 additions & 0 deletions src/integration/resources/inprocess/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// +build integration_v2
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package inprocess

import (
"testing"

"github.com/m3db/m3/src/integration/resources"

"github.com/stretchr/testify/require"
)

func TestSetupInprocessCluster(t *testing.T) {
dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{})
require.NoError(t, err)

coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{})
require.NoError(t, err)

cluster := NewM3Resources(ResourceOptions{
Coordinator: coord,
DBNodes: resources.Nodes{dbnode},
})

require.NoError(t, resources.SetupCluster(cluster, nil))
require.NoError(t, cluster.Cleanup())
}