Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument obsreport.Processor #36

Merged
merged 6 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 193 additions & 77 deletions obsreport/obsreport_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,29 @@ import (

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
processorName = "processor"
processorScope = scopeName + nameSep + processorName
)

// BuildProcessorCustomMetricName is used to be build a metric name following
// the standards used in the Collector. The configType should be the same
// value used to identify the type on the config.
// value used to identify the type on the component.
func BuildProcessorCustomMetricName(configType, metric string) string {
componentPrefix := obsmetrics.ProcessorPrefix
if !strings.HasSuffix(componentPrefix, obsmetrics.NameSep) {
Expand All @@ -44,6 +58,23 @@ func BuildProcessorCustomMetricName(configType, metric string) string {
type Processor struct {
level configtelemetry.Level
mutators []tag.Mutator

meter metric.Meter
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue

acceptedSpansCounter syncint64.Counter
refusedSpansCounter syncint64.Counter
droppedSpansCounter syncint64.Counter
acceptedMetricPointsCounter syncint64.Counter
refusedMetricPointsCounter syncint64.Counter
droppedMetricPointsCounter syncint64.Counter
acceptedLogRecordsCounter syncint64.Counter
refusedLogRecordsCounter syncint64.Counter
droppedLogRecordsCounter syncint64.Counter

}

// ProcessorSettings are settings for creating a Processor.
Expand All @@ -54,10 +85,7 @@ type ProcessorSettings struct {

// NewProcessor creates a new Processor.
func NewProcessor(cfg ProcessorSettings) (*Processor, error) {
return &Processor{
level: cfg.ProcessorCreateSettings.MetricsLevel,
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}, nil
return newProcessor(cfg, featuregate.GetRegistry())
}

// Deprecated: [v0.65.0] use NewProcessor.
Expand All @@ -70,128 +98,216 @@ func MustNewProcessor(cfg ProcessorSettings) *Processor {
return proc
}

func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) (*Processor, error) {
proc := &Processor{
level: cfg.ProcessorCreateSettings.MetricsLevel,
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
logger: cfg.ProcessorCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
meter: cfg.ProcessorCreateSettings.MeterProvider.Meter(processorScope),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ProcessorKey, cfg.ProcessorID.String()),
},

}

if err := proc.createOtelMetrics(); err != nil {
return nil, err
}

return proc, nil
}

func (por *Processor) createOtelMetrics() error {
if !por.useOtelForMetrics {
return nil
}

var errors, err error

por.acceptedSpansCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey,
instrument.WithDescription("Number of spans successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedSpansCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey,
instrument.WithDescription("Number of spans that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedSpansCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey,
instrument.WithDescription("Number of spans that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.acceptedMetricPointsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedMetricPointsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey,
instrument.WithDescription("Number of metric points that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedMetricPointsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey,
instrument.WithDescription("Number of metric points that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.acceptedLogRecordsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey,
instrument.WithDescription("Number of log records successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedLogRecordsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey,
instrument.WithDescription("Number of log records that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedLogRecordsCounter, err = por.meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey,
instrument.WithDescription("Number of log records that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

return errors
}

func (por *Processor) recordWithOtel(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedCount, refusedCount, droppedCount syncint64.Counter
switch dataType {
case component.DataTypeTraces:
acceptedCount = por.acceptedSpansCounter
refusedCount = por.refusedSpansCounter
droppedCount = por.droppedSpansCounter
case component.DataTypeMetrics:
acceptedCount = por.acceptedMetricPointsCounter
refusedCount = por.refusedMetricPointsCounter
droppedCount = por.droppedMetricPointsCounter
case component.DataTypeLogs:
acceptedCount = por.acceptedLogRecordsCounter
refusedCount = por.refusedLogRecordsCounter
droppedCount = por.droppedLogRecordsCounter
}

acceptedCount.Add(ctx, accepted, por.otelAttrs...)
refusedCount.Add(ctx, refused, por.otelAttrs...)
droppedCount.Add(ctx, dropped, por.otelAttrs...)
}

func (por *Processor) recordWithOC(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedMeasure, refusedMeasure, droppedMeasure *stats.Int64Measure

switch dataType {
case component.DataTypeTraces:
acceptedMeasure = obsmetrics.ProcessorAcceptedSpans
refusedMeasure = obsmetrics.ProcessorRefusedSpans
droppedMeasure = obsmetrics.ProcessorDroppedSpans
case component.DataTypeMetrics:
acceptedMeasure = obsmetrics.ProcessorAcceptedMetricPoints
refusedMeasure = obsmetrics.ProcessorRefusedMetricPoints
droppedMeasure = obsmetrics.ProcessorDroppedMetricPoints
case component.DataTypeLogs:
acceptedMeasure = obsmetrics.ProcessorAcceptedLogRecords
refusedMeasure = obsmetrics.ProcessorRefusedLogRecords
droppedMeasure = obsmetrics.ProcessorDroppedLogRecords
}

// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
acceptedMeasure.M(accepted),
refusedMeasure.M(refused),
droppedMeasure.M(dropped),
)
}

func (por *Processor) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
if por.useOtelForMetrics {
por.recordWithOtel(ctx, dataType, accepted, refused, dropped)
} else {
por.recordWithOC(ctx, dataType, accepted, refused, dropped)
}
}

// TracesAccepted reports that the trace data was accepted.
func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(int64(numSpans)),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(0),
)
por.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0))
}
}

// TracesRefused reports that the trace data was refused.
func (por *Processor) TracesRefused(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)),
obsmetrics.ProcessorDroppedSpans.M(0),
)
por.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0))
}
}

// TracesDropped reports that the trace data was dropped.
func (por *Processor) TracesDropped(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(int64(numSpans)),
)
por.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans))
}
}

// MetricsAccepted reports that the metrics were accepted.
func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0))
}
}

// MetricsRefused reports that the metrics were refused.
func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0))
}
}

// MetricsDropped reports that the metrics were dropped.
func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(int64(numPoints)),
)
por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints))
}
}

// LogsAccepted reports that the logs were accepted.
func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(0),
)
por.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0))
}
}

// LogsRefused reports that the logs were refused.
func (por *Processor) LogsRefused(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0))
}
}

// LogsDropped reports that the logs were dropped.
func (por *Processor) LogsDropped(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(int64(numRecords)),
)
por.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords))
}
}
Loading