From d1b61608cce152987cb63f54acb29da69284883a Mon Sep 17 00:00:00 2001 From: Hugo Shaka Date: Fri, 10 Jan 2025 10:43:54 -0500 Subject: [PATCH 1/4] Use a non-global metrics registry in Teleport (#50913) * Support a non-global registry in Teleport * lint * Update lib/service/service.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --------- Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/service/service.go | 48 +++++++++++++++++- lib/service/service_test.go | 84 +++++++++++++++++++++++++++++++- lib/service/servicecfg/config.go | 11 +++++ 3 files changed, 141 insertions(+), 2 deletions(-) diff --git a/lib/service/service.go b/lib/service/service.go index 9acad10449fad..e01736307c67e 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -53,6 +53,7 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -457,6 +458,15 @@ type TeleportProcess struct { // resolver is used to identify the reverse tunnel address when connecting via // the proxy. resolver reversetunnelclient.Resolver + + // metricsRegistry is the prometheus metric registry for the process. + // Every teleport service that wants to register metrics should use this + // instead of the global prometheus.DefaultRegisterer to avoid registration + // conflicts. + // + // Both the metricsRegistry and the default global registry are gathered by + // Teleport's metric service. 
+ metricsRegistry *prometheus.Registry } type keyPairKey struct { @@ -1073,6 +1083,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { keyPairs: make(map[keyPairKey]KeyPair), cloudLabels: cloudLabels, TracingProvider: tracing.NoopProvider(), + metricsRegistry: cfg.MetricsRegistry, } process.registerExpectedServices(cfg) @@ -3260,11 +3271,46 @@ func (process *TeleportProcess) initUploaderService() error { return nil } +// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger. +type promHTTPLogAdapter struct { + ctx context.Context + *slog.Logger +} + +// Println implements the promhttp.Logger interface. +func (l promHTTPLogAdapter) Println(v ...interface{}) { + //nolint:sloglint // msg cannot be constant + l.ErrorContext(l.ctx, fmt.Sprint(v...)) +} + // initMetricsService starts the metrics service currently serving metrics for // prometheus consumption func (process *TeleportProcess) initMetricsService() error { mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) + + // We gather metrics both from the in-process registry (preferred metrics registration method) + // and the global registry (used by some Teleport services and many dependencies). + gatherers := prometheus.Gatherers{ + process.metricsRegistry, + prometheus.DefaultGatherer, + } + + metricsHandler := promhttp.InstrumentMetricHandler( + process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ + // Errors can happen if metrics are registered with identical names in both the local and the global registry. + // In this case, we log the error but continue collecting metrics. The first collected metric will win + // (the one from the local metrics registry takes precedence). + // As we move more things to the local registry, especially in other tools like tbot, we will have less + // conflicts in tests. 
+ ErrorHandling: promhttp.ContinueOnError, + ErrorLog: promHTTPLogAdapter{ + ctx: process.ExitContext(), + Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics), + }, + }), + ) + + mux.Handle("/metrics", metricsHandler) logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id)) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index b6adb661fb36e..fc16e612970a4 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -23,9 +23,12 @@ import ( "crypto/tls" "errors" "fmt" + "github.com/sirupsen/logrus" + "io" "log/slog" "net" "net/http" + "net/url" "os" "path/filepath" "strings" @@ -39,7 +42,8 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" "golang.org/x/sync/errgroup" @@ -1851,6 +1855,84 @@ func TestInitDatabaseService(t *testing.T) { } } +func TestMetricsService(t *testing.T) { + t.Parallel() + // Test setup: create a listener for the metrics server, get its file descriptor. + + // Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production + // build and avoid a cyclic dependency. 
+ metricsListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, metricsListener.Close()) + }) + require.IsType(t, &net.TCPListener{}, metricsListener) + metricsListenerFile, err := metricsListener.(*net.TCPListener).File() + require.NoError(t, err) + + // Test setup: create a new teleport process + dataDir := makeTempDir(t) + cfg := servicecfg.MakeDefaultConfig() + cfg.DataDir = dataDir + cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) + cfg.Auth.Enabled = true + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = false + cfg.DebugService.Enabled = false + cfg.Auth.StorageConfig.Params["path"] = dataDir + cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg.Metrics.Enabled = true + + // Configure the metrics server to use the listener we previously created. + cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()} + cfg.FileDescriptors = []*servicecfg.FileDescriptor{ + {Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile}, + } + + // Create and start the Teleport service. + process, err := NewTeleport(cfg) + require.NoError(t, err) + require.NoError(t, process.Start()) + t.Cleanup(func() { + assert.NoError(t, process.Close()) + assert.NoError(t, process.Wait()) + }) + + // Test setup: create our test metrics. 
+ nonce := strings.ReplaceAll(uuid.NewString(), "-", "") + localMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "local_metric_" + nonce, + }) + globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "global_metric_" + nonce, + }) + require.NoError(t, process.metricsRegistry.Register(localMetric)) + require.NoError(t, prometheus.Register(globalMetric)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + t.Cleanup(cancel) + _, err = process.WaitForEvent(ctx, MetricsReady) + require.NoError(t, err) + + // Test execution: get metrics and check the test metrics are here. + metricsURL, err := url.Parse("http://" + metricsListener.Addr().String()) + require.NoError(t, err) + metricsURL.Path = "/metrics" + resp, err := http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) +} + // makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to // avoid https://github.com/golang/go/issues/62614. 
func makeTempDir(t *testing.T) string { diff --git a/lib/service/servicecfg/config.go b/lib/service/servicecfg/config.go index aceda2be99be5..e97dce89475f6 100644 --- a/lib/service/servicecfg/config.go +++ b/lib/service/servicecfg/config.go @@ -33,6 +33,7 @@ import ( "github.com/ghodss/yaml" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -264,6 +265,12 @@ type Config struct { // AccessGraph represents AccessGraph server config AccessGraph AccessGraphConfig + // MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics. + // As of today, not every Teleport metric is registered against this registry. Some Teleport services + // and Teleport dependencies are using the global registry. + // Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service. + MetricsRegistry *prometheus.Registry + // token is either the token needed to join the auth server, or a path pointing to a file // that contains the token // @@ -520,6 +527,10 @@ func ApplyDefaults(cfg *Config) { cfg.LoggerLevel = new(slog.LevelVar) } + if cfg.MetricsRegistry == nil { + cfg.MetricsRegistry = prometheus.NewRegistry() + } + // Remove insecure and (borderline insecure) cryptographic primitives from // default configuration. These can still be added back in file configuration by // users, but not supported by default by Teleport. 
See #1856 for more From 5d6a2156f8cc164ef6f635e789ebf6bf37effa23 Mon Sep 17 00:00:00 2001 From: Hugo Shaka Date: Wed, 15 Jan 2025 11:35:21 -0500 Subject: [PATCH 2/4] Serve metrics from the local registry in the diagnostic service (#51031) * Use local metrics registry in the diagnostic service * Test metrics are served by the diag service --- lib/service/service.go | 54 +++++++++++++++-------------- lib/service/service_test.go | 68 +++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 25 deletions(-) diff --git a/lib/service/service.go b/lib/service/service.go index e01736307c67e..e288b96ffa792 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -3287,30 +3287,7 @@ func (l promHTTPLogAdapter) Println(v ...interface{}) { // prometheus consumption func (process *TeleportProcess) initMetricsService() error { mux := http.NewServeMux() - - // We gather metrics both from the in-process registry (preferred metrics registration method) - // and the global registry (used by some Teleport services and many dependencies). - gatherers := prometheus.Gatherers{ - process.metricsRegistry, - prometheus.DefaultGatherer, - } - - metricsHandler := promhttp.InstrumentMetricHandler( - process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ - // Errors can happen if metrics are registered with identical names in both the local and the global registry. - // In this case, we log the error but continue collecting metrics. The first collected metric will win - // (the one from the local metrics registry takes precedence). - // As we move more things to the local registry, especially in other tools like tbot, we will have less - // conflicts in tests. 
- ErrorHandling: promhttp.ContinueOnError, - ErrorLog: promHTTPLogAdapter{ - ctx: process.ExitContext(), - Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics), - }, - }), - ) - - mux.Handle("/metrics", metricsHandler) + mux.Handle("/metrics", process.newMetricsHandler()) logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id)) @@ -3395,6 +3372,33 @@ func (process *TeleportProcess) initMetricsService() error { return nil } +// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the +// in-process one. +func (process *TeleportProcess) newMetricsHandler() http.Handler { + // We gather metrics both from the in-process registry (preferred metrics registration method) + // and the global registry (used by some Teleport services and many dependencies). + gatherers := prometheus.Gatherers{ + process.metricsRegistry, + prometheus.DefaultGatherer, + } + + metricsHandler := promhttp.InstrumentMetricHandler( + process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ + // Errors can happen if metrics are registered with identical names in both the local and the global registry. + // In this case, we log the error but continue collecting metrics. The first collected metric will win + // (the one from the local metrics registry takes precedence). + // As we move more things to the local registry, especially in other tools like tbot, we will have less + // conflicts in tests. 
+ ErrorHandling: promhttp.ContinueOnError, + ErrorLog: promHTTPLogAdapter{ + ctx: process.ExitContext(), + Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics), + }, + }), + ) + return metricsHandler +} + // initDiagnosticService starts diagnostic service currently serving healthz // and prometheus endpoints func (process *TeleportProcess) initDiagnosticService() error { @@ -3404,7 +3408,7 @@ func (process *TeleportProcess) initDiagnosticService() error { // metrics will otherwise be served by the metrics service if it's enabled // in the config. if !process.Config.Metrics.Enabled { - mux.Handle("/metrics", promhttp.Handler()) + mux.Handle("/metrics", process.newMetricsHandler()) } if process.Config.Debug { diff --git a/lib/service/service_test.go b/lib/service/service_test.go index fc16e612970a4..c0a27d8569e68 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -1855,6 +1855,10 @@ func TestInitDatabaseService(t *testing.T) { } } +// TestMetricsService tests that the optional metrics service exposes +// metrics from both the in-process and global metrics registry. When the +// service is disabled, metrics are served by the diagnostics service +// (tested in TestMetricsInDiagnosticsService). func TestMetricsService(t *testing.T) { t.Parallel() // Test setup: create a listener for the metrics server, get its file descriptor. @@ -1933,6 +1937,70 @@ func TestMetricsService(t *testing.T) { require.Contains(t, string(body), "global_metric_"+nonce) } +// TestMetricsInDiagnosticsService tests that the diagnostics service exposes +// metrics from both the in-process and global metrics registry when the metrics +// service is disabled. 
+func TestMetricsInDiagnosticsService(t *testing.T) { + t.Parallel() + // Test setup: create a new teleport process + dataDir := makeTempDir(t) + cfg := servicecfg.MakeDefaultConfig() + cfg.DataDir = dataDir + cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) + cfg.Auth.Enabled = true + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = false + cfg.DebugService.Enabled = false + cfg.Auth.StorageConfig.Params["path"] = dataDir + cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + + // Test setup: Create and start the Teleport service. + process, err := NewTeleport(cfg) + require.NoError(t, err) + require.NoError(t, process.Start()) + t.Cleanup(func() { + assert.NoError(t, process.Close()) + assert.NoError(t, process.Wait()) + }) + + // Test setup: create our test metrics. + nonce := strings.ReplaceAll(uuid.NewString(), "-", "") + localMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "local_metric_" + nonce, + }) + globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "global_metric_" + nonce, + }) + require.NoError(t, process.metricsRegistry.Register(localMetric)) + require.NoError(t, prometheus.Register(globalMetric)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + t.Cleanup(cancel) + _, err = process.WaitForEvent(ctx, TeleportReadyEvent) + require.NoError(t, err) + + // Test execution: query the metrics endpoint and check the test metrics are here. 
+ diagAddr, err := process.DiagnosticAddr() + require.NoError(t, err) + metricsURL, err := url.Parse("http://" + diagAddr.String()) + require.NoError(t, err) + metricsURL.Path = "/metrics" + resp, err := http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) +} + // makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to // avoid https://github.com/golang/go/issues/62614. func makeTempDir(t *testing.T) string { From 43a065a1f9c26c24fbaa9f1137d02df18a2b8937 Mon Sep 17 00:00:00 2001 From: Hugo Shaka Date: Wed, 15 Jan 2025 12:45:15 -0500 Subject: [PATCH 3/4] Init local registry at runtime instead of config (#51074) --- lib/service/service.go | 11 ++++++++++- lib/service/servicecfg/config.go | 4 ---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/service/service.go b/lib/service/service.go index e288b96ffa792..008ca3175d769 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -903,6 +903,15 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { "pid", fmt.Sprintf("%v.%v", os.Getpid(), processID), ) + // Use the custom metrics registry if specified, else create a new one. + // We must create the registry in NewTeleport, as opposed to ApplyDefaults(), + // because some tests are running multiple Teleport instances from the same + // config. + metricsRegistry := cfg.MetricsRegistry + if metricsRegistry == nil { + metricsRegistry = prometheus.NewRegistry() + } + // If FIPS mode was requested make sure binary is build against BoringCrypto. 
if cfg.FIPS { if !modules.GetModules().IsBoringBinary() { @@ -1083,7 +1092,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { keyPairs: make(map[keyPairKey]KeyPair), cloudLabels: cloudLabels, TracingProvider: tracing.NoopProvider(), - metricsRegistry: cfg.MetricsRegistry, + metricsRegistry: metricsRegistry, } process.registerExpectedServices(cfg) diff --git a/lib/service/servicecfg/config.go b/lib/service/servicecfg/config.go index e97dce89475f6..c1d2a1edd117d 100644 --- a/lib/service/servicecfg/config.go +++ b/lib/service/servicecfg/config.go @@ -527,10 +527,6 @@ func ApplyDefaults(cfg *Config) { cfg.LoggerLevel = new(slog.LevelVar) } - if cfg.MetricsRegistry == nil { - cfg.MetricsRegistry = prometheus.NewRegistry() - } - // Remove insecure and (borderline insecure) cryptographic primitives from // default configuration. These can still be added back in file configuration by // users, but not supported by default by Teleport. See #1856 for more From 7ebd57b97addded00f232b34ad8eb8c313dc364e Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Tue, 21 Jan 2025 15:05:52 -0500 Subject: [PATCH 4/4] lint --- lib/service/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index c0a27d8569e68..7295f3ade8229 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -23,7 +23,6 @@ import ( "crypto/tls" "errors" "fmt" - "github.com/sirupsen/logrus" "io" "log/slog" "net" @@ -43,6 +42,7 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh"