Skip to content

Commit

Permalink
Add an otelcol.connector.spanmetrics component (#4894)
Browse files Browse the repository at this point in the history
* Add an otelcol.connector.spanmetrics component
  • Loading branch information
ptodev authored Aug 24, 2023
1 parent 494cde2 commit 7357b37
Show file tree
Hide file tree
Showing 10 changed files with 1,162 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Main (unreleased)
- `discovery.puppetdb` - service discovery from PuppetDB. (@captncraig)
- `otelcol.processor.discovery` adds resource attributes to spans, where the attributes
keys and values are sourced from `discovery.*` components. (@ptodev)
- `otelcol.connector.spanmetrics` - creates OpenTelemetry metrics from traces. (@ptodev)

- Update `YACE` to `v0.54.0`, which includes bugfixes for FIPS support. (@ashrayjain)

Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/auth/headers" // Import otelcol.auth.headers
_ "github.com/grafana/agent/component/otelcol/auth/oauth2" // Import otelcol.auth.oauth2
_ "github.com/grafana/agent/component/otelcol/auth/sigv4" // Import otelcol.auth.sigv4
_ "github.com/grafana/agent/component/otelcol/connector/spanmetrics" // Import otelcol.connector.spanmetrics
_ "github.com/grafana/agent/component/otelcol/exporter/jaeger" // Import otelcol.exporter.jaeger
_ "github.com/grafana/agent/component/otelcol/exporter/loadbalancing" // Import otelcol.exporter.loadbalancing
_ "github.com/grafana/agent/component/otelcol/exporter/logging" // Import otelcol.exporter.logging
Expand Down
206 changes: 206 additions & 0 deletions component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Package connector exposes utilities to create a Flow component from
// OpenTelemetry Collector connectors.
package connector

import (
"context"
"errors"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/agent/component/otelcol/internal/lazycollector"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util/zapadapter"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
otelconnector "go.opentelemetry.io/collector/connector"
otelextension "go.opentelemetry.io/collector/extension"
sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"

_ "github.com/grafana/agent/component/otelcol/internal/featuregate" // Enable needed feature gates
)

const (
ConnectorTracesToTraces = iota
ConnectorTracesToMetrics
ConnectorTracesToLogs
ConnectorMetricsToTraces
ConnectorMetricsToMetrics
ConnectorMetricsToLogs
ConnectorLogsToTraces
ConnectorLogsToMetrics
ConnectorLogsToLogs
)

// Arguments is an extension of component.Arguments which contains necessary
// settings for OpenTelemetry Collector connectors.
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector connector
// configuration.
Convert() (otelcomponent.Config, error)

// Extensions returns the set of extensions that the configured component is
// allowed to use.
Extensions() map[otelcomponent.ID]otelextension.Extension

// Exporters returns the set of exporters that are exposed to the configured
// component.
Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component

// NextConsumers returns the set of consumers to send data to.
NextConsumers() *otelcol.ConsumerArguments

ConnectorType() int
}

// Connector is a Flow component shim which manages an OpenTelemetry Collector
// connector component.
type Connector struct {
ctx context.Context
cancel context.CancelFunc

opts component.Options
factory otelconnector.Factory
consumer *lazyconsumer.Consumer

sched *scheduler.Scheduler
collector *lazycollector.Collector
}

var (
_ component.Component = (*Connector)(nil)
_ component.HealthComponent = (*Connector)(nil)
)

// New creates a new Flow component which encapsulates an OpenTelemetry
// Collector connector. args must hold a value of the argument type registered
// with the Flow component.
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Connector, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
collector := lazycollector.New()
opts.Registerer.MustRegister(collector)

// Immediately set our state with our consumer. The exports will never change
// throughout the lifetime of our component.
//
// This will panic if the wrapping component is not registered to export
// otelcol.ConsumerExports.
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})

p := &Connector{
ctx: ctx,
cancel: cancel,

opts: opts,
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
collector: collector,
}
if err := p.Update(args); err != nil {
return nil, err
}
return p, nil
}

// Run starts the Connector component.
func (p *Connector) Run(ctx context.Context) error {
defer p.cancel()
return p.sched.Run(ctx)
}

// Update implements component.Component. It will convert the Arguments into
// configuration for OpenTelemetry Collector connector configuration and manage
// the underlying OpenTelemetry Collector connector.
func (p *Connector) Update(args component.Arguments) error {
pargs := args.(Arguments)

host := scheduler.NewHost(
p.opts.Logger,
scheduler.WithHostExtensions(pargs.Extensions()),
scheduler.WithHostExporters(pargs.Exporters()),
)

reg := prometheus.NewRegistry()
p.collector.Set(reg)

promExporter, err := sdkprometheus.New(sdkprometheus.WithRegisterer(reg), sdkprometheus.WithoutTargetInfo())
if err != nil {
return err
}

settings := otelconnector.CreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
Logger: zapadapter.New(p.opts.Logger),

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),
},

