Skip to content

Commit

Permalink
[processor/servicegraph] update own telemetry to use otel (open-telem…
Browse files Browse the repository at this point in the history
…etry#29917)

This updates the servicegraph processor to emit telemetry using
OpenTelemetry instead of OpenCensus.

Related open-telemetry#29867

---------

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten authored and cparkins committed Jan 10, 2024
1 parent fa12f08 commit c19656c
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 69 deletions.
27 changes: 27 additions & 0 deletions .chloggen/codeboten_rm-census-servicegraph.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: update own telemetry to use otel

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29917]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 2 additions & 8 deletions processor/servicegraphprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"time"

"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -51,9 +50,6 @@ func init() {

// NewFactory creates a factory for the servicegraph processor.
func NewFactory() processor.Factory {
// TODO: Handle this err
_ = view.Register(serviceGraphProcessorViews()...)

return processor.NewFactory(
typeStr,
createDefaultConfig,
Expand All @@ -63,8 +59,6 @@ func NewFactory() processor.Factory {

// NewConnectorFactoryFunc creates a function that returns a factory for the servicegraph connector.
var NewConnectorFactoryFunc = func(cfgType component.Type, tracesToMetricsStability component.StabilityLevel) connector.Factory {
// TODO: Handle this err
_ = view.Register(serviceGraphProcessorViews()...)
return connector.NewFactory(
cfgType,
createDefaultConfig,
Expand All @@ -84,13 +78,13 @@ func createDefaultConfig() component.Config {
}

func createTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
p := newProcessor(params.Logger, cfg)
p := newProcessor(params.TelemetrySettings, cfg)
p.tracesConsumer = nextConsumer
return p, nil
}

func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c := newProcessor(params.Logger, cfg)
c := newProcessor(params.TelemetrySettings, cfg)
c.metricsConsumer = nextConsumer
return c, nil
}
8 changes: 4 additions & 4 deletions processor/servicegraphprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.20

require (
github.com/stretchr/testify v1.8.4
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/component v0.91.0
go.opentelemetry.io/collector/config/configgrpc v0.91.0
go.opentelemetry.io/collector/config/configtelemetry v0.91.0
go.opentelemetry.io/collector/connector v0.91.0
go.opentelemetry.io/collector/consumer v0.91.0
go.opentelemetry.io/collector/exporter v0.91.0
Expand All @@ -17,6 +17,8 @@ require (
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/processor v0.91.0
go.opentelemetry.io/collector/semconv v0.91.0
go.opentelemetry.io/otel/metric v1.21.0
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.uber.org/zap v1.26.0
)

Expand Down Expand Up @@ -67,12 +69,12 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.91.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.91.0 // indirect
go.opentelemetry.io/collector/config/configcompression v0.91.0 // indirect
go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect
go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect
go.opentelemetry.io/collector/config/configtls v0.91.0 // indirect
go.opentelemetry.io/collector/config/internal v0.91.0 // indirect
go.opentelemetry.io/collector/confmap v0.91.0 // indirect
Expand All @@ -93,9 +95,7 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
45 changes: 0 additions & 45 deletions processor/servicegraphprocessor/metrics.go

This file was deleted.

40 changes: 34 additions & 6 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ import (
"sync"
"time"

"go.opencensus.io/stats"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/store"
)

Expand Down Expand Up @@ -76,10 +78,14 @@ type serviceGraphProcessor struct {
metricMutex sync.RWMutex
keyToMetric map[string]metricSeries

statDroppedSpans metric.Int64Counter
statTotalEdges metric.Int64Counter
statExpiredEdges metric.Int64Counter

shutdownCh chan any
}

func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProcessor {
func newProcessor(set component.TelemetrySettings, config component.Config) *serviceGraphProcessor {
pConfig := config.(*Config)

bounds := defaultLatencyHistogramBuckets
Expand All @@ -102,9 +108,28 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc
pConfig.VirtualNodePeerAttributes = defaultPeerAttributes
}

scopeName := "processor/servicegraphprocessor"
meter := set.MeterProvider.Meter(scopeName)

droppedSpan, _ := meter.Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "dropped_spans"),
metric.WithDescription("Number of spans dropped when trying to add edges"),
metric.WithUnit("1"),
)
totalEdges, _ := meter.Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "total_edges"),
metric.WithDescription("Total number of unique edges"),
metric.WithUnit("1"),
)
expiredEdges, _ := meter.Int64Counter(
processorhelper.BuildCustomMetricName(metadata.Type, "expired_edges"),
metric.WithDescription("Number of edges that expired before finding its matching span"),
metric.WithUnit("1"),
)

return &serviceGraphProcessor{
config: pConfig,
logger: logger,
logger: set.Logger,
startTime: time.Now(),
reqTotal: make(map[string]int64),
reqFailedTotal: make(map[string]int64),
Expand All @@ -117,6 +142,9 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc
reqDurationBounds: bounds,
keyToMetric: make(map[string]metricSeries),
shutdownCh: make(chan any),
statDroppedSpans: droppedSpan,
statTotalEdges: totalEdges,
statExpiredEdges: expiredEdges,
}
}

Expand Down Expand Up @@ -299,7 +327,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.

if errors.Is(err, store.ErrTooManyItems) {
totalDroppedSpans++
stats.Record(ctx, statDroppedSpans.M(1))
p.statDroppedSpans.Add(ctx, 1)
continue
}

Expand All @@ -309,7 +337,7 @@ func (p *serviceGraphProcessor) aggregateMetrics(ctx context.Context, td ptrace.
}

if isNew {
stats.Record(ctx, statTotalEdges.M(1))
p.statTotalEdges.Add(ctx, 1)
}
}
}
Expand Down Expand Up @@ -354,7 +382,7 @@ func (p *serviceGraphProcessor) onExpire(e *store.Edge) {
zap.Stringer("trace_id", e.TraceID),
)

stats.Record(context.Background(), statExpiredEdges.M(1))
p.statExpiredEdges.Add(context.Background(), 1)

if virtualNodeFeatureGate.IsEnabled() {
e.ConnectionType = store.VirtualNode
Expand Down
Loading

0 comments on commit c19656c

Please sign in to comment.