Skip to content

Commit

Permalink
Add support for consuming OTLP/gRPC metrics (elastic#4722)
Browse files Browse the repository at this point in the history
* Add support for consuming OTLP/gRPC metrics

Add support for consuming basic OTLP/gRPC metrics,
like we support in the OpenTelemetry Collector exporter.
This does not cover distributions or summaries; these
will be added once the Elasticsearch enhancements we
are dependent on are available.

* processor/otel: add comment about [Cc]onsumerStats
# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
axw committed Feb 19, 2021
1 parent 9f0a6c7 commit 2b2c05b
Show file tree
Hide file tree
Showing 8 changed files with 697 additions and 11 deletions.
53 changes: 46 additions & 7 deletions beater/otlp/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver/metrics"
"go.opentelemetry.io/collector/receiver/otlpreceiver/trace"
"google.golang.org/grpc"

Expand All @@ -38,8 +39,10 @@ var (
request.IDRequestCount, request.IDResponseCount, request.IDResponseErrorsCount, request.IDResponseValidCount,
}

gRPCConsumerRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.consumer")
gRPCConsumerMonitoringMap = request.MonitoringMapForRegistry(gRPCConsumerRegistry, monitoringKeys)
gRPCMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.metrics")
gRPCMetricsMonitoringMap = request.MonitoringMapForRegistry(gRPCMetricsRegistry, monitoringKeys)
gRPCTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.grpc.traces")
gRPCTracesMonitoringMap = request.MonitoringMapForRegistry(gRPCTracesRegistry, monitoringKeys)
)

// RegisterGRPCServices registers OTLP consumer services with the given gRPC server.
Expand All @@ -48,11 +51,24 @@ func RegisterGRPCServices(grpcServer *grpc.Server, reporter publish.Reporter, lo
consumer: &otel.Consumer{Reporter: reporter},
logger: logger,
}
// TODO(axw) add support for metrics to processer/otel.Consumer, and register a metrics receiver here.

// TODO(axw) rather than registering and unregistering monitoring callbacks
// each time a new consumer is created, we should register one callback and
// have it aggregate metrics from the dynamic consumers.
//
// For now, we take the easy way out: we only have one OTLP gRPC service
// running at any time, so just unregister/register a new one.
gRPCMetricsRegistry.Remove("consumer")
monitoring.NewFunc(gRPCMetricsRegistry, "consumer", consumer.collectMetricsMonitoring, monitoring.Report)

traceReceiver := trace.New("otlp", consumer)
metricsReceiver := metrics.New("otlp", consumer)
if err := otlpreceiver.RegisterTraceReceiver(context.Background(), traceReceiver, grpcServer, nil); err != nil {
return errors.Wrap(err, "failed to register OTLP trace receiver")
}
if err := otlpreceiver.RegisterMetricsReceiver(context.Background(), metricsReceiver, grpcServer, nil); err != nil {
return errors.Wrap(err, "failed to register OTLP metrics receiver")
}
return nil
}

Expand All @@ -63,13 +79,36 @@ type monitoredConsumer struct {

// ConsumeTraces consumes OpenTelemtry trace data.
func (c *monitoredConsumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
gRPCConsumerMonitoringMap[request.IDRequestCount].Inc()
defer gRPCConsumerMonitoringMap[request.IDResponseCount].Inc()
gRPCTracesMonitoringMap[request.IDRequestCount].Inc()
defer gRPCTracesMonitoringMap[request.IDResponseCount].Inc()
if err := c.consumer.ConsumeTraces(ctx, traces); err != nil {
gRPCConsumerMonitoringMap[request.IDResponseErrorsCount].Inc()
gRPCTracesMonitoringMap[request.IDResponseErrorsCount].Inc()
c.logger.With(logp.Error(err)).Error("ConsumeTraces returned an error")
return err
}
gRPCConsumerMonitoringMap[request.IDResponseValidCount].Inc()
gRPCTracesMonitoringMap[request.IDResponseValidCount].Inc()
return nil
}

// ConsumeMetrics consumes OpenTelemtry metrics data.
func (c *monitoredConsumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error {
gRPCMetricsMonitoringMap[request.IDRequestCount].Inc()
defer gRPCMetricsMonitoringMap[request.IDResponseCount].Inc()
if err := c.consumer.ConsumeMetrics(ctx, metrics); err != nil {
gRPCMetricsMonitoringMap[request.IDResponseErrorsCount].Inc()
c.logger.With(logp.Error(err)).Error("ConsumeMetrics returned an error")
return err
}
gRPCMetricsMonitoringMap[request.IDResponseValidCount].Inc()
return nil
}

func (c *monitoredConsumer) collectMetricsMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
V.OnRegistryFinished()

stats := c.consumer.Stats()
monitoring.ReportNamespace(V, "consumer", func() {
monitoring.ReportInt(V, "unsupported_dropped", stats.UnsupportedMetricsDropped)
})
}
81 changes: 78 additions & 3 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
)

