diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go index 572b69c7792..20aea369326 100644 --- a/internal/beatcmd/beat.go +++ b/internal/beatcmd/beat.go @@ -479,26 +479,67 @@ func (b *Beat) registerStateMetrics() { } func (b *Beat) registerStatsMetrics() { - // "otel-adapter" is an arbitrary name that is not registered elsewhere. - // It does not show up in the metric names as long as we don't call - // v.OnRegistryStart. - monitoring.NewFunc(monitoring.Default, "otel-adapter", func(_ monitoring.Mode, v monitoring.Visitor) { + // TODO: we should ensure all metrics are produced in the expected JSON + // hierarchy for _source compatibility. + libbeatRegistry := monitoring.Default.GetRegistry("libbeat") + monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics if err := b.metricReader.Collect(context.Background(), &rm); err != nil { return } + v.OnRegistryStart() + defer v.OnRegistryFinished() for _, sm := range rm.ScopeMetrics { switch { case sm.Scope.Name == "github.com/elastic/go-docappender": - adaptDocappenderMetrics(context.Background(), v, sm) - case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"): - // All simple scalar metrics that begin with the name "apm-server." - // in github.com/elastic/apm-server/... scopes are mapped directly. - for _, m := range sm.Metrics { - if strings.HasPrefix(m.Name, "apm-server.") { - if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, m.Name, value) - } + addDocappenderLibbeatOutputMetrics(context.Background(), v, sm) + } + } + }) + monitoring.NewFunc(libbeatRegistry, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) { + var rm metricdata.ResourceMetrics + if err := b.metricReader.Collect(context.Background(), &rm); err != nil { + return + } + v.OnRegistryStart() + defer v.OnRegistryFinished() + for _, sm := range rm.ScopeMetrics { + if sm.Scope.Name == "github.com/elastic/go-docappender" { + addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm) + } + } + }) + monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) { + var rm metricdata.ResourceMetrics + if err := b.metricReader.Collect(context.Background(), &rm); err != nil { + return + } + v.OnRegistryStart() + defer v.OnRegistryFinished() + for _, sm := range rm.ScopeMetrics { + switch { + case sm.Scope.Name == "github.com/elastic/go-docappender": + addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm) + } + } + }) + monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) { + var rm metricdata.ResourceMetrics + if err := b.metricReader.Collect(context.Background(), &rm); err != nil { + return + } + v.OnRegistryStart() + defer v.OnRegistryFinished() + for _, sm := range rm.ScopeMetrics { + if !strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server") { + continue + } + // All simple scalar metrics that begin with the name "apm-server." + // in github.com/elastic/apm-server/... scopes are mapped directly. + for _, m := range sm.Metrics { + if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok { + if value, ok := getScalarInt64(m.Data); ok { + monitoring.ReportInt(v, suffix, value) } } } @@ -527,9 +568,7 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) { // Adapt go-docappender's OTel metrics to beats stack monitoring metrics, // with a mixture of libbeat-specific and apm-server specific metric names. -// -// Non-libbeat Elasticsearch output metrics go under "output.elasticsearch". -func adaptDocappenderMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { +func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { for _, m := range sm.Metrics { switch m.Name { case "elasticsearch.events.processed": @@ -550,38 +589,59 @@ func adaptDocappenderMetrics(ctx context.Context, v monitoring.Visitor, sm metri failed++ } } - monitoring.ReportInt(v, "libbeat.output.events.acked", acked) - monitoring.ReportInt(v, "libbeat.output.events.failed", failed) - monitoring.ReportInt(v, "libbeat.output.events.toomany", toomany) + monitoring.ReportInt(v, "events.acked", acked) + monitoring.ReportInt(v, "events.failed", failed) + monitoring.ReportInt(v, "events.toomany", toomany) case "elasticsearch.events.count": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "libbeat.output.events.total", value) - monitoring.ReportInt(v, "libbeat.pipeline.events.total", value) + monitoring.ReportInt(v, "events.total", value) } case "elasticsearch.events.queued": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "libbeat.output.events.active", value) + monitoring.ReportInt(v, "events.active", value) } case "elasticsearch.flushed.bytes": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "libbeat.output.write.bytes", value) + monitoring.ReportInt(v, "write.bytes", value) + } + case "elasticsearch.bulk_requests.count": + if value, ok := getScalarInt64(m.Data); ok { + monitoring.ReportInt(v, "events.batches", value) } + } + } +} + +func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { + for _, m := range sm.Metrics { + switch m.Name { + case "elasticsearch.events.count": + if value, ok := getScalarInt64(m.Data); ok { + monitoring.ReportInt(v, "events.total", value) + } + } + } +} + +// Add non-libbeat Elasticsearch output metrics under "output.elasticsearch". +func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { + for _, m := range sm.Metrics { + switch m.Name { case "elasticsearch.bulk_requests.count": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "libbeat.output.events.batches", value) - monitoring.ReportInt(v, "output.elasticsearch.bulk_requests.completed", value) + monitoring.ReportInt(v, "bulk_requests.completed", value) } case "elasticsearch.bulk_requests.available": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "output.elasticsearch.bulk_requests.available", value) + monitoring.ReportInt(v, "bulk_requests.available", value) } case "elasticsearch.indexer.created": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "output.elasticsearch.indexer.created", value) + monitoring.ReportInt(v, "indexer.created", value) } case "elasticsearch.indexer.destroyed": if value, ok := getScalarInt64(m.Data); ok { - monitoring.ReportInt(v, "output.elasticsearch.indexer.destroyed", value) + monitoring.ReportInt(v, "indexer.destroyed", value) } // TODO output.elasticsearch.indexers.active (created - destroyed?) } diff --git a/internal/beater/api/config/agent/handler.go b/internal/beater/api/config/agent/handler.go index 1967123a93d..8faddc6a3b3 100644 --- a/internal/beater/api/config/agent/handler.go +++ b/internal/beater/api/config/agent/handler.go @@ -26,8 +26,6 @@ import ( "github.com/pkg/errors" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/apm-server/internal/agentcfg" "github.com/elastic/apm-server/internal/beater/auth" "github.com/elastic/apm-server/internal/beater/headers" @@ -43,10 +41,6 @@ const ( ) var ( - // MonitoringMap holds a mapping for request.IDs to monitoring counters - MonitoringMap = request.DefaultMonitoringMapForRegistry(registry) - registry = monitoring.Default.NewRegistry("apm-server.acm") - errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds()) ) diff --git a/internal/beater/api/intake/handler.go b/internal/beater/api/intake/handler.go index 4c612cedd23..22ac11c6073 100644 --- a/internal/beater/api/intake/handler.go +++ b/internal/beater/api/intake/handler.go @@ -24,7 +24,6 @@ import ( "net/http" "strings" - "github.com/elastic/elastic-agent-libs/monitoring" "go.opentelemetry.io/otel" "github.com/elastic/apm-data/input/elasticapm" @@ -41,10 +40,6 @@ const ( ) var ( - // MonitoringMap holds a mapping for request.IDs to monitoring counters - MonitoringMap = request.DefaultMonitoringMapForRegistry(registry) - registry = monitoring.Default.NewRegistry("apm-server.server") - errMethodNotAllowed = errors.New("only POST requests are supported") errServerShuttingDown = errors.New("server is shutting down") errInvalidContentType = errors.New("invalid content type") diff --git a/internal/beater/api/root/handler.go b/internal/beater/api/root/handler.go index ad12cf8ea43..2ef21a67bc4 100644 --- a/internal/beater/api/root/handler.go +++ b/internal/beater/api/root/handler.go @@ -22,18 +22,11 @@ import ( "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-server/internal/beater/auth" "github.com/elastic/apm-server/internal/beater/request" ) -var ( - // MonitoringMap holds a mapping for request.IDs to monitoring counters - MonitoringMap = request.DefaultMonitoringMapForRegistry(registry) - registry = monitoring.Default.NewRegistry("apm-server.root") -) - // HandlerConfig holds configuration for Handler. type HandlerConfig struct { // PublishReady reports whether or not the server is ready for publishing events. diff --git a/internal/beater/interceptors/metrics.go b/internal/beater/interceptors/metrics.go index b33531c0631..5db02a9ce74 100644 --- a/internal/beater/interceptors/metrics.go +++ b/internal/beater/interceptors/metrics.go @@ -29,7 +29,6 @@ import ( "github.com/elastic/apm-server/internal/beater/request" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/monitoring" ) const ( @@ -45,13 +44,7 @@ var methodUnaryRequestMetrics = make(map[string]string) // This function must only be called from package init functions; it is not safe // for concurrent access. func RegisterMethodUnaryRequestMetrics(fullMethod, legacyMetricsPrefix string) { - methodUnaryRequestMetrics[fullMethod] = m -} - -// UnaryRequestMetrics is an interface that gRPC services may implement -// to provide a metrics registry for the Metrics interceptor. -type UnaryRequestMetrics interface { - RequestMetrics(fullMethod string) map[request.ResultID]*monitoring.Int + methodUnaryRequestMetrics[fullMethod] = legacyMetricsPrefix } type metricsInterceptor struct { @@ -69,24 +62,16 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor { info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { - var ints map[request.ResultID]*monitoring.Int - if requestMetrics, ok := info.Server.(UnaryRequestMetrics); ok { - ints = requestMetrics.RequestMetrics(info.FullMethod) - } else { - ints = methodUnaryRequestMetrics[info.FullMethod] - } - if ints == nil { + legacyMetricsPrefix, ok := methodUnaryRequestMetrics[info.FullMethod] + if !ok { m.logger.With( "grpc.request.method", info.FullMethod, ).Warn("metrics registry missing") return handler(ctx, req) } - m.getCounter(string(request.IDRequestCount)).Add(ctx, 1) - defer m.getCounter(string(request.IDResponseCount)).Add(ctx, 1) - - ints[request.IDRequestCount].Inc() - defer ints[request.IDResponseCount].Inc() + m.inc(ctx, legacyMetricsPrefix, request.IDRequestCount) + defer m.inc(ctx, legacyMetricsPrefix, request.IDResponseCount) start := time.Now() resp, err := handler(ctx, req) @@ -99,31 +84,29 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor { if s, ok := status.FromError(err); ok { switch s.Code() { case codes.Unauthenticated: - m.getCounter(string(request.IDResponseErrorsUnauthorized)).Add(ctx, 1) - ints[request.IDResponseErrorsUnauthorized].Inc() + m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsUnauthorized) case codes.DeadlineExceeded, codes.Canceled: - m.getCounter(string(request.IDResponseErrorsTimeout)).Add(ctx, 1) - ints[request.IDResponseErrorsTimeout].Inc() + m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsTimeout) case codes.ResourceExhausted: - m.getCounter(string(request.IDResponseErrorsRateLimit)).Add(ctx, 1) - ints[request.IDResponseErrorsRateLimit].Inc() + m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsRateLimit) } } } - - m.getCounter(string(responseID)).Add(ctx, 1) - ints[responseID].Inc() - + m.inc(ctx, legacyMetricsPrefix, responseID) return resp, err } } -func (m *metricsInterceptor) getCounter(n string) metric.Int64Counter { - name := "grpc.server." + n +func (m *metricsInterceptor) inc(ctx context.Context, legacyMetricsPrefix string, id request.ResultID) { + m.getCounter("grpc.server.", string(id)).Add(ctx, 1) + m.getCounter(legacyMetricsPrefix, string(id)).Add(ctx, 1) +} + +func (m *metricsInterceptor) getCounter(prefix, n string) metric.Int64Counter { + name := prefix + n if met, ok := m.counters[name]; ok { return met } - nm, _ := m.meter.Int64Counter(name) m.counters[name] = nm return nm diff --git a/internal/beater/middleware/monitoring_middleware.go b/internal/beater/middleware/monitoring_middleware.go index 2414e7443b6..797b2c4250a 100644 --- a/internal/beater/middleware/monitoring_middleware.go +++ b/internal/beater/middleware/monitoring_middleware.go @@ -99,7 +99,7 @@ func MonitoringMiddleware(legacyMetricsPrefix string, mp metric.MeterProvider) M } mid := &monitoringMiddleware{ - meter: mp.Meter("internal/beater/middleware"), + meter: mp.Meter("github.com/elastic/apm-server/internal/beater/middleware"), legacyMetricsPrefix: legacyMetricsPrefix, counters: sync.Map{}, histograms: sync.Map{}, diff --git a/internal/beater/otlp/http.go b/internal/beater/otlp/http.go index 897a1a6e76c..c9d83555a19 100644 --- a/internal/beater/otlp/http.go +++ b/internal/beater/otlp/http.go @@ -35,21 +35,12 @@ import ( "github.com/elastic/apm-data/input" "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" - "github.com/elastic/apm-server/internal/beater/request" - "github.com/elastic/elastic-agent-libs/monitoring" ) var ( httpMetricsConsumerUnsupportedDropped, _ = meter.Int64ObservableCounter( "apm-server.otlp.http.metrics.consumer.unsupported_dropped", ) - - httpMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.metrics") - HTTPMetricsMonitoringMap = request.MonitoringMapForRegistry(httpMetricsRegistry, monitoringKeys) - httpTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.traces") - HTTPTracesMonitoringMap = request.MonitoringMapForRegistry(httpTracesRegistry, monitoringKeys) - httpLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.logs") - HTTPLogsMonitoringMap = request.MonitoringMapForRegistry(httpLogsRegistry, monitoringKeys) ) func NewHTTPHandlers(logger *zap.Logger, processor modelpb.BatchProcessor, semaphore input.Semaphore) HTTPHandlers { diff --git a/internal/beater/request/result.go b/internal/beater/request/result.go index 62ed8541d8a..9aaf741af4f 100644 --- a/internal/beater/request/result.go +++ b/internal/beater/request/result.go @@ -20,7 +20,6 @@ package request import ( "net/http" - "github.com/elastic/elastic-agent-libs/monitoring" "go.opentelemetry.io/otel" "github.com/pkg/errors" @@ -128,27 +127,6 @@ type Result struct { Stacktrace string } -// DefaultMonitoringMapForRegistry returns map matching resultIDs to monitoring counters for given registry. -func DefaultMonitoringMapForRegistry(r *monitoring.Registry) map[ResultID]*monitoring.Int { - ids := append(DefaultResultIDs, IDUnset) - for id := range MapResultIDToStatus { - ids = append(ids, id) - } - return MonitoringMapForRegistry(r, ids) -} - -// MonitoringMapForRegistry returns map matching resultIDs to monitoring counters for given registry and keys -func MonitoringMapForRegistry(r *monitoring.Registry, ids []ResultID) map[ResultID]*monitoring.Int { - m := map[ResultID]*monitoring.Int{} - counter := func(s ResultID) *monitoring.Int { - return monitoring.NewInt(r, string(s)) - } - for _, id := range ids { - m[id] = counter(id) - } - return m -} - // Reset sets result to it's empty values func (r *Result) Reset() { r.ID = IDUnset