Skip to content

Commit

Permalink
added filter to spanmetrics processor (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-shopify committed Sep 30, 2022
1 parent b2014b6 commit 98b3b1e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 1 deletion.
3 changes: 3 additions & 0 deletions processor/spanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package spanmetricsprocessor // import "github.com/open-telemetry/opentelemetry-
import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -37,6 +38,8 @@ type Dimension struct {
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

filterconfig.MatchConfig `mapstructure:",squash"`

// MetricsExporter is the name of the metrics exporter to use to ship metrics.
MetricsExporter string `mapstructure:"metrics_exporter"`

Expand Down
3 changes: 2 additions & 1 deletion processor/spanmetricsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter v0.61.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter v0.61.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.61.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.61.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.61.0
Expand All @@ -31,6 +32,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand All @@ -50,7 +52,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.17 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.61.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.61.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.61.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.61.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions processor/spanmetricsprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
)

Expand Down Expand Up @@ -87,12 +88,24 @@ type processorImp struct {
// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
// e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache

include filterspan.Matcher
exclude filterspan.Matcher
}

func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer consumer.Traces) (*processorImp, error) {
logger.Info("Building spanmetricsprocessor")
pConfig := config.(*Config)

include, err := filterspan.NewMatcher(pConfig.Include)
if err != nil {
return nil, err
}
exclude, err := filterspan.NewMatcher(pConfig.Exclude)
if err != nil {
return nil, err
}

bounds := defaultLatencyHistogramBucketsMs
if pConfig.LatencyHistogramBuckets != nil {
bounds = mapDurationsToMillis(pConfig.LatencyHistogramBuckets)
Expand Down Expand Up @@ -126,6 +139,8 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons
nextConsumer: nextConsumer,
dimensions: pConfig.Dimensions,
metricKeyToDimensions: metricKeyToDimensionsCache,
include: include,
exclude: exclude,
}, nil
}

Expand Down Expand Up @@ -368,6 +383,9 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans ptrace.ResourceSpa
spans := ils.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if filterspan.SkipSpan(p.include, p.exclude, span, rspans.Resource(), ils.InstrumentationLibrary()) {
continue
}
p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes())
}
}
Expand Down

0 comments on commit 98b3b1e

Please sign in to comment.