Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Dec 5, 2024
1 parent 629f0eb commit 0b60cb6
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 111 deletions.
116 changes: 88 additions & 28 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,26 +479,67 @@ func (b *Beat) registerStateMetrics() {
}

func (b *Beat) registerStatsMetrics() {
// "otel-adapter" is an arbitrary name that is not registered elsewhere.
// It does not show up in the metric names as long as we don't call
// v.OnRegistryStart.
monitoring.NewFunc(monitoring.Default, "otel-adapter", func(_ monitoring.Mode, v monitoring.Visitor) {
// TODO: we should ensure all metrics are produced in the expected JSON
// hierarchy for _source compatibility.
libbeatRegistry := monitoring.Default.GetRegistry("libbeat")
monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
switch {
case sm.Scope.Name == "github.com/elastic/go-docappender":
adaptDocappenderMetrics(context.Background(), v, sm)
case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"):
// All simple scalar metrics that begin with the name "apm-server."
// in github.com/elastic/apm-server/... scopes are mapped directly.
for _, m := range sm.Metrics {
if strings.HasPrefix(m.Name, "apm-server.") {
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, m.Name, value)
}
addDocappenderLibbeatOutputMetrics(context.Background(), v, sm)
}
}
})
monitoring.NewFunc(libbeatRegistry, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
if sm.Scope.Name == "github.com/elastic/go-docappender" {
addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm)
}
}
})
monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
switch {
case sm.Scope.Name == "github.com/elastic/go-docappender":
addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm)
}
}
})
monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
return
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
for _, sm := range rm.ScopeMetrics {
if !strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server") {
continue
}
// All simple scalar metrics that begin with the name "apm-server."
// in github.com/elastic/apm-server/... scopes are mapped directly.
for _, m := range sm.Metrics {
if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok {
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, suffix, value)
}
}
}
Expand Down Expand Up @@ -527,9 +568,7 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) {

// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
// with a mixture of libbeat-specific and apm-server specific metric names.
//
// Non-libbeat Elasticsearch output metrics go under "output.elasticsearch".
func adaptDocappenderMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
for _, m := range sm.Metrics {
switch m.Name {
case "elasticsearch.events.processed":
Expand All @@ -550,38 +589,59 @@ func adaptDocappenderMetrics(ctx context.Context, v monitoring.Visitor, sm metri
failed++
}
}
monitoring.ReportInt(v, "libbeat.output.events.acked", acked)
monitoring.ReportInt(v, "libbeat.output.events.failed", failed)
monitoring.ReportInt(v, "libbeat.output.events.toomany", toomany)
monitoring.ReportInt(v, "events.acked", acked)
monitoring.ReportInt(v, "events.failed", failed)
monitoring.ReportInt(v, "events.toomany", toomany)
case "elasticsearch.events.count":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "libbeat.output.events.total", value)
monitoring.ReportInt(v, "libbeat.pipeline.events.total", value)
monitoring.ReportInt(v, "events.total", value)
}
case "elasticsearch.events.queued":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "libbeat.output.events.active", value)
monitoring.ReportInt(v, "events.active", value)
}
case "elasticsearch.flushed.bytes":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "libbeat.output.write.bytes", value)
monitoring.ReportInt(v, "write.bytes", value)
}
case "elasticsearch.bulk_requests.count":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "events.batches", value)
}
}
}
}

func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
for _, m := range sm.Metrics {
switch m.Name {
case "elasticsearch.events.count":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "events.total", value)
}
}
}
}