BuildInfo: otelcomponent.BuildInfo{
Command: os.Args[0],
Description: "Grafana Agent",
Version: build.Version,
},
}

connectorConfig, err := pargs.Convert()
if err != nil {
return err
}

next := pargs.NextConsumers()

// Create instances of the connector from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

var tracesConnector otelconnector.Traces
var metricsConnector otelconnector.Metrics
var logsConnector otelconnector.Logs

switch pargs.ConnectorType() {
case ConnectorTracesToMetrics:
if len(next.Traces) > 0 || len(next.Logs) > 0 {
return errors.New("this connector can only output metrics")
}

if len(next.Metrics) > 0 {
nextMetrics := fanoutconsumer.Metrics(next.Metrics)
tracesConnector, err = p.factory.CreateTracesToMetrics(p.ctx, settings, connectorConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesConnector != nil {
components = append(components, tracesConnector)
}
}
default:
return errors.New("unsupported connector type")
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
return nil
}

// CurrentHealth implements component.HealthComponent.
func (p *Connector) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}
160 changes: 160 additions & 0 deletions component/otelcol/connector/spanmetrics/spanmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Package spanmetrics provides an otelcol.connector.spanmetrics component.
package spanmetrics

import (
"fmt"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/connector"
"github.com/grafana/agent/pkg/river"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.connector.spanmetrics",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := spanmetricsconnector.NewFactory()
return connector.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.connector.spanmetrics component.
type Arguments struct {
// Dimensions defines the list of additional dimensions on top of the provided:
// - service.name
// - span.name
// - span.kind
// - status.code
// The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []Dimension `river:"dimension,block,optional"`

// DimensionsCacheSize defines the size of cache for storing Dimensions, which helps to avoid cache memory growing
// indefinitely over the lifetime of the collector.
DimensionsCacheSize int `river:"dimensions_cache_size,attr,optional"`

AggregationTemporality string `river:"aggregation_temporality,attr,optional"`

Histogram HistogramConfig `river:"histogram,block"`

// MetricsEmitInterval is the time period between when metrics are flushed or emitted to the downstream components.
MetricsFlushInterval time.Duration `river:"metrics_flush_interval,attr,optional"`

// Namespace is the namespace of the metrics emitted by the connector.
Namespace string `river:"namespace,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
_ river.Validator = (*Arguments)(nil)
_ river.Defaulter = (*Arguments)(nil)
_ connector.Arguments = (*Arguments)(nil)
)

const (
AggregationTemporalityCumulative = "CUMULATIVE"
AggregationTemporalityDelta = "DELTA"
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
DimensionsCacheSize: 1000,
AggregationTemporality: AggregationTemporalityCumulative,
MetricsFlushInterval: 15 * time.Second,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
if args.DimensionsCacheSize <= 0 {
return fmt.Errorf(
"invalid cache size: %v, the maximum number of the items in the cache should be positive",
args.DimensionsCacheSize)
}

if args.MetricsFlushInterval <= 0 {
return fmt.Errorf("metrics_flush_interval must be greater than 0")
}

switch args.AggregationTemporality {
case AggregationTemporalityCumulative, AggregationTemporalityDelta:
// Valid
default:
return fmt.Errorf("invalid aggregation_temporality: %v", args.AggregationTemporality)
}

return nil
}

func convertAggregationTemporality(temporality string) (string, error) {
switch temporality {
case AggregationTemporalityCumulative:
return "AGGREGATION_TEMPORALITY_CUMULATIVE", nil
case AggregationTemporalityDelta:
return "AGGREGATION_TEMPORALITY_DELTA", nil
default:
return "", fmt.Errorf("invalid aggregation_temporality: %v", temporality)
}
}

// Convert implements connector.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
dimensions := make([]spanmetricsconnector.Dimension, 0, len(args.Dimensions))
for _, d := range args.Dimensions {
dimensions = append(dimensions, d.Convert())
}

histogram, err := args.Histogram.Convert()
if err != nil {
return nil, err
}

aggregationTemporality, err := convertAggregationTemporality(args.AggregationTemporality)
if err != nil {
return nil, err
}

return &spanmetricsconnector.Config{
Dimensions: dimensions,
DimensionsCacheSize: args.DimensionsCacheSize,
AggregationTemporality: aggregationTemporality,
Histogram: *histogram,
MetricsFlushInterval: args.MetricsFlushInterval,
Namespace: args.Namespace,
}, nil
}

// Extensions implements connector.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements connector.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements connector.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// ConnectorType() int implements connector.Arguments.
func (Arguments) ConnectorType() int {
return connector.ConnectorTracesToMetrics
}
Loading

0 comments on commit 7357b37

Please sign in to comment.