var (
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
exportMetricsServiceRequestType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest")
exportMetricsServiceResponseType = proto.MessageType("opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse")
exportTraceServiceRequestType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest")
exportTraceServiceResponseType = proto.MessageType("opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse")
)

func TestConsumeTraces(t *testing.T) {
Expand Down Expand Up @@ -93,7 +95,7 @@ func TestConsumeTraces(t *testing.T) {
assert.Len(t, events, 2)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.consumer").Do(monitoring.Full, func(key string, value interface{}) {
monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
Expand All @@ -104,6 +106,65 @@ func TestConsumeTraces(t *testing.T) {
}, actual)
}

func TestConsumeMetrics(t *testing.T) {
var reportError error
report := func(ctx context.Context, req publish.PendingReq) error {
return reportError
}

// Send a minimal metric to verify that everything is connected properly.
//
// We intentionally do not check the published event contents; those are
// tested in processor/otel.
cannedRequest := jsonExportMetricsServiceRequest(`{
"resource_metrics": [
{
"instrumentation_library_metrics": [
{
"metrics": [
{
"name": "metric_name"
}
]
}
]
}
]
}`)

conn := newServer(t, report)
err := conn.Invoke(
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
cannedRequest, newExportMetricsServiceResponse(),
)
assert.NoError(t, err)

reportError = errors.New("failed to publish events")
err = conn.Invoke(
context.Background(), "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
cannedRequest, newExportMetricsServiceResponse(),
)
assert.Error(t, err)
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.metrics").Do(monitoring.Full, func(key string, value interface{}) {
actual[key] = value
})
assert.Equal(t, map[string]interface{}{
// In both of the requests we send above,
// the metrics do not have a type and so
// we treat them as unsupported metrics.
"consumer.unsupported_dropped": int64(2),

"request.count": int64(2),
"response.count": int64(2),
"response.errors.count": int64(1),
"response.valid.count": int64(1),
}, actual)
}

func jsonExportTraceServiceRequest(j string) interface{} {
request := reflect.New(exportTraceServiceRequestType.Elem()).Interface()
decoder := json.NewDecoder(strings.NewReader(j))
Expand All @@ -118,6 +179,20 @@ func newExportTraceServiceResponse() interface{} {
return reflect.New(exportTraceServiceResponseType.Elem()).Interface()
}

func jsonExportMetricsServiceRequest(j string) interface{} {
request := reflect.New(exportMetricsServiceRequestType.Elem()).Interface()
decoder := json.NewDecoder(strings.NewReader(j))
decoder.DisallowUnknownFields()
if err := decoder.Decode(request); err != nil {
panic(err)
}
return request
}

func newExportMetricsServiceResponse() interface{} {
return reflect.New(exportMetricsServiceResponseType.Elem()).Interface()
}

func newServer(t *testing.T, report publish.Reporter) *grpc.ClientConn {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
Expand Down
23 changes: 23 additions & 0 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"net/url"
"strconv"
"strings"
"sync/atomic"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
Expand All @@ -65,9 +66,31 @@ const (

// Consumer transforms open-telemetry data to be compatible with elastic APM data
type Consumer struct {
stats consumerStats

Reporter publish.Reporter
}

// ConsumerStats holds a snapshot of statistics about data consumption.
type ConsumerStats struct {
// UnsupportedMetricsDropped records the number of unsupported metrics
// that have been dropped by the consumer.
UnsupportedMetricsDropped int64
}

// consumerStats holds the current statistics, which must be accessed and
// modified using atomic operations.
type consumerStats struct {
unsupportedMetricsDropped int64
}

// Stats returns a snapshot of the current statistics about data consumption.
func (c *Consumer) Stats() ConsumerStats {
return ConsumerStats{
UnsupportedMetricsDropped: atomic.LoadInt64(&c.stats.unsupportedMetricsDropped),
}
}

// ConsumeTraces consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
Expand Down
Loading

0 comments on commit 2b2c05b

Please sign in to comment.