// Add non-libbeat Elasticsearch output metrics under "output.elasticsearch".
func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
for _, m := range sm.Metrics {
switch m.Name {
case "elasticsearch.bulk_requests.count":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "libbeat.output.events.batches", value)
monitoring.ReportInt(v, "output.elasticsearch.bulk_requests.completed", value)
monitoring.ReportInt(v, "bulk_requests.completed", value)
}
case "elasticsearch.bulk_requests.available":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "output.elasticsearch.bulk_requests.available", value)
monitoring.ReportInt(v, "bulk_requests.available", value)
}
case "elasticsearch.indexer.created":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "output.elasticsearch.indexer.created", value)
monitoring.ReportInt(v, "indexer.created", value)
}
case "elasticsearch.indexer.destroyed":
if value, ok := getScalarInt64(m.Data); ok {
monitoring.ReportInt(v, "output.elasticsearch.indexer.destroyed", value)
monitoring.ReportInt(v, "indexer.destroyed", value)
}
// TODO output.elasticsearch.indexers.active (created - destroyed?)
}
Expand Down
6 changes: 0 additions & 6 deletions internal/beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (

"github.com/pkg/errors"

"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-server/internal/agentcfg"
"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/headers"
Expand All @@ -43,10 +41,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.acm")

errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())
)

Expand Down
5 changes: 0 additions & 5 deletions internal/beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"net/http"
"strings"

"github.com/elastic/elastic-agent-libs/monitoring"
"go.opentelemetry.io/otel"

"github.com/elastic/apm-data/input/elasticapm"
Expand All @@ -41,10 +40,6 @@ const (
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.server")

errMethodNotAllowed = errors.New("only POST requests are supported")
errServerShuttingDown = errors.New("server is shutting down")
errInvalidContentType = errors.New("invalid content type")
Expand Down
7 changes: 0 additions & 7 deletions internal/beater/api/root/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,11 @@ import (

"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-server/internal/beater/auth"
"github.com/elastic/apm-server/internal/beater/request"
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.root")
)

// HandlerConfig holds configuration for Handler.
type HandlerConfig struct {
// PublishReady reports whether or not the server is ready for publishing events.
Expand Down
49 changes: 16 additions & 33 deletions internal/beater/interceptors/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/elastic/apm-server/internal/beater/request"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand All @@ -45,13 +44,7 @@ var methodUnaryRequestMetrics = make(map[string]string)
// This function must only be called from package init functions; it is not safe
// for concurrent access.
func RegisterMethodUnaryRequestMetrics(fullMethod, legacyMetricsPrefix string) {
methodUnaryRequestMetrics[fullMethod] = m
}

// UnaryRequestMetrics is an interface that gRPC services may implement
// to provide a metrics registry for the Metrics interceptor.
type UnaryRequestMetrics interface {
RequestMetrics(fullMethod string) map[request.ResultID]*monitoring.Int
methodUnaryRequestMetrics[fullMethod] = legacyMetricsPrefix
}

type metricsInterceptor struct {
Expand All @@ -69,24 +62,16 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
var ints map[request.ResultID]*monitoring.Int
if requestMetrics, ok := info.Server.(UnaryRequestMetrics); ok {
ints = requestMetrics.RequestMetrics(info.FullMethod)
} else {
ints = methodUnaryRequestMetrics[info.FullMethod]
}
if ints == nil {
legacyMetricsPrefix, ok := methodUnaryRequestMetrics[info.FullMethod]
if !ok {
m.logger.With(
"grpc.request.method", info.FullMethod,
).Warn("metrics registry missing")
return handler(ctx, req)
}

m.getCounter(string(request.IDRequestCount)).Add(ctx, 1)
defer m.getCounter(string(request.IDResponseCount)).Add(ctx, 1)

ints[request.IDRequestCount].Inc()
defer ints[request.IDResponseCount].Inc()
m.inc(ctx, legacyMetricsPrefix, request.IDRequestCount)
defer m.inc(ctx, legacyMetricsPrefix, request.IDResponseCount)

start := time.Now()
resp, err := handler(ctx, req)
Expand All @@ -99,31 +84,29 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
if s, ok := status.FromError(err); ok {
switch s.Code() {
case codes.Unauthenticated:
m.getCounter(string(request.IDResponseErrorsUnauthorized)).Add(ctx, 1)
ints[request.IDResponseErrorsUnauthorized].Inc()
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsUnauthorized)
case codes.DeadlineExceeded, codes.Canceled:
m.getCounter(string(request.IDResponseErrorsTimeout)).Add(ctx, 1)
ints[request.IDResponseErrorsTimeout].Inc()
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsTimeout)
case codes.ResourceExhausted:
m.getCounter(string(request.IDResponseErrorsRateLimit)).Add(ctx, 1)
ints[request.IDResponseErrorsRateLimit].Inc()
m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsRateLimit)
}
}
}

