Skip to content

Commit

Permalink
Update config structure for interval processor
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Aug 30, 2024
1 parent 53ad0a4 commit c9701bf
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 20 deletions.
14 changes: 11 additions & 3 deletions processor/intervalprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ The following metric types will *not* be aggregated, and will instead be passed,

The following settings can be optionally configured:

* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s
* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false
* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false
```yaml
intervalprocessor:
# The interval in which the processor should export the aggregated metrics.
[ interval: <duration> | default = 60s ]

pass_through:
# Whether gauges should be aggregated or passed through to the next component as they are
[ gauge: <bool> | default = false ]
# Whether summaries should be aggregated or passed through to the next component as they are
[ summary: <boo>l | default = false ]
```
## Example of metric flows
Expand Down
14 changes: 10 additions & 4 deletions processor/intervalprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ var _ component.Config = (*Config)(nil)
type Config struct {
// Interval is the time interval at which the processor will aggregate metrics.
Interval time.Duration `mapstructure:"interval"`
// GaugePassThrough is a flag that determines whether gauge metrics should be passed through
// PassThrough is a configuration that determines whether gauge and summary metrics should be passed through
// as they are or aggregated.
GaugePassThrough bool `mapstructure:"gauge_pass_through"`
// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
PassThrough PassThrough `mapstructure:"pass_through"`
}

type PassThrough struct {
// Gauge is a flag that determines whether gauge metrics should be passed through
// as they are or aggregated.
Gauge bool `mapstructure:"gauge"`
// Summary is a flag that determines whether summary metrics should be passed through
// as they are or aggregated.
SummaryPassThrough bool `mapstructure:"summary_pass_through"`
Summary bool `mapstructure:"summary"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
Expand Down
8 changes: 5 additions & 3 deletions processor/intervalprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
Interval: 60 * time.Second,
GaugePassThrough: false,
SummaryPassThrough: false,
Interval: 60 * time.Second,
PassThrough: PassThrough{
Gauge: false,
Summary: false,
},
}
}

Expand Down
14 changes: 5 additions & 9 deletions processor/intervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ type Processor struct {
expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint

exportInterval time.Duration
gaugePassThrough bool
summaryPassThrough bool
config *Config

nextConsumer consumer.Metrics
}
Expand All @@ -64,16 +62,14 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},

exportInterval: config.Interval,
gaugePassThrough: config.GaugePassThrough,
summaryPassThrough: config.SummaryPassThrough,
config: config,

nextConsumer: nextConsumer,
}
}

func (p *Processor) Start(_ context.Context, _ component.Host) error {
exportTicker := time.NewTicker(p.exportInterval)
exportTicker := time.NewTicker(p.config.Interval)
go func() {
for {
select {
Expand Down Expand Up @@ -109,15 +105,15 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
switch m.Type() {
case pmetric.MetricTypeSummary:
if p.summaryPassThrough {
if p.config.PassThrough.Summary {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
return true
case pmetric.MetricTypeGauge:
if p.gaugePassThrough {
if p.config.PassThrough.Gauge {
return false
}

Expand Down
2 changes: 1 addition & 1 deletion processor/intervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAggregation(t *testing.T) {

var config *Config
for _, tc := range testCases {
config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough}
config = &Config{Interval: time.Second, PassThrough: PassThrough{Gauge: tc.passThrough, Summary: tc.passThrough}}

t.Run(tc.name, func(t *testing.T) {
// next stores the results of the filter metric processor
Expand Down

0 comments on commit c9701bf

Please sign in to comment.