Skip to content

Commit

Permalink
[aggregator] Do not require aggregator ID to be concat with port and …
Browse files Browse the repository at this point in the history
…add debug logs (#2012)
  • Loading branch information
robskillington authored Oct 21, 2019
1 parent 1eb7e3f commit 71f7318
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 25 deletions.
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ aggregator:
hostID:
resolver: environment
envVarName: M3AGGREGATOR_HOST_ID
instanceID:
type: host_id
metricPrefix: ""
counterPrefix: ""
timerPrefix: ""
Expand Down
2 changes: 1 addition & 1 deletion scripts/development/m3_stack/start_m3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ if [[ "$AGGREGATOR_PIPELINE" = true ]]; then
"replication_factor": 1,
"instances": [
{
"id": "m3aggregator01:6000",
"id": "m3aggregator01",
"isolation_group": "rack-a",
"zone": "embedded",
"weight": 1024,
Expand Down
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ aggregator:
hostID:
resolver: environment
envVarName: M3AGGREGATOR_HOST_ID
instanceID:
type: host_id
metricPrefix: ""
counterPrefix: ""
timerPrefix: ""
Expand Down
4 changes: 2 additions & 2 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init
"replication_factor": 2,
"instances": [
{
"id": "m3aggregator01:6000",
"id": "m3aggregator01",
"isolation_group": "availability-zone-a",
"zone": "embedded",
"weight": 100,
Expand All @@ -39,7 +39,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init
"port": 6000
},
{
"id": "m3aggregator02:6000",
"id": "m3aggregator02",
"isolation_group": "availability-zone-b",
"zone": "embedded",
"weight": 100,
Expand Down
21 changes: 20 additions & 1 deletion src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -114,6 +115,7 @@ type aggregator struct {
sleepFn sleepFn
shardsPendingClose int32
metrics aggregatorMetrics
logger *zap.Logger
}

// NewAggregator creates a new aggregator.
Expand All @@ -134,9 +136,10 @@ func NewAggregator(opts Options) Aggregator {
flushHandler: opts.FlushHandler(),
adminClient: opts.AdminClient(),
resignTimeout: opts.ResignTimeout(),
metrics: newAggregatorMetrics(scope, samplingRate, opts.MaxAllowedForwardingDelayFn()),
doneCh: make(chan struct{}),
sleepFn: time.Sleep,
metrics: newAggregatorMetrics(scope, samplingRate, opts.MaxAllowedForwardingDelayFn()),
logger: iOpts.Logger(),
}
}

Expand Down Expand Up @@ -318,14 +321,30 @@ func (agg *aggregator) processPlacementWithLock(
if err == nil {
newShardSet = instance.Shards()
} else if err == ErrInstanceNotFoundInPlacement {
// NB(r): Without this log message it's hard for operators to debug
// logs about receiving metrics that the aggregator does not own.
placementInstances := newPlacement.Instances()
placementInstanceIDs := make([]string, 0, len(placementInstances))
for _, instance := range placementInstances {
placementInstanceIDs = append(placementInstanceIDs, instance.ID())
}

msg := "aggregator instance ID must appear in placement: " +
"no shards assigned since not found with current instance ID"
agg.logger.Error(msg,
zap.String("currInstanceID", agg.placementManager.InstanceID()),
zap.Strings("placementInstanceIDs", placementInstanceIDs))

newShardSet = shard.NewShards(nil)
} else {
return err
}

agg.updateShardsWithLock(newStagedPlacement, newPlacement, newShardSet)
if err := agg.updateShardSetIDWithLock(instance); err != nil {
return err
}

agg.metrics.placement.updated.Inc(1)
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion src/aggregator/aggregator/aggregator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions src/aggregator/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,19 @@ func TestAggregatorOpenInstanceNotInPlacement(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

placementManager := NewMockPlacementManager(ctrl)

agg, _ := testAggregator(t, ctrl)
agg.placementManager = placementManager

testPlacement := placement.NewPlacement().SetCutoverNanos(5678)
testStagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
placementManager := NewMockPlacementManager(ctrl)

placementManager.EXPECT().Open().Return(nil)
placementManager.EXPECT().InstanceID().Return(agg.opts.PlacementManager().InstanceID())
placementManager.EXPECT().Placement().Return(testStagedPlacement, testPlacement, nil)
placementManager.EXPECT().InstanceFrom(testPlacement).Return(nil, ErrInstanceNotFoundInPlacement)

agg, _ := testAggregator(t, ctrl)
agg.placementManager = placementManager
require.NoError(t, agg.Open())
require.Equal(t, uint32(0), agg.shardSetID)
require.False(t, agg.shardSetOpen)
Expand Down
10 changes: 10 additions & 0 deletions src/aggregator/aggregator/placement_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type PlacementManager interface {
// Open opens the placement manager.
Open() error

// InstanceID returns the configured instance ID.
InstanceID() string

// Placement returns the active staged placement and the active placement.
Placement() (placement.ActiveStagedPlacement, placement.Placement, error)

Expand Down Expand Up @@ -122,6 +125,13 @@ func (mgr *placementManager) Open() error {
return nil
}

// InstanceID returns the instance ID held by the placement manager.
// The field is read while holding the manager's read lock.
func (mgr *placementManager) InstanceID() string {
	mgr.RLock()
	defer mgr.RUnlock()
	return mgr.instanceID
}

func (mgr *placementManager) Placement() (placement.ActiveStagedPlacement, placement.Placement, error) {
mgr.RLock()
stagedPlacement, placement, err := mgr.placementWithLock()
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/client/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *DropType) UnmarshalYAML(unmarshal func(interface{}) error) error {
*t = defaultDropType
return nil
}
strs := make([]string, len(validDropTypes))
strs := make([]string, 0, len(validDropTypes))
for _, valid := range validDropTypes {
if str == valid.String() {
*t = valid
Expand Down
2 changes: 2 additions & 0 deletions src/aggregator/config/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ aggregator:
hostID:
resolver: environment
envVarName: M3AGGREGATOR_HOST_ID
instanceID:
type: host_id
metricPrefix: ""
counterPrefix: ""
timerPrefix: ""
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
defaultServerStateChangeTimeout = 5 * time.Second
defaultClientBatchSize = 1440
defaultWorkerPoolSize = 4
defaultInstanceID = "localhost:6000"
defaultInstanceID = "localhost"
defaultPlacementKVKey = "/placement"
defaultElectionKeyFmt = "/shardset/%d/lock"
defaultFlushTimesKeyFmt = "/shardset/%d/flush"
Expand Down
104 changes: 94 additions & 10 deletions src/cmd/services/m3aggregator/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"runtime"
"sort"
"strings"
"time"

"github.com/m3db/m3/src/aggregator/aggregation/quantile/cm"
Expand Down Expand Up @@ -61,6 +62,9 @@ type AggregatorConfiguration struct {
// HostID is the local host ID configuration.
HostID *hostid.Configuration `yaml:"hostID"`

// InstanceID is the instance ID configuration.
InstanceID InstanceIDConfiguration `yaml:"instanceID"`

// AggregationTypes configs the aggregation types.
AggregationTypes aggregation.TypesConfiguration `yaml:"aggregationTypes"`

Expand Down Expand Up @@ -152,6 +156,78 @@ type AggregatorConfiguration struct {
EntryPool pool.ObjectPoolConfiguration `yaml:"entryPool"`
}

// InstanceIDType selects how the aggregator's instance ID is built.
// The resulting ID is what gets looked up in the placement to find
// this aggregator instance.
type InstanceIDType uint

const (
	// HostIDPortInstanceIDType builds the instance ID by joining the
	// host ID with the port, and uses that for the placement lookup.
	// NB: this is the legacy form. It is how the instance ID was
	// originally constructed, which imposed the odd requirement that
	// instance IDs in the placement had to be the host ID concatenated
	// with the port.
	HostIDPortInstanceIDType InstanceIDType = iota
	// HostIDInstanceIDType uses the host ID on its own as the
	// instance ID for the placement lookup.
	HostIDInstanceIDType
)

// defaultInstanceIDType must stay the legacy host ID plus port form.
// Configurations that do not name an instance ID type explicitly have
// to keep resolving to the IDs already used by existing placements;
// any other default would break them on upgrade.
const defaultInstanceIDType = HostIDPortInstanceIDType

// String returns the configuration name of the instance ID type, or
// "unknown" for a value that is not one of the declared types.
func (t InstanceIDType) String() string {
	if t == HostIDPortInstanceIDType {
		return "host_id_port"
	}
	if t == HostIDInstanceIDType {
		return "host_id"
	}
	return "unknown"
}

// validInstanceIDTypes lists every type accepted by UnmarshalYAML, in
// the order they are reported in the "invalid type" error message.
var validInstanceIDTypes = []InstanceIDType{
	HostIDInstanceIDType,
	HostIDPortInstanceIDType,
}

// UnmarshalYAML decodes a string into an InstanceIDType.
// An empty string selects defaultInstanceIDType. Any string that does
// not match the name of a valid type yields an error listing the valid
// names, and leaves the receiver untouched.
func (t *InstanceIDType) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	if err := unmarshal(&raw); err != nil {
		return err
	}

	if raw == "" {
		*t = defaultInstanceIDType
		return nil
	}

	for _, candidate := range validInstanceIDTypes {
		if candidate.String() == raw {
			*t = candidate
			return nil
		}
	}

	// No match: report every accepted name, quoted, in declaration order.
	quoted := make([]string, len(validInstanceIDTypes))
	for i, candidate := range validInstanceIDTypes {
		quoted[i] = "'" + candidate.String() + "'"
	}
	return fmt.Errorf(
		"invalid InstanceIDType '%s' valid types are: %s", raw, strings.Join(quoted, ", "))
}

// InstanceIDConfiguration is the instance ID configuration. It is the
// type of the `instanceID` field of AggregatorConfiguration.
type InstanceIDConfiguration struct {
// InstanceIDType specifies how to construct the instance ID
// that is used for lookup of the aggregator in the placement.
// It is read from the YAML key `type`. The zero value is
// HostIDPortInstanceIDType, the legacy host ID plus port form.
InstanceIDType InstanceIDType `yaml:"type"`
}

// NewAggregatorOptions creates a new set of aggregator options.
func (c *AggregatorConfiguration) NewAggregatorOptions(
address string,
Expand Down Expand Up @@ -197,7 +273,7 @@ func (c *AggregatorConfiguration) NewAggregatorOptions(
}
opts = opts.SetAdminClient(adminClient)

// Set instance id.
// Set instance ID.
instanceID, err := c.newInstanceID(address)
if err != nil {
return nil, err
Expand Down Expand Up @@ -361,23 +437,31 @@ func (c *AggregatorConfiguration) NewAggregatorOptions(

func (c *AggregatorConfiguration) newInstanceID(address string) (string, error) {
var (
hostName string
err error
hostIDValue string
err error
)
if c.HostID != nil {
hostName, err = c.HostID.Resolve()
hostIDValue, err = c.HostID.Resolve()
} else {
hostName, err = os.Hostname()
hostIDValue, err = os.Hostname()
}
if err != nil {
return "", fmt.Errorf("error determining host name: %v", err)
return "", fmt.Errorf("error determining host ID: %v", err)
}

_, port, err := net.SplitHostPort(address)
if err != nil {
return "", fmt.Errorf("error parsing server address %s: %v", address, err)
switch c.InstanceID.InstanceIDType {
case HostIDInstanceIDType:
return hostIDValue, nil
case HostIDPortInstanceIDType:
_, port, err := net.SplitHostPort(address)
if err != nil {
return "", fmt.Errorf("error parsing server address %s: %v", address, err)
}
return net.JoinHostPort(hostIDValue, port), nil
default:
return "", fmt.Errorf("unknown instance ID type: value=%d, str=%s",
c.InstanceID.InstanceIDType, c.InstanceID.InstanceIDType.String())
}
return net.JoinHostPort(hostName, port), nil
}

func bufferForPastTimedMetricFn(buffer time.Duration) aggregator.BufferForPastTimedMetricFn {
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/topology/consistency_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (l *ConsistencyLevel) UnmarshalYAML(unmarshal func(interface{}) error) erro
if str == "" {
return errConsistencyLevelUnspecified
}
strs := make([]string, len(validConsistencyLevels))
strs := make([]string, 0, len(validConsistencyLevels))
for _, valid := range validConsistencyLevels {
if str == valid.String() {
*l = valid
Expand Down Expand Up @@ -192,7 +192,7 @@ func (l *ConnectConsistencyLevel) UnmarshalYAML(unmarshal func(interface{}) erro
if str == "" {
return errClusterConnectConsistencyLevelUnspecified
}
strs := make([]string, len(validConnectConsistencyLevels))
strs := make([]string, 0, len(validConnectConsistencyLevels))
for _, valid := range validConnectConsistencyLevels {
if str == valid.String() {
*l = valid
Expand Down Expand Up @@ -284,7 +284,7 @@ func (l *ReadConsistencyLevel) UnmarshalYAML(unmarshal func(interface{}) error)
if str == "" {
return errReadConsistencyLevelUnspecified
}
strs := make([]string, len(validReadConsistencyLevels))
strs := make([]string, 0, len(validReadConsistencyLevels))
for _, valid := range validReadConsistencyLevels {
if str == valid.String() {
*l = valid
Expand Down
2 changes: 1 addition & 1 deletion src/x/instrument/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (t *ExtendedMetricsType) UnmarshalYAML(unmarshal func(interface{}) error) e
*t = DefaultExtendedMetricsType
return nil
}
strs := make([]string, len(validExtendedMetricsTypes))
strs := make([]string, 0, len(validExtendedMetricsTypes))
for _, valid := range validExtendedMetricsTypes {
if str == valid.String() {
*t = valid
Expand Down
2 changes: 1 addition & 1 deletion src/x/instrument/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (t *MetricSanitizationType) UnmarshalYAML(unmarshal func(interface{}) error
*t = defaultMetricSanitization
return nil
}
strs := make([]string, len(validMetricSanitizationTypes))
strs := make([]string, 0, len(validMetricSanitizationTypes))
for _, valid := range validMetricSanitizationTypes {
if str == valid.String() {
*t = valid
Expand Down

0 comments on commit 71f7318

Please sign in to comment.