m.getCounter(string(responseID)).Add(ctx, 1)
ints[responseID].Inc()

m.inc(ctx, legacyMetricsPrefix, responseID)
return resp, err
}
}

func (m *metricsInterceptor) getCounter(n string) metric.Int64Counter {
name := "grpc.server." + n
func (m *metricsInterceptor) inc(ctx context.Context, legacyMetricsPrefix string, id request.ResultID) {
m.getCounter("grpc.server.", string(id)).Add(ctx, 1)
m.getCounter(legacyMetricsPrefix, string(id)).Add(ctx, 1)
}

func (m *metricsInterceptor) getCounter(prefix, n string) metric.Int64Counter {
name := prefix + n
if met, ok := m.counters[name]; ok {
return met
}

nm, _ := m.meter.Int64Counter(name)
m.counters[name] = nm
return nm
Expand Down
2 changes: 1 addition & 1 deletion internal/beater/middleware/monitoring_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func MonitoringMiddleware(legacyMetricsPrefix string, mp metric.MeterProvider) M
}

mid := &monitoringMiddleware{
meter: mp.Meter("internal/beater/middleware"),
meter: mp.Meter("github.com/elastic/apm-server/internal/beater/middleware"),
legacyMetricsPrefix: legacyMetricsPrefix,
counters: sync.Map{},
histograms: sync.Map{},
Expand Down
9 changes: 0 additions & 9 deletions internal/beater/otlp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,12 @@ import (
"github.com/elastic/apm-data/input"
"github.com/elastic/apm-data/input/otlp"
"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater/request"
"github.com/elastic/elastic-agent-libs/monitoring"
)

var (
httpMetricsConsumerUnsupportedDropped, _ = meter.Int64ObservableCounter(
"apm-server.otlp.http.metrics.consumer.unsupported_dropped",
)

httpMetricsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.metrics")
HTTPMetricsMonitoringMap = request.MonitoringMapForRegistry(httpMetricsRegistry, monitoringKeys)
httpTracesRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.traces")
HTTPTracesMonitoringMap = request.MonitoringMapForRegistry(httpTracesRegistry, monitoringKeys)
httpLogsRegistry = monitoring.Default.NewRegistry("apm-server.otlp.http.logs")
HTTPLogsMonitoringMap = request.MonitoringMapForRegistry(httpLogsRegistry, monitoringKeys)
)

func NewHTTPHandlers(logger *zap.Logger, processor modelpb.BatchProcessor, semaphore input.Semaphore) HTTPHandlers {
Expand Down
22 changes: 0 additions & 22 deletions internal/beater/request/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package request
import (
"net/http"

"github.com/elastic/elastic-agent-libs/monitoring"
"go.opentelemetry.io/otel"

"github.com/pkg/errors"
Expand Down Expand Up @@ -128,27 +127,6 @@ type Result struct {
Stacktrace string
}

// DefaultMonitoringMapForRegistry returns map matching resultIDs to monitoring counters for given registry.
func DefaultMonitoringMapForRegistry(r *monitoring.Registry) map[ResultID]*monitoring.Int {
ids := append(DefaultResultIDs, IDUnset)
for id := range MapResultIDToStatus {
ids = append(ids, id)
}
return MonitoringMapForRegistry(r, ids)
}

// MonitoringMapForRegistry returns map matching resultIDs to monitoring counters for given registry and keys
func MonitoringMapForRegistry(r *monitoring.Registry, ids []ResultID) map[ResultID]*monitoring.Int {
m := map[ResultID]*monitoring.Int{}
counter := func(s ResultID) *monitoring.Int {
return monitoring.NewInt(r, string(s))
}
for _, id := range ids {
m[id] = counter(id)
}
return m
}

// Reset sets result to it's empty values
func (r *Result) Reset() {
r.ID = IDUnset
Expand Down

0 comments on commit 0b60cb6

Please sign in to comment.