[aggregator] Add passthrough functionality in m3aggregator using rawtcp server (#2235)

* Add passthrough functionality in m3aggregator using rawtcp server

The m3aggregator instances accept passthrough metrics (each carrying a timestamp and a storage policy) and write them directly to the ingestion tier, without aggregating them.
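For orientation, here is a minimal sketch of the new call path from a caller's point of view. It assumes an already-open leader instance; the Aggregator interface, the aggregated.Metric fields, and the policy.NewStoragePolicy signature are taken from this diff, while the metric ID and values are made up:

package example

import (
	"time"

	"github.com/m3db/m3/src/aggregator/aggregator"
	"github.com/m3db/m3/src/metrics/metric"
	"github.com/m3db/m3/src/metrics/metric/aggregated"
	"github.com/m3db/m3/src/metrics/policy"
	xtime "github.com/m3db/m3/src/x/time"
)

// writePassthrough forwards one already-aggregated datapoint straight to the
// ingestion tier, bypassing the aggregation windows entirely.
func writePassthrough(agg aggregator.Aggregator) error {
	m := aggregated.Metric{
		Type:      metric.CounterType,
		ID:        []byte("requests.count"), // hypothetical metric ID
		TimeNanos: time.Now().UnixNano(),
		Value:     42,
	}
	// 1m resolution at 1m precision, retained for 12h: the same shape as
	// the test fixture added in this change.
	sp := policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
	return agg.AddPassthrough(m, sp)
}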
Siyu authored Apr 15, 2020
1 parent fdcd250 commit 4e5e6cd
Showing 33 changed files with 1,508 additions and 164 deletions.
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
@@ -320,6 +320,8 @@ aggregator:
flushInterval: 1s
writeBufferSize: 16384
readBufferSize: 256
passthrough:
enabled: true
forwarding:
maxConstDelay: 1m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
entryTTL: 1h
140 changes: 114 additions & 26 deletions src/aggregator/aggregator/aggregator.go
@@ -30,6 +30,7 @@ import (
"time"

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/aggregator/client"
"github.com/m3db/m3/src/aggregator/sharding"
"github.com/m3db/m3/src/cluster/placement"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
@@ -77,6 +79,9 @@ type Aggregator interface {
// AddForwarded adds a forwarded metric with metadata.
AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error

// AddPassthrough adds a passthrough metric with storage policy.
AddPassthrough(metric aggregated.Metric, storagePolicy policy.StoragePolicy) error

// Resign stops the aggregator from participating in leader election and resigns
// from ongoing campaign if any.
Resign() error
@@ -103,6 +108,7 @@ type aggregator struct {
electionManager ElectionManager
flushManager FlushManager
flushHandler handler.Handler
passthroughWriter writer.Writer
adminClient client.AdminClient
resignTimeout time.Duration

@@ -137,6 +143,7 @@ func NewAggregator(opts Options) Aggregator {
electionManager: opts.ElectionManager(),
flushManager: opts.FlushManager(),
flushHandler: opts.FlushHandler(),
passthroughWriter: opts.PassthroughWriter(),
adminClient: opts.AdminClient(),
resignTimeout: opts.ResignTimeout(),
doneCh: make(chan struct{}),
@@ -257,6 +264,43 @@ func (agg *aggregator) AddForwarded(
return nil
}

func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
callStart := agg.nowFn()
agg.metrics.passthrough.Inc(1)

if agg.electionManager.ElectionState() == FollowerState {
agg.metrics.addPassthrough.ReportFollowerNoop()
return nil
}

pw, err := agg.passWriter()
if err != nil {
agg.metrics.addPassthrough.ReportError(err)
return err
}

mp := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
StoragePolicy: storagePolicy,
}

if err := pw.Write(mp); err != nil {
agg.metrics.addPassthrough.ReportError(err)
return err
}
agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart))
return nil
}

func (agg *aggregator) Resign() error {
ctx, cancel := context.WithTimeout(context.Background(), agg.resignTimeout)
defer cancel()
@@ -284,13 +328,29 @@ func (agg *aggregator) Close() error {
agg.closeShardSetWithLock()
}
agg.flushHandler.Close()
agg.passthroughWriter.Close()
if agg.adminClient != nil {
agg.adminClient.Close()
}
agg.state = aggregatorClosed
return nil
}

func (agg *aggregator) passWriter() (writer.Writer, error) {
agg.RLock()
defer agg.RUnlock()

if agg.state != aggregatorOpen {
return nil, errAggregatorNotOpenOrClosed
}

if agg.electionManager.ElectionState() == FollowerState {
return writer.NewBlackholeWriter(), nil
}

return agg.passthroughWriter, nil
}
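On followers, passWriter hands back a blackhole writer so that passthrough writes succeed without emitting any data. A conceptual sketch of such a writer, assuming the three-method interface exercised by the mocks in this diff (the real implementation lives in the handler/writer package):

// blackholeWriter satisfies the writer interface but discards every payload,
// keeping passthrough writes on non-leader instances as cheap no-ops.
type blackholeWriter struct{}

func (blackholeWriter) Write(mp aggregated.ChunkedMetricWithStoragePolicy) error { return nil }
func (blackholeWriter) Flush() error                                             { return nil }
func (blackholeWriter) Close() error                                             { return nil }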

func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
agg.RLock()
shard, err := agg.shardForWithLock(id, noUpdateShards)
@@ -760,6 +820,29 @@ func (m *aggregatorAddTimedMetrics) ReportError(err error) {
}
}

type aggregatorAddPassthroughMetrics struct {
aggregatorAddMetricMetrics
followerNoop tally.Counter
}

func newAggregatorAddPassthroughMetrics(
scope tally.Scope,
samplingRate float64,
) aggregatorAddPassthroughMetrics {
return aggregatorAddPassthroughMetrics{
aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
followerNoop: scope.Counter("follower-noop"),
}
}

func (m *aggregatorAddPassthroughMetrics) ReportError(err error) {
m.aggregatorAddMetricMetrics.ReportError(err)
}

func (m *aggregatorAddPassthroughMetrics) ReportFollowerNoop() {
m.followerNoop.Inc(1)
}

type latencyBucketKey struct {
resolution time.Duration
numForwardedTimes int
@@ -930,19 +1013,21 @@ func newAggregatorShardSetIDMetrics(scope tally.Scope) aggregatorShardSetIDMetrics {
}

type aggregatorMetrics struct {
counters tally.Counter
timers tally.Counter
timerBatches tally.Counter
gauges tally.Counter
forwarded tally.Counter
timed tally.Counter
addUntimed aggregatorAddUntimedMetrics
addTimed aggregatorAddTimedMetrics
addForwarded aggregatorAddForwardedMetrics
placement aggregatorPlacementMetrics
shards aggregatorShardsMetrics
shardSetID aggregatorShardSetIDMetrics
tick aggregatorTickMetrics
counters tally.Counter
timers tally.Counter
timerBatches tally.Counter
gauges tally.Counter
forwarded tally.Counter
timed tally.Counter
passthrough tally.Counter
addUntimed aggregatorAddUntimedMetrics
addTimed aggregatorAddTimedMetrics
addForwarded aggregatorAddForwardedMetrics
addPassthrough aggregatorAddPassthroughMetrics
placement aggregatorPlacementMetrics
shards aggregatorShardsMetrics
shardSetID aggregatorShardSetIDMetrics
tick aggregatorTickMetrics
}

func newAggregatorMetrics(
@@ -953,24 +1038,27 @@ func newAggregatorMetrics(
addUntimedScope := scope.SubScope("addUntimed")
addTimedScope := scope.SubScope("addTimed")
addForwardedScope := scope.SubScope("addForwarded")
addPassthroughScope := scope.SubScope("addPassthrough")
placementScope := scope.SubScope("placement")
shardsScope := scope.SubScope("shards")
shardSetIDScope := scope.SubScope("shard-set-id")
tickScope := scope.SubScope("tick")
return aggregatorMetrics{
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
counters: scope.Counter("counters"),
timers: scope.Counter("timers"),
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
passthrough: scope.Counter("passthrough"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
addPassthrough: newAggregatorAddPassthroughMetrics(addPassthroughScope, samplingRate),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope),
tick: newAggregatorTickMetrics(tickScope),
}
}

33 changes: 33 additions & 0 deletions src/aggregator/aggregator/aggregator_test.go
@@ -77,6 +77,12 @@ var (
TimeNanos: 12345,
Values: []float64{76109, 23891},
}
testPassthroughMetric = aggregated.Metric{
Type: metric.CounterType,
ID: []byte("testPassthrough"),
TimeNanos: 12345,
Value: 1000,
}
testInvalidMetric = unaggregated.MetricUnion{
Type: metric.UnknownType,
ID: []byte("testInvalid"),
@@ -126,6 +132,7 @@ var (
SourceID: 1234,
NumForwardedTimes: 3,
}
testPassthroughStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
)

func TestAggregatorOpenAlreadyOpen(t *testing.T) {
@@ -664,6 +671,25 @@ func TestAggregatorResignSuccess(t *testing.T) {
require.NoError(t, agg.Resign())
}

func TestAggregatorAddPassthroughNotOpen(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

agg, _ := testAggregator(t, ctrl)
err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStoragePolicy)
require.Equal(t, errAggregatorNotOpenOrClosed, err)
}

func TestAggregatorAddPassthroughSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

agg, _ := testAggregator(t, ctrl)
require.NoError(t, agg.Open())
err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStoragePolicy)
require.NoError(t, err)
}
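A complementary follower-path test might look like the sketch below; testAggregatorWithFollowerElection is a hypothetical helper, since the testOptions mock further down always reports LeaderState:

func TestAggregatorAddPassthroughFollowerNoop(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Hypothetical helper wiring ElectionState() to return FollowerState.
	agg, _ := testAggregatorWithFollowerElection(t, ctrl)
	require.NoError(t, agg.Open())
	// Followers must drop passthrough writes silently rather than error out.
	require.NoError(t, agg.AddPassthrough(testPassthroughMetric, testPassthroughStoragePolicy))
}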

func TestAggregatorStatus(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -1100,6 +1126,7 @@ func testOptions(ctrl *gomock.Controller) Options {
electionMgr.EXPECT().Reset().Return(nil).AnyTimes()
electionMgr.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes()
electionMgr.EXPECT().Close().Return(nil).AnyTimes()
electionMgr.EXPECT().ElectionState().Return(LeaderState).AnyTimes()

flushManager := NewMockFlushManager(ctrl)
flushManager.EXPECT().Reset().Return(nil).AnyTimes()
@@ -1117,6 +1144,11 @@ func testOptions(ctrl *gomock.Controller) Options {
h.EXPECT().NewWriter(gomock.Any()).Return(w, nil).AnyTimes()
h.EXPECT().Close().AnyTimes()

pw := writer.NewMockWriter(ctrl)
pw.EXPECT().Write(gomock.Any()).Return(nil).AnyTimes()
pw.EXPECT().Flush().Return(nil).AnyTimes()
pw.EXPECT().Close().Return(nil).AnyTimes()

cl := client.NewMockAdminClient(ctrl)
cl.EXPECT().Flush().Return(nil).AnyTimes()
cl.EXPECT().Close().AnyTimes()
@@ -1133,6 +1165,7 @@ func testOptions(ctrl *gomock.Controller) Options {
SetElectionManager(electionMgr).
SetFlushManager(flushManager).
SetFlushHandler(h).
SetPassthroughWriter(pw).
SetAdminClient(cl).
SetMaxAllowedForwardingDelayFn(infiniteAllowedDelayFn).
SetBufferForFutureTimedMetric(math.MaxInt64).
52 changes: 40 additions & 12 deletions src/aggregator/aggregator/capture/aggregator.go
@@ -39,13 +39,14 @@ import (
type aggregator struct {
sync.RWMutex

numMetricsAdded int
countersWithMetadatas []unaggregated.CounterWithMetadatas
batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas
gaugesWithMetadatas []unaggregated.GaugeWithMetadatas
forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata
timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata
timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas
numMetricsAdded int
countersWithMetadatas []unaggregated.CounterWithMetadatas
batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas
gaugesWithMetadatas []unaggregated.GaugeWithMetadatas
forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata
timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata
timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas
passthroughMetricsWithMetadata []aggregated.PassthroughMetricWithMetadata
}

// NewAggregator creates a new capturing aggregator.
@@ -152,6 +153,26 @@ func (agg *aggregator) AddForwarded(
return nil
}

func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
// Clone the metric and storage policy to ensure they cannot be mutated externally.
metric = cloneTimedMetric(metric)
storagePolicy = cloneStoragePolicy(storagePolicy)

agg.Lock()
defer agg.Unlock()

pm := aggregated.PassthroughMetricWithMetadata{
Metric: metric,
StoragePolicy: storagePolicy,
}
agg.passthroughMetricsWithMetadata = append(agg.passthroughMetricsWithMetadata, pm)
agg.numMetricsAdded++
return nil
}
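A consumer of the capturing aggregator can then assert on recorded passthrough writes via Snapshot, roughly as follows (fixture names are hypothetical, and NewAggregator is assumed to take no arguments; its signature is collapsed in this view):

agg := NewAggregator()
require.NoError(t, agg.AddPassthrough(testMetric, testStoragePolicy))
res := agg.Snapshot()
require.Equal(t, 1, len(res.PassthroughMetricWithMetadata))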

func (agg *aggregator) Resign() error { return nil }
func (agg *aggregator) Status() aggr.RuntimeStatus { return aggr.RuntimeStatus{} }
func (agg *aggregator) Close() error { return nil }
@@ -167,17 +188,19 @@ func (agg *aggregator) Snapshot() SnapshotResult {
agg.Lock()

result := SnapshotResult{
CountersWithMetadatas: agg.countersWithMetadatas,
BatchTimersWithMetadatas: agg.batchTimersWithMetadatas,
GaugesWithMetadatas: agg.gaugesWithMetadatas,
ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata,
TimedMetricWithMetadata: agg.timedMetricsWithMetadata,
CountersWithMetadatas: agg.countersWithMetadatas,
BatchTimersWithMetadatas: agg.batchTimersWithMetadatas,
GaugesWithMetadatas: agg.gaugesWithMetadatas,
ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata,
TimedMetricWithMetadata: agg.timedMetricsWithMetadata,
PassthroughMetricWithMetadata: agg.passthroughMetricsWithMetadata,
}
agg.countersWithMetadatas = nil
agg.batchTimersWithMetadatas = nil
agg.gaugesWithMetadatas = nil
agg.forwardedMetricsWithMetadata = nil
agg.timedMetricsWithMetadata = nil
agg.passthroughMetricsWithMetadata = nil
agg.numMetricsAdded = 0

agg.Unlock()
@@ -263,3 +286,8 @@ func cloneTimedMetadata(meta metadata.TimedMetadata) metadata.TimedMetadata {
cloned := meta
return cloned
}

func cloneStoragePolicy(sp policy.StoragePolicy) policy.StoragePolicy {
cloned := sp
return cloned
}
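Both clone helpers lean on the fact that metadata.TimedMetadata and policy.StoragePolicy are plain value types without pointer fields, so a simple assignment already yields an independent copy. Illustratively:

// Plain assignment copies the whole struct, so mutating the original
// variable afterwards cannot leak into the clone.
original := policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour)
cloned := cloneStoragePolicy(original)
original = policy.NewStoragePolicy(10*time.Second, xtime.Second, 6*time.Hour)
// cloned still describes the 1m-resolution, 12h-retention policy.
_ = cloned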
