diff --git a/src/client/api/metrics.c b/src/client/api/metrics.c index a4a78a6e25c..5cee0326280 100644 --- a/src/client/api/metrics.c +++ b/src/client/api/metrics.c @@ -27,9 +27,10 @@ bool daos_client_metric_retain; int dc_tm_init(void) { - int metrics_tag; - pid_t pid; - int rc; + struct d_tm_node_t *started_at; + int metrics_tag; + pid_t pid; + int rc; d_getenv_bool(DAOS_CLIENT_METRICS_ENABLE, &daos_client_metric); if (!daos_client_metric) @@ -64,10 +65,18 @@ dc_tm_init(void) rc = d_tm_add_ephemeral_dir(NULL, MAX_IDS_SIZE(INIT_JOB_NUM), "%s/%u", dc_jobid, pid); if (rc != 0) { - DL_ERROR(rc, "add metric %s/%u failed.\n", dc_jobid, pid); + DL_ERROR(rc, "add metric %s/%u failed.", dc_jobid, pid); D_GOTO(out, rc); } + rc = d_tm_add_metric(&started_at, D_TM_TIMESTAMP, "Timestamp of client startup", NULL, + "%s/%u/%s", dc_jobid, pid, "started_at"); + if (rc != 0) { + DL_ERROR(rc, "add metric %s/%u/started_at failed.", dc_jobid, pid); + D_GOTO(out, rc); + } + + d_tm_record_timestamp(started_at); out: if (rc) d_tm_fini(); diff --git a/src/common/SConscript b/src/common/SConscript index 05004e51509..0eec057198d 100644 --- a/src/common/SConscript +++ b/src/common/SConscript @@ -11,6 +11,7 @@ COMMON_FILES = ['debug.c', 'mem.c', 'fail_loc.c', 'lru.c', 'cipher.c', 'cipher_isal.c', 'qat.c', 'fault_domain.c', 'tls.c', 'metrics.c'] + def build_daos_common(denv, client): """ Building non-pmem version for client's common lib""" benv = denv.Clone() diff --git a/src/control/cmd/daos_agent/config.go b/src/control/cmd/daos_agent/config.go index ad8fc40924d..6e62a725f1a 100644 --- a/src/control/cmd/daos_agent/config.go +++ b/src/control/cmd/daos_agent/config.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -54,6 +54,13 @@ type Config struct { DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"` ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"` FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"` + TelemetryPort int `yaml:"telemetry_port,omitempty"` + TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"` +} + +// TelemetryEnabled returns true if client telemetry collection/export is enabled. +func (c *Config) TelemetryEnabled() bool { + return c.TelemetryPort > 0 } // NUMAFabricConfig defines a list of fabric interfaces that belong to a NUMA @@ -88,6 +95,10 @@ func LoadConfig(cfgPath string) (*Config, error) { return nil, fmt.Errorf("invalid system name: %s", cfg.SystemName) } + if cfg.TelemetryRetain > 0 && cfg.TelemetryPort == 0 { + return nil, errors.New("telemetry_retain requires telemetry_port") + } + return cfg, nil } diff --git a/src/control/cmd/daos_agent/infocache.go b/src/control/cmd/daos_agent/infocache.go index 0dbdf4fc645..7193cf6f151 100644 --- a/src/control/cmd/daos_agent/infocache.go +++ b/src/control/cmd/daos_agent/infocache.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -8,6 +8,7 @@ package main import ( "context" + "fmt" "net" "strings" "sync" @@ -22,6 +23,7 @@ import ( "github.com/daos-stack/daos/src/control/lib/control" "github.com/daos-stack/daos/src/control/lib/hardware" "github.com/daos-stack/daos/src/control/lib/hardware/hwprov" + "github.com/daos-stack/daos/src/control/lib/telemetry" "github.com/daos-stack/daos/src/control/logging" ) @@ -36,17 +38,20 @@ type fabricScanFn func(ctx context.Context, providers ...string) (*NUMAFabric, e // NewInfoCache creates a new InfoCache with appropriate parameters set. 
func NewInfoCache(ctx context.Context, log logging.Logger, client control.UnaryInvoker, cfg *Config) *InfoCache { ic := &InfoCache{ - log: log, - ignoreIfaces: cfg.ExcludeFabricIfaces, - client: client, - cache: cache.NewItemCache(log), - getAttachInfo: control.GetAttachInfo, - fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)), - netIfaces: net.Interfaces, - devClassGetter: hwprov.DefaultNetDevClassProvider(log), - devStateGetter: hwprov.DefaultNetDevStateProvider(log), + log: log, + ignoreIfaces: cfg.ExcludeFabricIfaces, + client: client, + cache: cache.NewItemCache(log), + getAttachInfoCb: control.GetAttachInfo, + fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)), + netIfaces: net.Interfaces, + devClassGetter: hwprov.DefaultNetDevClassProvider(log), + devStateGetter: hwprov.DefaultNetDevStateProvider(log), } + ic.clientTelemetryEnabled.Store(cfg.TelemetryEnabled()) + ic.clientTelemetryRetain.Store(cfg.TelemetryRetain > 0) + if cfg.DisableCache { ic.DisableAttachInfoCache() ic.DisableFabricCache() @@ -198,12 +203,14 @@ type InfoCache struct { cache *cache.ItemCache fabricCacheDisabled atm.Bool attachInfoCacheDisabled atm.Bool + clientTelemetryEnabled atm.Bool + clientTelemetryRetain atm.Bool - getAttachInfo getAttachInfoFn - fabricScan fabricScanFn - netIfaces func() ([]net.Interface, error) - devClassGetter hardware.NetDevClassProvider - devStateGetter hardware.NetDevStateProvider + getAttachInfoCb getAttachInfoFn + fabricScan fabricScanFn + netIfaces func() ([]net.Interface, error) + devClassGetter hardware.NetDevClassProvider + devStateGetter hardware.NetDevStateProvider client control.UnaryInvoker attachInfoRefresh time.Duration @@ -292,6 +299,41 @@ func (c *InfoCache) EnableStaticFabricCache(ctx context.Context, nf *NUMAFabric) c.EnableFabricCache() } +func (c *InfoCache) getAttachInfo(ctx context.Context, rpcClient control.UnaryInvoker, req *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) { + if c == 
nil { + return nil, errors.New("InfoCache is nil") + } + if c.getAttachInfoCb == nil { + return nil, errors.New("getAttachInfoFn is nil") + } + + resp, err := c.getAttachInfoCb(ctx, rpcClient, req) + if err != nil { + return nil, err + } + c.adjustAttachInfo(resp) + return resp, nil +} + +// adjustAttachInfo performs any necessary adjustments to the attach info +// before returning it. +func (c *InfoCache) adjustAttachInfo(resp *control.GetAttachInfoResp) { + if c == nil || resp == nil { + return + } + + if c.clientTelemetryEnabled.IsTrue() { + resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars, + fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv), + ) + if c.clientTelemetryRetain.IsTrue() { + resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars, + fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv), + ) + } + } +} + // GetAttachInfo fetches the attach info from the cache, and refreshes if necessary. func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.GetAttachInfoResp, error) { if c == nil { @@ -308,7 +350,8 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get } createItem := func() (cache.Item, error) { c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys)) - return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil + cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo) + return cai, nil } item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem) diff --git a/src/control/cmd/daos_agent/infocache_test.go b/src/control/cmd/daos_agent/infocache_test.go index 54571d006a7..e86c44bfc0c 100644 --- a/src/control/cmd/daos_agent/infocache_test.go +++ b/src/control/cmd/daos_agent/infocache_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -8,20 +8,23 @@ package main import ( "context" + "fmt" "net" "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/pkg/errors" + "github.com/daos-stack/daos/src/control/build" "github.com/daos-stack/daos/src/control/common" "github.com/daos-stack/daos/src/control/common/test" "github.com/daos-stack/daos/src/control/lib/cache" "github.com/daos-stack/daos/src/control/lib/control" "github.com/daos-stack/daos/src/control/lib/hardware" + "github.com/daos-stack/daos/src/control/lib/telemetry" "github.com/daos-stack/daos/src/control/logging" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/pkg/errors" ) type testInfoCacheParams struct { @@ -32,6 +35,8 @@ type testInfoCacheParams struct { mockNetDevStateGetter hardware.NetDevStateProvider disableFabricCache bool disableAttachInfoCache bool + enableClientTelemetry bool + retainClientTelemetry bool ctlInvoker control.Invoker cachedItems []cache.Item } @@ -43,16 +48,19 @@ func newTestInfoCache(t *testing.T, log logging.Logger, params testInfoCachePara } ic := &InfoCache{ - log: log, - getAttachInfo: params.mockGetAttachInfo, - fabricScan: params.mockScanFabric, - devClassGetter: params.mockNetDevClassGetter, - devStateGetter: params.mockNetDevStateGetter, - netIfaces: params.mockNetIfaces, - client: params.ctlInvoker, - cache: c, + log: log, + getAttachInfoCb: params.mockGetAttachInfo, + fabricScan: params.mockScanFabric, + devClassGetter: params.mockNetDevClassGetter, + devStateGetter: params.mockNetDevStateGetter, + netIfaces: params.mockNetIfaces, + client: params.ctlInvoker, + cache: c, } + ic.clientTelemetryEnabled.Store(params.enableClientTelemetry) + ic.clientTelemetryRetain.Store(params.retainClientTelemetry) + if ic.netIfaces == nil { ic.netIfaces = func() ([]net.Interface, error) { return []net.Interface{ @@ -714,6 +722,14 @@ func TestAgent_InfoCache_GetAttachInfo(t 
*testing.T) { NetDevClass: uint32(hardware.Ether), }, } + telemEnabledResp := copyGetAttachInfoResp(ctlResp) + telemEnabledResp.ClientNetHint.EnvVars = append(telemEnabledResp.ClientNetHint.EnvVars, + fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv), + ) + telemRetainedResp := copyGetAttachInfoResp(telemEnabledResp) + telemRetainedResp.ClientNetHint.EnvVars = append(telemRetainedResp.ClientNetHint.EnvVars, + fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv), + ) for name, tc := range map[string]struct { getInfoCache func(logging.Logger) *InfoCache @@ -734,7 +750,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { disableAttachInfoCache: true, }) }, - remoteResp: ctlResp, + remoteResp: copyGetAttachInfoResp(ctlResp), expResp: ctlResp, expRemote: true, }, @@ -748,11 +764,45 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { expErr: errors.New("mock remote"), expRemote: true, }, + "cache disabled; client telemetry enabled": { + getInfoCache: func(l logging.Logger) *InfoCache { + return newTestInfoCache(t, l, testInfoCacheParams{ + disableAttachInfoCache: true, + enableClientTelemetry: true, + }) + }, + remoteResp: copyGetAttachInfoResp(ctlResp), + expResp: telemEnabledResp, + expRemote: true, + }, + "cache enabled; client telemetry enabled": { + getInfoCache: func(l logging.Logger) *InfoCache { + return newTestInfoCache(t, l, testInfoCacheParams{ + enableClientTelemetry: true, + }) + }, + remoteResp: copyGetAttachInfoResp(ctlResp), + expResp: telemEnabledResp, + expRemote: true, + expCached: true, + }, + "cache enabled; client telemetry enabled; client telemetry retained": { + getInfoCache: func(l logging.Logger) *InfoCache { + return newTestInfoCache(t, l, testInfoCacheParams{ + enableClientTelemetry: true, + retainClientTelemetry: true, + }) + }, + remoteResp: copyGetAttachInfoResp(ctlResp), + expResp: telemRetainedResp, + expRemote: true, + expCached: true, + }, "enabled but empty": { getInfoCache: func(l logging.Logger) *InfoCache { return 
newTestInfoCache(t, l, testInfoCacheParams{}) }, - remoteResp: ctlResp, + remoteResp: copyGetAttachInfoResp(ctlResp), expResp: ctlResp, expRemote: true, expCached: true, @@ -772,7 +822,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) { return nil, errors.New("shouldn't call cached remote") }, - lastResponse: ctlResp, + lastResponse: copyGetAttachInfoResp(ctlResp), cacheItem: cacheItem{lastCached: time.Now()}, system: "test", }) @@ -790,7 +840,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) { return nil, errors.New("shouldn't call cached remote") }, - lastResponse: ctlResp, + lastResponse: copyGetAttachInfoResp(ctlResp), cacheItem: cacheItem{lastCached: time.Now()}, system: build.DefaultSystemName, }) @@ -814,7 +864,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { return ic }, system: "somethingelse", - remoteResp: ctlResp, + remoteResp: copyGetAttachInfoResp(ctlResp), expResp: ctlResp, expCached: true, expRemote: true, @@ -831,7 +881,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) { calledRemote := false if ic != nil { - ic.getAttachInfo = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) { + ic.getAttachInfoCb = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) { calledRemote = true return tc.remoteResp, tc.remoteErr } diff --git a/src/control/cmd/daos_agent/start.go b/src/control/cmd/daos_agent/start.go index cb5505234d5..0e8e6ca2016 100644 --- a/src/control/cmd/daos_agent/start.go +++ b/src/control/cmd/daos_agent/start.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2020-2023 Intel Corporation. +// (C) Copyright 2020-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -127,6 +127,16 @@ func (cmd *startCmd) Execute(_ []string) error { } cmd.Debugf("dRPC socket server started: %s", time.Since(drpcSrvStart)) + if cmd.cfg.TelemetryEnabled() { + telemetryStart := time.Now() + shutdown, err := startPrometheusExporter(ctx, cmd, cmd.cfg) + if err != nil { + return errors.Wrap(err, "unable to start prometheus exporter") + } + defer shutdown() + cmd.Debugf("telemetry exporter started: %s", time.Since(telemetryStart)) + } + cmd.Debugf("startup complete in %s", time.Since(startedAt)) cmd.Infof("%s (pid %d) listening on %s", versionString(), os.Getpid(), sockPath) if err := systemd.Ready(); err != nil && err != systemd.ErrSdNotifyNoSocket { diff --git a/src/control/cmd/daos_agent/telemetry.go b/src/control/cmd/daos_agent/telemetry.go new file mode 100644 index 00000000000..5bce4c7e64c --- /dev/null +++ b/src/control/cmd/daos_agent/telemetry.go @@ -0,0 +1,36 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package main + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/daos-stack/daos/src/control/lib/telemetry/promexp" + "github.com/daos-stack/daos/src/control/logging" +) + +func startPrometheusExporter(ctx context.Context, log logging.Logger, cfg *Config) (func(), error) { + expCfg := &promexp.ExporterConfig{ + Port: cfg.TelemetryPort, + Title: "DAOS Client Telemetry", + Register: func(ctx context.Context, log logging.Logger) error { + c, err := promexp.NewClientCollector(ctx, log, &promexp.CollectorOpts{ + RetainDuration: cfg.TelemetryRetain, + }) + if err != nil { + return err + } + prometheus.MustRegister(c) + + return nil + }, + } + + return promexp.StartExporter(ctx, log, expCfg) +} diff --git a/src/control/common/test/utils.go b/src/control/common/test/utils.go index 81c17facecd..f7cc72ef72a 100644 --- a/src/control/common/test/utils.go +++ b/src/control/common/test/utils.go @@ 
-1,5 +1,5 @@ // -// (C) Copyright 2018-2022 Intel Corporation. +// (C) Copyright 2018-2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -26,6 +26,8 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "golang.org/x/sys/unix" "google.golang.org/protobuf/testing/protocmp" + + "github.com/daos-stack/daos/src/control/logging" ) // AssertTrue asserts b is true @@ -412,3 +414,14 @@ func Context(t *testing.T) context.Context { t.Cleanup(cancel) return ctx } + +// MustLogContext returns a context containing the supplied logger. +// Canceled when the test is done. +func MustLogContext(t *testing.T, log logging.Logger) context.Context { + t.Helper() + ctx, err := logging.ToContext(Context(t), log) + if err != nil { + t.Fatal(err) + } + return ctx +} diff --git a/src/control/lib/telemetry/promexp/client.go b/src/control/lib/telemetry/promexp/client.go new file mode 100644 index 00000000000..273fea53037 --- /dev/null +++ b/src/control/lib/telemetry/promexp/client.go @@ -0,0 +1,168 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// +//go:build linux && (amd64 || arm64) +// +build linux +// +build amd64 arm64 + +package promexp + +import ( + "context" + "regexp" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + "github.com/daos-stack/daos/src/control/lib/atm" + "github.com/daos-stack/daos/src/control/lib/telemetry" + "github.com/daos-stack/daos/src/control/logging" +) + +type ( + // ClientCollector is a metrics collector for DAOS client metrics. 
+ ClientCollector struct { + metricsCollector + } +) + +func extractClientLabels(log logging.Logger, in string) (labels labelMap, name string) { + log.Tracef("in: %q", in) + + labels = make(labelMap) + compsIdx := 0 + comps := strings.Split(in, string(telemetry.PathSep)) + if len(comps) == 0 { + return labels, "" + } + + if strings.HasPrefix(comps[compsIdx], "ID") { + if len(comps) == 1 { + return labels, "" + } + compsIdx++ + } + + for i, label := range []string{"job", "pid", "tid"} { + if i > 0 { + // After jobid, we should have a pid and/or tid, and + // then move on to the engine labels. + _, err := strconv.Atoi(comps[compsIdx]) + if err != nil { + break + } + } + + if len(comps) == compsIdx+1 { + // If we have a weird path ending on a pid or tid, treat it + // as empty of labels. + if _, err := strconv.Atoi(comps[compsIdx]); err == nil && i > 0 { + return labelMap{}, "" + } + return labels, comps[compsIdx] + } + labels[label] = comps[compsIdx] + compsIdx++ + } + + var engLabels labelMap + engLabels, name = extractEngineLabels(log, strings.Join(comps[compsIdx:], string(telemetry.PathSep))) + for k, v := range engLabels { + labels[k] = v + } + + return +} + +func newClientMetric(log logging.Logger, m telemetry.Metric) *sourceMetric { + labels, name := extractClientLabels(log, m.FullPath()) + baseName := "client_" + name + + return newSourceMetric(log, m, baseName, labels) +} + +// newClientSource creates a new MetricSource for client metrics. +func newClientSource(parent context.Context) (*MetricSource, func(), error) { + ctx, err := telemetry.InitClientRoot(parent) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to init telemetry") + } + + cleanupFn := func() { + telemetry.Detach(ctx) + } + + return &MetricSource{ + ctx: ctx, + enabled: atm.NewBool(true), + tmSchema: telemetry.NewSchema(), + smSchema: newSourceMetricSchema(newClientMetric), + }, cleanupFn, nil +} + +// NewClientCollector creates a new ClientCollector instance. 
+func NewClientCollector(ctx context.Context, log logging.Logger, opts *CollectorOpts) (*ClientCollector, error) {
+	source, cleanup, err := newClientSource(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create client source")
+	}
+
+	if opts == nil {
+		opts = defaultCollectorOpts()
+	}
+
+	go func() { // run the source cleanup once the caller's context is canceled
+		<-ctx.Done()
+		cleanup()
+	}()
+
+	if opts.RetainDuration != 0 {
+		log.Debugf("pruning client metrics every %s", opts.RetainDuration)
+
+		go func() {
+			pruneTicker := time.NewTicker(opts.RetainDuration)
+			defer pruneTicker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					return // without this the loop spins on the closed Done channel forever
+				case <-pruneTicker.C:
+					source.PruneSegments(log, opts.RetainDuration)
+				}
+			}
+		}()
+	}
+
+	c := &ClientCollector{
+		metricsCollector: metricsCollector{
+			log: log,
+			summary: prometheus.NewSummaryVec(
+				prometheus.SummaryOpts{
+					Namespace: "client",
+					Subsystem: "exporter",
+					Name:      "scrape_duration_seconds",
+					Help:      "daos_client_exporter: Duration of a scrape job.",
+				},
+				[]string{"source", "result"},
+			),
+			collectFn: func(ch chan *sourceMetric) {
+				source.Collect(log, ch)
+			},
+		},
+	}
+
+	for _, pat := range opts.Ignores { // each ignore pattern must be a valid regular expression
+		re, err := regexp.Compile(pat)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to compile %q", pat)
+		}
+		c.ignoredMetrics = append(c.ignoredMetrics, re)
+	}
+
+	return c, nil
+}
diff --git a/src/control/lib/telemetry/promexp/client_test.go b/src/control/lib/telemetry/promexp/client_test.go
new file mode 100644
index 00000000000..ea60ceba0d3
--- /dev/null
+++ b/src/control/lib/telemetry/promexp/client_test.go
@@ -0,0 +1,158 @@
+//
+// (C) Copyright 2024 Intel Corporation.
+//
+// SPDX-License-Identifier: BSD-2-Clause-Patent
+//
+
+package promexp
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/daos-stack/daos/src/control/common/test"
+	"github.com/daos-stack/daos/src/control/logging"
+)
+
+func TestPromExp_extractClientLabels(t *testing.T) {
+	shmID := 256
+	jobID := "testJob"
+	pid := "12345"
+	tid := "67890"
+
+	testPath := func(suffix string) string {
+		return fmt.Sprintf("ID: %d/%s/%s/%s/%s", shmID, jobID, pid, tid, suffix)
+	}
+
+	for name, tc := range map[string]struct {
+		input     string
+		expName   string
+		expLabels labelMap
+	}{
+		"empty": {
+			expLabels: labelMap{},
+		},
+		"ID stripped": {
+			input:     "ID: 123",
+			expLabels: labelMap{},
+		},
+		"weird truncation": {
+			input:     "ID: 123/jobbo/6783/90",
+			expLabels: labelMap{},
+		},
+		"active update ops": {
+			input:   testPath("io/ops/update/active"),
+			expName: "io_ops_update_active",
+			expLabels: labelMap{
+				"job": jobID,
+				"pid": pid,
+				"tid": tid,
+			},
+		},
+		"fetch latency 1MB": {
+			input:   testPath("io/latency/fetch/1MB"),
+			expName: "io_latency_fetch",
+			expLabels: labelMap{
+				"job":  jobID,
+				"pid":  pid,
+				"tid":  tid,
+				"size": "1MB",
+			},
+		},
+		"started_at": {
+			input:   fmt.Sprintf("ID: %d/%s/%s/started_at", shmID, jobID, pid),
+			expName: "started_at",
+			expLabels: labelMap{
+				"job": jobID,
+				"pid": pid,
+			},
+		},
+		"pool ops": {
+			input:   fmt.Sprintf("ID: %d/%s/%s/pool/%s/ops/foo", shmID, jobID, pid, test.MockPoolUUID(1)),
+			expName: "pool_ops_foo",
+			expLabels: labelMap{
+				"job":  jobID,
+				"pid":  pid,
+				"pool": test.MockPoolUUID(1).String(),
+			},
+		},
+	} {
+		t.Run(name, func(t *testing.T) {
+			log, buf := logging.NewTestLogger(t.Name())
+			defer test.ShowBufferOnFailure(t, buf)
+
+			labels, name := extractClientLabels(log, tc.input)
+
+			test.AssertEqual(t, name, tc.expName, "")
+			if diff := cmp.Diff(tc.expLabels, labels); diff != "" { // want first, so -/+ match the message below
+				t.Errorf("labels mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func TestPromExp_NewClientCollector(t *testing.T) {
+	for name, tc := range map[string]struct {
+		opts      *CollectorOpts
+		expErr    error
+		expResult *ClientCollector
+	}{
+		"defaults": {
+			expResult: &ClientCollector{
+				metricsCollector: metricsCollector{
+					summary: &prometheus.SummaryVec{
+						MetricVec: &prometheus.MetricVec{},
+					},
+				},
+			},
+		},
+		"opts with ignores": {
+			opts: &CollectorOpts{Ignores: []string{"one", "two"}},
+			expResult: &ClientCollector{
+				metricsCollector: metricsCollector{
+					summary: &prometheus.SummaryVec{
+						MetricVec: &prometheus.MetricVec{},
+					},
+					ignoredMetrics: []*regexp.Regexp{
+						regexp.MustCompile("one"),
+						regexp.MustCompile("two"),
+					},
+				},
+			},
+		},
+	} {
+		t.Run(name, func(t *testing.T) {
+			log, buf := logging.NewTestLogger(t.Name())
+			defer test.ShowBufferOnFailure(t, buf)
+
+			result, err := NewClientCollector(test.Context(t), log, tc.opts)
+
+			test.CmpErr(t, tc.expErr, err)
+
+			cmpOpts := []cmp.Option{
+				cmpopts.IgnoreUnexported(MetricSource{}),
+				cmpopts.IgnoreUnexported(prometheus.SummaryVec{}),
+				cmpopts.IgnoreUnexported(prometheus.MetricVec{}),
+				cmpopts.IgnoreUnexported(regexp.Regexp{}),
+				cmp.AllowUnexported(ClientCollector{}),
+				cmp.AllowUnexported(metricsCollector{}),
+				cmp.FilterPath(func(p cmp.Path) bool {
+					// Ignore a few specific fields
+					return (strings.HasSuffix(p.String(), "log") ||
+						strings.HasSuffix(p.String(), "sourceMutex") ||
+						strings.HasSuffix(p.String(), "cleanupSource") ||
+						strings.HasSuffix(p.String(), "collectFn"))
+				}, cmp.Ignore()),
+			}
+			if diff := cmp.Diff(tc.expResult, result, cmpOpts...); diff != "" {
+				t.Fatalf("(-want, +got)\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/src/control/lib/telemetry/promexp/collector.go b/src/control/lib/telemetry/promexp/collector.go
index d57f1cf1687..33dfd925dfd 100644
--- a/src/control/lib/telemetry/promexp/collector.go
+++ b/src/control/lib/telemetry/promexp/collector.go
@@ -1,5 +1,5 @@
 //
-// 
(C) Copyright 2021-2022 Intel Corporation. +// (C) Copyright 2024 Intel Corporation. // // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -7,441 +7,34 @@ // +build linux // +build amd64 arm64 -// - package promexp import ( - "context" - "fmt" "regexp" - "strings" - "sync" - "unicode" + "time" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/daos-stack/daos/src/control/lib/atm" "github.com/daos-stack/daos/src/control/lib/telemetry" "github.com/daos-stack/daos/src/control/logging" ) type ( - Collector struct { - log logging.Logger - summary *prometheus.SummaryVec - ignoredMetrics []*regexp.Regexp - sources []*EngineSource - cleanupSource map[uint32]func() - sourceMutex sync.RWMutex // To protect sources - } - + // CollectorOpts contains options for the metrics collector. CollectorOpts struct { - Ignores []string - } - - EngineSource struct { - ctx context.Context - tmMutex sync.RWMutex // To protect telemetry collection - Index uint32 - Rank uint32 - enabled atm.Bool - tmSchema *telemetry.Schema - rmSchema rankMetricSchema + Ignores []string + RetainDuration time.Duration } - rankMetricSchema struct { - mu sync.Mutex - rankMetrics map[string]*rankMetric - seen map[string]struct{} + metricsCollector struct { + log logging.Logger + summary *prometheus.SummaryVec + ignoredMetrics []*regexp.Regexp + collectFn func(ch chan *sourceMetric) } ) -func (s *rankMetricSchema) Prune() { - s.mu.Lock() - defer s.mu.Unlock() - - for id := range s.rankMetrics { - if _, found := s.seen[id]; !found { - delete(s.rankMetrics, id) - } - } - s.seen = make(map[string]struct{}) -} - -func (s *rankMetricSchema) add(log logging.Logger, rank uint32, metric telemetry.Metric) (rm *rankMetric) { - s.mu.Lock() - defer s.mu.Unlock() - - id := metric.FullPath() - s.seen[id] = struct{}{} - - var found bool - if rm, found = s.rankMetrics[id]; !found { - rm = newRankMetric(log, rank, metric) - s.rankMetrics[id] = rm - } else { - rm.resetVecs() - } - - return 
-} - -func NewEngineSource(parent context.Context, idx uint32, rank uint32) (*EngineSource, func(), error) { - ctx, err := telemetry.Init(parent, idx) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to init telemetry") - } - - cleanupFn := func() { - telemetry.Detach(ctx) - } - - return &EngineSource{ - ctx: ctx, - Index: idx, - Rank: rank, - enabled: atm.NewBool(true), - tmSchema: telemetry.NewSchema(), - rmSchema: rankMetricSchema{ - rankMetrics: make(map[string]*rankMetric), - seen: make(map[string]struct{}), - }, - }, cleanupFn, nil -} - -func defaultCollectorOpts() *CollectorOpts { - return &CollectorOpts{} -} - -func NewCollector(log logging.Logger, opts *CollectorOpts, sources ...*EngineSource) (*Collector, error) { - if opts == nil { - opts = defaultCollectorOpts() - } - - c := &Collector{ - log: log, - sources: sources, - cleanupSource: make(map[uint32]func()), - summary: prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: "engine", - Subsystem: "exporter", - Name: "scrape_duration_seconds", - Help: "daos_exporter: Duration of a scrape job.", - }, - []string{"source", "result"}, - ), - } - - for _, pat := range opts.Ignores { - re, err := regexp.Compile(pat) - if err != nil { - return nil, errors.Wrapf(err, "failed to compile %q", pat) - } - c.ignoredMetrics = append(c.ignoredMetrics, re) - } - - return c, nil -} - -type labelMap map[string]string - -func (lm labelMap) keys() (keys []string) { - for label := range lm { - keys = append(keys, label) - } - - return -} - -func sanitizeMetricName(in string) string { - return strings.Map(func(r rune) rune { - switch { - // Valid names for Prometheus are limited to: - case r >= 'a' && r <= 'z': // lowercase letters - case r >= 'A' && r <= 'Z': // uppercase letters - case unicode.IsDigit(r): // digits - default: // sanitize any other character - return '_' - } - - return r - }, strings.TrimLeft(in, "/")) -} - -func matchLabel(labels labelMap, input, match, label string) bool { - if 
!strings.HasPrefix(input, match) { - return false - } - - splitStr := strings.SplitN(input, "_", 2) - if len(splitStr) == 2 { - labels[label] = splitStr[1] - return true - } - return false -} - -func appendName(cur, name string) string { - if cur == "" { - return name - } - return cur + "_" + name -} - -// extractLabels takes a "/"-separated DAOS metric name in order to -// create a normalized Prometheus name and label map. -// -// NB: Prometheus metric names should follow best practices as -// outlined at https://prometheus.io/docs/practices/naming/ -// -// In particular, a metric name should describe the measurement, -// not the entity the measurement is about. In other words, if 4 -// different entities share the same measurement, then there should -// be a single metric with a label that distinguishes between -// individual measurement values. -// -// Good: pool_started_at {pool="00000000-1111-2222-3333-4444444444"} -// Bad: pool_00000000_1111_2222_3333_4444444444_started_at -func extractLabels(in string) (labels labelMap, name string) { - labels = make(labelMap) - compsIdx := 0 - comps := strings.Split(in, string(telemetry.PathSep)) - if len(comps) == 0 { - return labels, in - } - - if strings.HasPrefix(comps[compsIdx], "ID") { - if len(comps) == 1 { - return labels, "" - } - compsIdx++ - } - - switch comps[compsIdx] { - case "pool": - name = "pool" - compsIdx++ - labels["pool"] = comps[compsIdx] - compsIdx++ - switch comps[compsIdx] { - case "ops": - compsIdx++ - name += "_ops_" + comps[compsIdx] - compsIdx++ - } - case "io": - name = "io" - compsIdx++ - switch comps[compsIdx] { - case "latency": - compsIdx++ - name += "_latency_" + comps[compsIdx] - compsIdx++ - labels["size"] = comps[compsIdx] - compsIdx++ - case "ops": - compsIdx++ - name += "_ops_" + comps[compsIdx] - compsIdx++ - default: - name += "_" + comps[compsIdx] - compsIdx++ - } - case "net": - compsIdx++ - if comps[compsIdx] == "uri" { - compsIdx++ - name = "net_uri_" + comps[compsIdx] - 
compsIdx++ - break - } - - name = "net" - labels["provider"] = comps[compsIdx] - compsIdx++ - case "nvme": - name = "nvme" - compsIdx++ - labels["device"] = comps[compsIdx] - compsIdx++ - } - - for { - if len(comps) == compsIdx { - break - } - - switch { - case matchLabel(labels, comps[compsIdx], "tgt_", "target"): - compsIdx++ - case matchLabel(labels, comps[compsIdx], "xs_", "xstream"): - compsIdx++ - case matchLabel(labels, comps[compsIdx], "ctx_", "context"): - compsIdx++ - default: - name = appendName(name, comps[compsIdx]) - compsIdx++ - } - } - - name = sanitizeMetricName(name) - return -} - -func (es *EngineSource) Collect(log logging.Logger, ch chan<- *rankMetric) { - if es == nil { - log.Error("nil engine source") - return - } - if !es.IsEnabled() { - return - } - if ch == nil { - log.Error("nil channel") - return - } - - es.tmMutex.RLock() - defer es.tmMutex.RUnlock() - - metrics := make(chan telemetry.Metric) - go func() { - if err := telemetry.CollectMetrics(es.ctx, es.tmSchema, metrics); err != nil { - log.Errorf("failed to collect metrics for engine rank %d: %s", es.Rank, err) - return - } - es.tmSchema.Prune() - }() - - for metric := range metrics { - ch <- es.rmSchema.add(log, es.Rank, metric) - } - es.rmSchema.Prune() -} - -// IsEnabled checks if the engine source is enabled. -func (es *EngineSource) IsEnabled() bool { - return es.enabled.IsTrue() -} - -// Enable enables the engine source. -func (es *EngineSource) Enable() { - es.enabled.SetTrue() -} - -// Disable disables the engine source. 
-func (es *EngineSource) Disable() { - es.enabled.SetFalse() -} - -type gvMap map[string]*prometheus.GaugeVec - -func (m gvMap) add(name, help string, labels labelMap) { - if _, found := m[name]; !found { - gv := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: name, - Help: help, - }, labels.keys()) - m[name] = gv - } -} - -func (m gvMap) set(name string, value float64, labels labelMap) error { - gv, found := m[name] - if !found { - return errors.Errorf("gauge vector %s not found", name) - } - gv.With(prometheus.Labels(labels)).Set(value) - - return nil -} - -type cvMap map[string]*prometheus.CounterVec - -func (m cvMap) add(name, help string, labels labelMap) { - if _, found := m[name]; !found { - cv := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: name, - Help: help, - }, labels.keys()) - m[name] = cv - } -} - -func (m cvMap) set(name string, value float64, labels labelMap) error { - cv, found := m[name] - if !found { - return errors.Errorf("counter vector %s not found", name) - } - cv.With(prometheus.Labels(labels)).Add(value) - - return nil -} - -type rankMetric struct { - rank uint32 - metric telemetry.Metric - baseName string - labels labelMap - gvm gvMap - cvm cvMap -} - -func (rm *rankMetric) collect(ch chan<- prometheus.Metric) { - for _, gv := range rm.gvm { - gv.Collect(ch) - } - for _, cv := range rm.cvm { - cv.Collect(ch) - } -} - -func (rm *rankMetric) resetVecs() { - for _, gv := range rm.gvm { - gv.Reset() - } - for _, cv := range rm.cvm { - cv.Reset() - } -} - -func newRankMetric(log logging.Logger, rank uint32, m telemetry.Metric) *rankMetric { - rm := &rankMetric{ - metric: m, - rank: rank, - gvm: make(gvMap), - cvm: make(cvMap), - } - - var name string - rm.labels, name = extractLabels(m.FullPath()) - rm.labels["rank"] = fmt.Sprintf("%d", rm.rank) - rm.baseName = "engine_" + name - - desc := m.Desc() - - switch rm.metric.Type() { - case telemetry.MetricTypeGauge, telemetry.MetricTypeTimestamp, - telemetry.MetricTypeSnapshot: - 
rm.gvm.add(rm.baseName, desc, rm.labels) - case telemetry.MetricTypeStatsGauge, telemetry.MetricTypeDuration: - rm.gvm.add(rm.baseName, desc, rm.labels) - for _, ms := range getMetricStats(rm.baseName, rm.metric) { - rm.gvm.add(ms.name, ms.desc, rm.labels) - } - case telemetry.MetricTypeCounter: - rm.cvm.add(rm.baseName, desc, rm.labels) - default: - log.Errorf("[%s]: metric type %d not supported", name, rm.metric.Type()) - } - - return rm -} - -func (c *Collector) isIgnored(name string) bool { +func (c *metricsCollector) isIgnored(name string) bool { for _, re := range c.ignoredMetrics { // TODO: We may want to look into removing the use of regexp here // in favor of a less-flexible but more efficient approach. @@ -454,105 +47,7 @@ func (c *Collector) isIgnored(name string) bool { return false } -type metricStat struct { - name string - desc string - value float64 -} - -func getMetricStats(baseName string, m telemetry.Metric) (stats []*metricStat) { - ms, ok := m.(telemetry.StatsMetric) - if !ok { - return - } - - for name, s := range map[string]struct { - fn func() float64 - desc string - }{ - "min": { - fn: ms.FloatMin, - desc: " (min value)", - }, - "max": { - fn: ms.FloatMax, - desc: " (max value)", - }, - "mean": { - fn: ms.Mean, - desc: " (mean)", - }, - "stddev": { - fn: ms.StdDev, - desc: " (std dev)", - }, - } { - stats = append(stats, &metricStat{ - name: baseName + "_" + name, - desc: m.Desc() + s.desc, - value: s.fn(), - }) - } - - return -} - -// AddSource adds an EngineSource to the Collector. -func (c *Collector) AddSource(es *EngineSource, cleanup func()) { - if es == nil { - c.log.Error("attempted to add nil EngineSource") - return - } - - c.sourceMutex.Lock() - defer c.sourceMutex.Unlock() - - // If we attempt to add a duplicate, remove the old one. 
- c.removeSourceNoLock(es.Index) - - c.sources = append(c.sources, es) - if cleanup != nil { - c.cleanupSource[es.Index] = cleanup - } -} - -// RemoveSource removes an EngineSource with a given index from the Collector. -func (c *Collector) RemoveSource(engineIdx uint32) { - c.sourceMutex.Lock() - defer c.sourceMutex.Unlock() - - c.removeSourceNoLock(engineIdx) -} - -func (c *Collector) removeSourceNoLock(engineIdx uint32) { - for i, es := range c.sources { - if es.Index == engineIdx { - es.Disable() - c.sources = append(c.sources[:i], c.sources[i+1:]...) - - // Ensure that EngineSource isn't collecting during cleanup - es.tmMutex.Lock() - if cleanup, found := c.cleanupSource[engineIdx]; found && cleanup != nil { - cleanup() - } - es.tmMutex.Unlock() - delete(c.cleanupSource, engineIdx) - break - } - } -} - -func (c *Collector) getSources() []*EngineSource { - c.sourceMutex.RLock() - defer c.sourceMutex.RUnlock() - - sourceCopy := make([]*EngineSource, len(c.sources)) - _ = copy(sourceCopy, c.sources) - return sourceCopy -} - -// Collect collects metrics from all EngineSources. 
-func (c *Collector) Collect(ch chan<- prometheus.Metric) { +func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { if c == nil { return } @@ -560,49 +55,51 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) { c.log.Error("passed a nil channel") return } + if c.collectFn == nil { + c.log.Error("collectFn is nil") + return + } - rankMetrics := make(chan *rankMetric) - go func(sources []*EngineSource) { - for _, source := range sources { - source.Collect(c.log, rankMetrics) - } - close(rankMetrics) - }(c.getSources()) + sourceMetrics := make(chan *sourceMetric) + go func() { + c.collectFn(sourceMetrics) + close(sourceMetrics) + }() - for rm := range rankMetrics { - if c.isIgnored(rm.baseName) { + for sm := range sourceMetrics { + if c.isIgnored(sm.baseName) { continue } var err error - switch rm.metric.Type() { + switch sm.metric.Type() { case telemetry.MetricTypeGauge, telemetry.MetricTypeTimestamp, telemetry.MetricTypeSnapshot: - err = rm.gvm.set(rm.baseName, rm.metric.FloatValue(), rm.labels) + err = sm.gvm.set(sm.baseName, sm.metric.FloatValue(), sm.labels) case telemetry.MetricTypeStatsGauge, telemetry.MetricTypeDuration: - if err = rm.gvm.set(rm.baseName, rm.metric.FloatValue(), rm.labels); err != nil { + if err = sm.gvm.set(sm.baseName, sm.metric.FloatValue(), sm.labels); err != nil { break } - for _, ms := range getMetricStats(rm.baseName, rm.metric) { - if err = rm.gvm.set(ms.name, ms.value, rm.labels); err != nil { + for _, ms := range getMetricStats(sm.baseName, sm.metric) { + if err = sm.gvm.set(ms.name, ms.value, sm.labels); err != nil { break } } case telemetry.MetricTypeCounter: - err = rm.cvm.set(rm.baseName, rm.metric.FloatValue(), rm.labels) + err = sm.cvm.set(sm.baseName, sm.metric.FloatValue(), sm.labels) default: - c.log.Errorf("[%s]: metric type %d not supported", rm.baseName, rm.metric.Type()) + c.log.Errorf("[%s]: metric type %d not supported", sm.baseName, sm.metric.Type()) } if err != nil { - c.log.Errorf("[%s]: %s", 
rm.baseName, err) + c.log.Errorf("[%s]: %s", sm.baseName, err) continue } - rm.collect(ch) + sm.collect(ch) } } -func (c *Collector) Describe(ch chan<- *prometheus.Desc) { +func (c *metricsCollector) Describe(ch chan<- *prometheus.Desc) { c.summary.Describe(ch) } diff --git a/src/control/lib/telemetry/promexp/engine.go b/src/control/lib/telemetry/promexp/engine.go new file mode 100644 index 00000000000..d97226e7b56 --- /dev/null +++ b/src/control/lib/telemetry/promexp/engine.go @@ -0,0 +1,271 @@ +// +// (C) Copyright 2021-2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// +//go:build linux && (amd64 || arm64) +// +build linux +// +build amd64 arm64 + +package promexp + +import ( + "context" + "fmt" + "regexp" + "strings" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + "github.com/daos-stack/daos/src/control/lib/atm" + "github.com/daos-stack/daos/src/control/lib/telemetry" + "github.com/daos-stack/daos/src/control/logging" +) + +type ( + // EngineCollector collects metrics from DAOS Engine sources. + EngineCollector struct { + metricsCollector + sources []*EngineSource + cleanupSource map[uint32]func() + sourceMutex sync.RWMutex // To protect sources + } + + // EngineSource provides metrics for a single DAOS Engine. + EngineSource struct { + MetricSource + Index uint32 + Rank uint32 + } +) + +// NewEngineSource initializes a new metrics source for a DAOS Engine. 
+func NewEngineSource(parent context.Context, idx uint32, rank uint32) (*EngineSource, func(), error) { + ctx, err := telemetry.Init(parent, idx) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to init telemetry") + } + + cleanupFn := func() { + telemetry.Detach(ctx) + } + + return &EngineSource{ + MetricSource: MetricSource{ + ctx: ctx, + enabled: atm.NewBool(true), + tmSchema: telemetry.NewSchema(), + smSchema: newSourceMetricSchema(func(l logging.Logger, m telemetry.Metric) *sourceMetric { + return newRankMetric(l, rank, m) + }), + }, + Index: idx, + Rank: rank, + }, cleanupFn, nil +} + +// NewEngineCollector initializes a new collector for DAOS Engine sources. +func NewEngineCollector(log logging.Logger, opts *CollectorOpts, sources ...*EngineSource) (*EngineCollector, error) { + if opts == nil { + opts = defaultCollectorOpts() + } + + c := &EngineCollector{ + metricsCollector: metricsCollector{ + log: log, + summary: prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "engine", + Subsystem: "exporter", + Name: "scrape_duration_seconds", + Help: "daos_exporter: Duration of a scrape job.", + }, + []string{"source", "result"}, + ), + }, + sources: sources, + cleanupSource: make(map[uint32]func()), + } + + c.collectFn = func(metrics chan *sourceMetric) { + for _, source := range c.getSources() { + source.Collect(c.log, metrics) + } + } + + for _, pat := range opts.Ignores { + re, err := regexp.Compile(pat) + if err != nil { + return nil, errors.Wrapf(err, "failed to compile %q", pat) + } + c.ignoredMetrics = append(c.ignoredMetrics, re) + } + + return c, nil +} + +// extractEngineLabels takes a "/"-separated DAOS metric name in order to +// create a normalized Prometheus name and label map. +// +// NB: Prometheus metric names should follow best practices as +// outlined at https://prometheus.io/docs/practices/naming/ +// +// In particular, a metric name should describe the measurement, +// not the entity the measurement is about. 
In other words, if 4 +// different entities share the same measurement, then there should +// be a single metric with a label that distinguishes between +// individual measurement values. +// +// Good: pool_started_at {pool="00000000-1111-2222-3333-4444444444"} +// Bad: pool_00000000_1111_2222_3333_4444444444_started_at +func extractEngineLabels(log logging.Logger, in string) (labels labelMap, name string) { + log.Tracef("in: %q", in) + + labels = make(labelMap) + compsIdx := 0 + comps := strings.Split(in, string(telemetry.PathSep)) + if len(comps) == 0 { + return labels, "" + } + + if strings.HasPrefix(comps[compsIdx], "ID") { + if len(comps) == 1 { + return labels, "" + } + compsIdx++ + } + + switch comps[compsIdx] { + case "pool": + name = "pool" + compsIdx++ + labels["pool"] = comps[compsIdx] + compsIdx++ + switch comps[compsIdx] { + case "ops": + compsIdx++ + name += "_ops_" + comps[compsIdx] + compsIdx++ + } + case "io": + name = "io" + compsIdx++ + switch comps[compsIdx] { + case "latency": + compsIdx++ + name += "_latency_" + comps[compsIdx] + compsIdx++ + labels["size"] = comps[compsIdx] + compsIdx++ + case "ops": + compsIdx++ + name += "_ops_" + comps[compsIdx] + compsIdx++ + default: + name += "_" + comps[compsIdx] + compsIdx++ + } + case "net": + compsIdx++ + if comps[compsIdx] == "uri" { + compsIdx++ + name = "net_uri_" + comps[compsIdx] + compsIdx++ + break + } + + name = "net" + labels["provider"] = comps[compsIdx] + compsIdx++ + case "nvme": + name = "nvme" + compsIdx++ + labels["device"] = comps[compsIdx] + compsIdx++ + } + + for { + if len(comps) == compsIdx { + break + } + + switch { + case matchLabel(labels, comps[compsIdx], "tgt_", "target"): + compsIdx++ + case matchLabel(labels, comps[compsIdx], "xs_", "xstream"): + compsIdx++ + case matchLabel(labels, comps[compsIdx], "ctx_", "context"): + compsIdx++ + default: + name = appendName(name, comps[compsIdx]) + compsIdx++ + } + } + + name = sanitizeMetricName(name) + return +} + +func 
newRankMetric(log logging.Logger, rank uint32, m telemetry.Metric) *sourceMetric { + labels, name := extractEngineLabels(log, m.FullPath()) + baseName := "engine_" + name + labels["rank"] = fmt.Sprintf("%d", rank) + + return newSourceMetric(log, m, baseName, labels) +} + +// AddSource adds an EngineSource to the Collector. +func (c *EngineCollector) AddSource(es *EngineSource, cleanup func()) { + if es == nil { + c.log.Error("attempted to add nil EngineSource") + return + } + + c.sourceMutex.Lock() + defer c.sourceMutex.Unlock() + + // If we attempt to add a duplicate, remove the old one. + c.removeSourceNoLock(es.Index) + + c.sources = append(c.sources, es) + if cleanup != nil { + c.cleanupSource[es.Index] = cleanup + } +} + +// RemoveSource removes an EngineSource with a given index from the Collector. +func (c *EngineCollector) RemoveSource(engineIdx uint32) { + c.sourceMutex.Lock() + defer c.sourceMutex.Unlock() + + c.removeSourceNoLock(engineIdx) +} + +func (c *EngineCollector) removeSourceNoLock(engineIdx uint32) { + for i, es := range c.sources { + if es.Index == engineIdx { + es.Disable() + c.sources = append(c.sources[:i], c.sources[i+1:]...) 
+ + // Ensure that EngineSource isn't collecting during cleanup + es.tmMutex.Lock() + if cleanup, found := c.cleanupSource[engineIdx]; found && cleanup != nil { + cleanup() + } + es.tmMutex.Unlock() + delete(c.cleanupSource, engineIdx) + break + } + } +} + +func (c *EngineCollector) getSources() []*EngineSource { + c.sourceMutex.RLock() + defer c.sourceMutex.RUnlock() + + sourceCopy := make([]*EngineSource, len(c.sources)) + _ = copy(sourceCopy, c.sources) + return sourceCopy +} diff --git a/src/control/lib/telemetry/promexp/collector_test.go b/src/control/lib/telemetry/promexp/engine_test.go similarity index 88% rename from src/control/lib/telemetry/promexp/collector_test.go rename to src/control/lib/telemetry/promexp/engine_test.go index 7dbe295d627..44924a6fcd7 100644 --- a/src/control/lib/telemetry/promexp/collector_test.go +++ b/src/control/lib/telemetry/promexp/engine_test.go @@ -1,12 +1,7 @@ // -// (C) Copyright 2021-2022 Intel Corporation. +// (C) Copyright 2021-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent -// -//go:build linux && (amd64 || arm64) -// +build linux -// +build amd64 arm64 - // package promexp @@ -62,7 +57,10 @@ func TestPromexp_NewEngineSource(t *testing.T) { test.CmpErr(t, tc.expErr, err) - if diff := cmp.Diff(tc.expResult, result, cmpopts.IgnoreUnexported(EngineSource{})); diff != "" { + cmpOpts := cmp.Options{ + cmpopts.IgnoreUnexported(MetricSource{}), + } + if diff := cmp.Diff(tc.expResult, result, cmpOpts...); diff != "" { t.Fatalf("(-want, +got)\n%s", diff) } @@ -155,31 +153,20 @@ func TestPromExp_EngineSource_Collect(t *testing.T) { for name, tc := range map[string]struct { es *EngineSource - resultChan chan *rankMetric + resultChan chan *sourceMetric expMetrics telemetry.TestMetricsMap }{ - "nil source": { - resultChan: make(chan *rankMetric), - }, "nil channel": { es: validSrc, }, - "bad source": { - es: &EngineSource{ - ctx: test.Context(t), - Rank: 123, - Index: testIdx + 1, - }, - resultChan: make(chan *rankMetric), - }, "success": { es: validSrc, - resultChan: make(chan *rankMetric), + resultChan: make(chan *sourceMetric), expMetrics: realMetrics, }, "disabled": { es: disabledSrc, - resultChan: make(chan *rankMetric), + resultChan: make(chan *sourceMetric), expMetrics: telemetry.TestMetricsMap{}, }, } { @@ -189,7 +176,7 @@ func TestPromExp_EngineSource_Collect(t *testing.T) { go tc.es.Collect(log, tc.resultChan) - gotMetrics := []*rankMetric{} + gotMetrics := []*sourceMetric{} for { done := false select { @@ -206,7 +193,7 @@ func TestPromExp_EngineSource_Collect(t *testing.T) { test.AssertEqual(t, len(tc.expMetrics), len(gotMetrics), "wrong number of metrics returned") for _, got := range gotMetrics { - test.AssertEqual(t, testRank, got.rank, "wrong rank") + test.AssertEqual(t, fmt.Sprintf("%d", testRank), got.labels["rank"], "wrong rank") expM, ok := tc.expMetrics[got.metric.Type()] if !ok { t.Fatalf("metric type %d not expected", got.metric.Type()) @@ -220,7 +207,7 @@ func 
TestPromExp_EngineSource_Collect(t *testing.T) { } } -func TestPromExp_NewCollector(t *testing.T) { +func TestPromExp_NewEngineCollector(t *testing.T) { testSrc := []*EngineSource{ { Rank: 1, @@ -234,20 +221,24 @@ func TestPromExp_NewCollector(t *testing.T) { sources []*EngineSource opts *CollectorOpts expErr error - expResult *Collector + expResult *EngineCollector }{ "no sources": { - expResult: &Collector{ - summary: &prometheus.SummaryVec{ - MetricVec: &prometheus.MetricVec{}, + expResult: &EngineCollector{ + metricsCollector: metricsCollector{ + summary: &prometheus.SummaryVec{ + MetricVec: &prometheus.MetricVec{}, + }, }, }, }, "defaults": { sources: testSrc, - expResult: &Collector{ - summary: &prometheus.SummaryVec{ - MetricVec: &prometheus.MetricVec{}, + expResult: &EngineCollector{ + metricsCollector: metricsCollector{ + summary: &prometheus.SummaryVec{ + MetricVec: &prometheus.MetricVec{}, + }, }, sources: testSrc, }, @@ -255,15 +246,17 @@ func TestPromExp_NewCollector(t *testing.T) { "opts with ignores": { sources: testSrc, opts: &CollectorOpts{Ignores: []string{"one", "two"}}, - expResult: &Collector{ - summary: &prometheus.SummaryVec{ - MetricVec: &prometheus.MetricVec{}, + expResult: &EngineCollector{ + metricsCollector: metricsCollector{ + summary: &prometheus.SummaryVec{ + MetricVec: &prometheus.MetricVec{}, + }, + ignoredMetrics: []*regexp.Regexp{ + regexp.MustCompile("one"), + regexp.MustCompile("two"), + }, }, sources: testSrc, - ignoredMetrics: []*regexp.Regexp{ - regexp.MustCompile("one"), - regexp.MustCompile("two"), - }, }, }, "bad regexp in ignores": { @@ -276,21 +269,23 @@ func TestPromExp_NewCollector(t *testing.T) { log, buf := logging.NewTestLogger(t.Name()) defer test.ShowBufferOnFailure(t, buf) - result, err := NewCollector(log, tc.opts, tc.sources...) + result, err := NewEngineCollector(log, tc.opts, tc.sources...) 
test.CmpErr(t, tc.expErr, err) cmpOpts := []cmp.Option{ - cmpopts.IgnoreUnexported(EngineSource{}), + cmpopts.IgnoreUnexported(MetricSource{}), cmpopts.IgnoreUnexported(prometheus.SummaryVec{}), cmpopts.IgnoreUnexported(prometheus.MetricVec{}), cmpopts.IgnoreUnexported(regexp.Regexp{}), - cmp.AllowUnexported(Collector{}), + cmp.AllowUnexported(EngineCollector{}), + cmp.AllowUnexported(metricsCollector{}), cmp.FilterPath(func(p cmp.Path) bool { // Ignore a few specific fields return (strings.HasSuffix(p.String(), "log") || strings.HasSuffix(p.String(), "sourceMutex") || - strings.HasSuffix(p.String(), "cleanupSource")) + strings.HasSuffix(p.String(), "cleanupSource") || + strings.HasSuffix(p.String(), "collectFn")) }, cmp.Ignore()), } if diff := cmp.Diff(tc.expResult, result, cmpOpts...); diff != "" { @@ -338,7 +333,7 @@ func TestPromExp_Collector_Prune(t *testing.T) { } defer cleanup() - defaultCollector, err := NewCollector(log, nil, engSrc) + defaultCollector, err := NewEngineCollector(log, nil, engSrc) if err != nil { t.Fatalf("failed to create collector: %s", err.Error()) } @@ -357,12 +352,12 @@ func TestPromExp_Collector_Prune(t *testing.T) { } } - engSrc.rmSchema.mu.Lock() - for m := range engSrc.rmSchema.rankMetrics { - _, name := extractLabels(m) + engSrc.smSchema.mu.Lock() + for m := range engSrc.smSchema.sourceMetrics { + _, name := extractEngineLabels(log, m) names = append(names, name) } - engSrc.rmSchema.mu.Unlock() + engSrc.smSchema.mu.Unlock() sort.Strings(names) return @@ -373,7 +368,7 @@ func TestPromExp_Collector_Prune(t *testing.T) { for _, m := range maps { for t, m := range m { if t != telemetry.MetricTypeDirectory && t != telemetry.MetricTypeLink { - _, name := extractLabels(m.FullPath()) + _, name := extractEngineLabels(log, m.FullPath()) unique[name] = struct{}{} } } @@ -422,7 +417,7 @@ func TestPromExp_Collector_Collect(t *testing.T) { } defer cleanup() - defaultCollector, err := NewCollector(log, nil, engSrc) + defaultCollector, err := 
NewEngineCollector(log, nil, engSrc) if err != nil { t.Fatalf("failed to create collector: %s", err.Error()) } @@ -433,7 +428,7 @@ func TestPromExp_Collector_Collect(t *testing.T) { "engine_stats_gauge2", "engine_timer_duration", } - ignoreCollector, err := NewCollector(log, &CollectorOpts{ + ignoreCollector, err := NewEngineCollector(log, &CollectorOpts{ Ignores: ignores, }, engSrc) if err != nil { @@ -441,13 +436,10 @@ func TestPromExp_Collector_Collect(t *testing.T) { } for name, tc := range map[string]struct { - collector *Collector + collector *EngineCollector resultChan chan prometheus.Metric expMetricNames []string }{ - "nil collector": { - resultChan: make(chan prometheus.Metric), - }, "nil channel": { collector: defaultCollector, }, @@ -512,7 +504,7 @@ func TestPromExp_Collector_Collect(t *testing.T) { } } -func TestPromExp_extractLabels(t *testing.T) { +func TestPromExp_extractEngineLabels(t *testing.T) { for name, tc := range map[string]struct { input string expName string @@ -626,7 +618,10 @@ func TestPromExp_extractLabels(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - labels, name := extractLabels(tc.input) + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + labels, name := extractEngineLabels(log, tc.input) test.AssertEqual(t, name, tc.expName, "") if diff := cmp.Diff(labels, tc.expLabels); diff != "" { @@ -686,7 +681,7 @@ func TestPromExp_Collector_AddSource(t *testing.T) { log, buf := logging.NewTestLogger(t.Name()) defer test.ShowBufferOnFailure(t, buf) - collector, err := NewCollector(log, nil, tc.startSrc...) + collector, err := NewEngineCollector(log, nil, tc.startSrc...) if err != nil { t.Fatalf("failed to set up collector: %s", err) } @@ -789,7 +784,7 @@ func TestPromExp_Collector_RemoveSource(t *testing.T) { log, buf := logging.NewTestLogger(t.Name()) defer test.ShowBufferOnFailure(t, buf) - collector, err := NewCollector(log, nil, tc.startSrc...) 
+ collector, err := NewEngineCollector(log, nil, tc.startSrc...) if err != nil { t.Fatalf("failed to set up collector: %s", err) } @@ -799,7 +794,10 @@ func TestPromExp_Collector_RemoveSource(t *testing.T) { collector.RemoveSource(tc.idx) - if diff := cmp.Diff(tc.expSrc, collector.sources, cmpopts.IgnoreUnexported(EngineSource{})); diff != "" { + cmpOpts := cmp.Options{ + cmpopts.IgnoreUnexported(MetricSource{}), + } + if diff := cmp.Diff(tc.expSrc, collector.sources, cmpOpts...); diff != "" { t.Fatalf("(-want, +got)\n%s", diff) } diff --git a/src/control/lib/telemetry/promexp/httpd.go b/src/control/lib/telemetry/promexp/httpd.go new file mode 100644 index 00000000000..cdd1955629b --- /dev/null +++ b/src/control/lib/telemetry/promexp/httpd.go @@ -0,0 +1,100 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// +//go:build linux && (amd64 || arm64) +// +build linux +// +build amd64 arm64 + +package promexp + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/daos-stack/daos/src/control/logging" +) + +type ( + // RegMonFn defines a function signature for registering a Prometheus + // monitor. + RegMonFn func(context.Context, logging.Logger) error + + // ExporterConfig defines the configuration for the Prometheus exporter. + ExporterConfig struct { + Port int + Title string + Register RegMonFn + } +) + +const ( + // EngineTelemetryPort specifies the default port for engine telemetry. + EngineTelemetryPort = 9191 + // ClientTelemetryPort specifies the default port for client telemetry. + ClientTelemetryPort = 9192 +) + +// StartExporter starts the Prometheus exporter. 
+func StartExporter(ctx context.Context, log logging.Logger, cfg *ExporterConfig) (func(), error) { + if cfg == nil { + return nil, errors.New("invalid exporter config: nil config") + } + + if cfg.Port <= 0 { + return nil, errors.New("invalid exporter config: bad port") + } + + if cfg.Register == nil { + return nil, errors.New("invalid exporter config: nil register function") + } + + if err := cfg.Register(ctx, log); err != nil { + return nil, errors.Wrap(err, "failed to register client monitor") + } + + listenAddress := fmt.Sprintf("0.0.0.0:%d", cfg.Port) + + srv := http.Server{Addr: listenAddress} + http.Handle("/metrics", promhttp.HandlerFor( + prometheus.DefaultGatherer, promhttp.HandlerOpts{}, + )) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + num, err := w.Write([]byte(fmt.Sprintf(` + %s + +

%s

+

Metrics

+ + `, cfg.Title, cfg.Title))) + if err != nil { + log.Errorf("%d: %s", num, err) + } + }) + + // http listener is a blocking call + go func() { + log.Infof("Listening on %s", listenAddress) + err := srv.ListenAndServe() + log.Infof("Prometheus web exporter stopped: %s", err.Error()) + }() + + return func() { + log.Debug("Shutting down Prometheus web exporter") + + // When this cleanup function is called, the original context + // will probably have already been canceled. + timedCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err := srv.Shutdown(timedCtx); err != nil { + log.Noticef("HTTP server didn't shut down within timeout: %s", err.Error()) + } + }, nil +} diff --git a/src/control/lib/telemetry/promexp/httpd_test.go b/src/control/lib/telemetry/promexp/httpd_test.go new file mode 100644 index 00000000000..9c74e05c1db --- /dev/null +++ b/src/control/lib/telemetry/promexp/httpd_test.go @@ -0,0 +1,111 @@ +// +// (C) Copyright 2024 Intel Corporation. 
+// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package promexp_test + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/pkg/errors" + + "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/lib/telemetry/promexp" + "github.com/daos-stack/daos/src/control/logging" +) + +func TestPromExp_StartExporter(t *testing.T) { + for name, tc := range map[string]struct { + cfg *promexp.ExporterConfig + expErr error + }{ + "nil cfg": { + expErr: errors.New("invalid exporter config"), + }, + "empty cfg invalid": { + cfg: &promexp.ExporterConfig{}, + expErr: errors.New("invalid exporter config"), + }, + "negative port": { + cfg: &promexp.ExporterConfig{ + Port: -1, + }, + expErr: errors.New("invalid exporter config"), + }, + "nil register fn": { + cfg: &promexp.ExporterConfig{ + Port: 1234, + }, + expErr: errors.New("invalid exporter config"), + }, + "register fn fails": { + cfg: &promexp.ExporterConfig{ + Port: 1234, + Register: func(context.Context, logging.Logger) error { + return errors.New("whoops") + }, + }, + expErr: errors.New("failed to register"), + }, + "success": { + cfg: &promexp.ExporterConfig{ + Port: promexp.ClientTelemetryPort, + Register: func(ctx context.Context, log logging.Logger) error { + return nil + }, + }, + }, + } { + t.Run(name, func(t *testing.T) { + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + if tc.cfg != nil { + tc.cfg.Title = t.Name() + } + cleanup, err := promexp.StartExporter(test.Context(t), log, tc.cfg) + test.CmpErr(t, tc.expErr, err) + if tc.expErr != nil { + return + } + + // Quick tests to make sure the exporter is listening and + // that our handlers are invoked. 
+ resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", tc.cfg.Port)) + if err != nil { + t.Fatal(err) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(body), tc.cfg.Title) { + t.Fatalf("expected %q to contain %q", string(body), tc.cfg.Title) + } + resp.Body.Close() + + resp, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", tc.cfg.Port)) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + + cleanup() + time.Sleep(1 * time.Second) + + // Make sure the exporter is no longer listening. + _, err = http.Get(fmt.Sprintf("http://localhost:%d/", tc.cfg.Port)) + if err == nil { + t.Fatal("expected http Get to fail on closed port") + } + }) + } +} diff --git a/src/control/lib/telemetry/promexp/source.go b/src/control/lib/telemetry/promexp/source.go new file mode 100644 index 00000000000..7c85e84f4d5 --- /dev/null +++ b/src/control/lib/telemetry/promexp/source.go @@ -0,0 +1,200 @@ +// +// (C) Copyright 2024 Intel Corporation. 
+// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// +//go:build linux && (amd64 || arm64) +// +build linux +// +build amd64 arm64 + +package promexp + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/daos-stack/daos/src/control/lib/atm" + "github.com/daos-stack/daos/src/control/lib/telemetry" + "github.com/daos-stack/daos/src/control/logging" +) + +type ( + sourceMetricSchema struct { + mu sync.Mutex + sourceMetrics map[string]*sourceMetric + seen map[string]struct{} + addFn func(logging.Logger, telemetry.Metric) *sourceMetric + } + + MetricSource struct { + ctx context.Context + tmMutex sync.RWMutex // To protect telemetry collection + enabled atm.Bool + tmSchema *telemetry.Schema + smSchema *sourceMetricSchema + } +) + +func newSourceMetricSchema(addFn func(logging.Logger, telemetry.Metric) *sourceMetric) *sourceMetricSchema { + return &sourceMetricSchema{ + sourceMetrics: make(map[string]*sourceMetric), + seen: make(map[string]struct{}), + addFn: addFn, + } +} + +func (s *sourceMetricSchema) Prune() { + s.mu.Lock() + defer s.mu.Unlock() + + for id := range s.sourceMetrics { + if _, found := s.seen[id]; !found { + delete(s.sourceMetrics, id) + } + } + s.seen = make(map[string]struct{}) +} + +func (s *sourceMetricSchema) add(log logging.Logger, metric telemetry.Metric) (sm *sourceMetric) { + s.mu.Lock() + defer s.mu.Unlock() + + id := metric.FullPath() + s.seen[id] = struct{}{} + + var found bool + if sm, found = s.sourceMetrics[id]; !found { + sm = s.addFn(log, metric) + s.sourceMetrics[id] = sm + } else { + sm.resetVecs() + } + + return +} + +func defaultCollectorOpts() *CollectorOpts { + return &CollectorOpts{} +} + +type sourceMetric struct { + metric telemetry.Metric + baseName string + labels labelMap + gvm gvMap + cvm cvMap +} + +func (bm *sourceMetric) collect(ch chan<- prometheus.Metric) { + for _, gv := range bm.gvm { + gv.Collect(ch) + } + for _, cv := range bm.cvm { + cv.Collect(ch) + } 
+} + +func (bm *sourceMetric) resetVecs() { + for _, gv := range bm.gvm { + gv.Reset() + } + for _, cv := range bm.cvm { + cv.Reset() + } +} + +func newSourceMetric(log logging.Logger, m telemetry.Metric, baseName string, labels labelMap) *sourceMetric { + sm := &sourceMetric{ + metric: m, + baseName: baseName, + labels: labels, + gvm: make(gvMap), + cvm: make(cvMap), + } + + desc := m.Desc() + + switch sm.metric.Type() { + case telemetry.MetricTypeGauge, telemetry.MetricTypeTimestamp, + telemetry.MetricTypeSnapshot: + sm.gvm.add(sm.baseName, desc, sm.labels) + case telemetry.MetricTypeStatsGauge, telemetry.MetricTypeDuration: + sm.gvm.add(sm.baseName, desc, sm.labels) + for _, ms := range getMetricStats(sm.baseName, sm.metric) { + sm.gvm.add(ms.name, ms.desc, sm.labels) + } + case telemetry.MetricTypeCounter: + sm.cvm.add(sm.baseName, desc, sm.labels) + default: + log.Errorf("[%s]: metric type %d not supported", baseName, sm.metric.Type()) + } + + return sm +} + +// IsEnabled checks if the source is enabled. +func (s *MetricSource) IsEnabled() bool { + return s.enabled.IsTrue() +} + +// Enable enables the source. +func (s *MetricSource) Enable() { + s.enabled.SetTrue() +} + +// Disable disables the source. 
+func (s *MetricSource) Disable() { + s.enabled.SetFalse() +} + +func (s *MetricSource) Collect(log logging.Logger, ch chan<- *sourceMetric) { + if s == nil { + log.Error("nil source") + return + } + if !s.IsEnabled() { + return + } + if ch == nil { + log.Error("nil channel") + return + } + + s.tmMutex.RLock() + defer s.tmMutex.RUnlock() + + metrics := make(chan telemetry.Metric) + go func() { + if err := telemetry.CollectMetrics(s.ctx, s.tmSchema, metrics); err != nil { + log.Errorf("failed to collect metrics: %s", err) + return + } + s.tmSchema.Prune() + }() + + for metric := range metrics { + ch <- s.smSchema.add(log, metric) + } + s.smSchema.Prune() +} + +func (s *MetricSource) PruneSegments(log logging.Logger, maxSegAge time.Duration) { + if s == nil { + log.Error("nil source") + return + } + if !s.IsEnabled() { + return + } + + if err := telemetry.PruneUnusedSegments(s.ctx, maxSegAge); err != nil { + log.Errorf("failed to prune segments: %s", err) + return + } + + s.tmSchema.Prune() + s.smSchema.Prune() +} diff --git a/src/control/lib/telemetry/promexp/util.go b/src/control/lib/telemetry/promexp/util.go new file mode 100644 index 00000000000..eb9fc17631c --- /dev/null +++ b/src/control/lib/telemetry/promexp/util.go @@ -0,0 +1,152 @@ +// +// (C) Copyright 2024 Intel Corporation. 
// sanitizeMetricName converts a telemetry path into a name that is safe to
// use as a Prometheus metric name. Leading '/' separators are removed and
// every rune that is not an ASCII letter or an ASCII digit is replaced with
// an underscore.
func sanitizeMetricName(in string) string {
	return strings.Map(func(r rune) rune {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z':
			// ASCII letters pass through unchanged.
			return r
		case r <= unicode.MaxASCII && unicode.IsDigit(r):
			// unicode.IsDigit on its own also accepts non-ASCII decimal
			// digits (e.g. Arabic-Indic or fullwidth digits), which the
			// classic Prometheus metric name grammar does not allow, so
			// the check is limited to the ASCII range.
			return r
		}

		// Anything else (separators, punctuation, non-ASCII) is sanitized.
		return '_'
	}, strings.TrimLeft(in, "/"))
}
labels.keys()) + m[name] = cv + } +} + +func (m cvMap) set(name string, value float64, labels labelMap) error { + cv, found := m[name] + if !found { + return errors.Errorf("counter vector %s not found", name) + } + cv.With(prometheus.Labels(labels)).Add(value) + + return nil +} + +type metricStat struct { + name string + desc string + value float64 +} + +func getMetricStats(baseName string, m telemetry.Metric) (stats []*metricStat) { + ms, ok := m.(telemetry.StatsMetric) + if !ok { + return + } + + for name, s := range map[string]struct { + fn func() float64 + desc string + }{ + "min": { + fn: ms.FloatMin, + desc: " (min value)", + }, + "max": { + fn: ms.FloatMax, + desc: " (max value)", + }, + "mean": { + fn: ms.Mean, + desc: " (mean)", + }, + "stddev": { + fn: ms.StdDev, + desc: " (std dev)", + }, + } { + stats = append(stats, &metricStat{ + name: baseName + "_" + name, + desc: m.Desc() + s.desc, + value: s.fn(), + }) + } + + return +} diff --git a/src/control/lib/telemetry/shm.go b/src/control/lib/telemetry/shm.go new file mode 100644 index 00000000000..b4c67eec75d --- /dev/null +++ b/src/control/lib/telemetry/shm.go @@ -0,0 +1,85 @@ +// +// (C) Copyright 2024 Intel Corporation. +// +// SPDX-License-Identifier: BSD-2-Clause-Patent +// + +package telemetry + +/* +#include +#include +*/ +import "C" + +import ( + "time" + + "github.com/pkg/errors" +) + +type shmStWrp struct { + id C.int + ds C.struct_shmid_ds +} + +// Size returns the size of segment in bytes. +func (s *shmStWrp) Size() int { + return int(s.ds.shm_segsz) +} + +// Atime returns the time of last shmat(2). +func (s *shmStWrp) Atime() time.Time { + return time.Unix(int64(s.ds.shm_atime), 0) +} + +// Dtime returns the time of last shmdt(2). +func (s *shmStWrp) Dtime() time.Time { + return time.Unix(int64(s.ds.shm_dtime), 0) +} + +// Ctime returns the time of last shmctl(2) or creation time. 
+func (s *shmStWrp) Ctime() time.Time { + return time.Unix(int64(s.ds.shm_ctime), 0) +} + +// Cpid returns the creator pid. +func (s *shmStWrp) Cpid() int { + return int(s.ds.shm_cpid) +} + +// Lpid returns the last shmat(2)/shmdt(2) pid. +func (s *shmStWrp) Lpid() int { + return int(s.ds.shm_lpid) +} + +// Nattach returns the number of attached processes. +func (s *shmStWrp) Nattach() int { + return int(s.ds.shm_nattch) +} + +// C returns the C struct. +func (s *shmStWrp) C() *C.struct_shmid_ds { + return &s.ds +} + +func shmStat(id C.int) (*shmStWrp, error) { + st := shmStWrp{ + id: id, + } + rc, err := C.shmctl(id, C.IPC_STAT, &st.ds) + if rc != 0 { + return nil, errors.Wrapf(err, "shmctl(IPC_STAT, %d)", id) + } + + return &st, nil +} + +func shmStatKey(key C.key_t) (*shmStWrp, error) { + id, err := C.shmget(key, 0, 0) + if err != nil { + return nil, errors.Wrapf(err, "shmget(%d, 0, 0)", key) + } + + return shmStat(id) +} diff --git a/src/control/lib/telemetry/telemetry.go b/src/control/lib/telemetry/telemetry.go index 1bc64bab476..e50547544ab 100644 --- a/src/control/lib/telemetry/telemetry.go +++ b/src/control/lib/telemetry/telemetry.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2021-2022 Intel Corporation. +// (C) Copyright 2021-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -14,8 +14,16 @@ package telemetry /* #cgo LDFLAGS: -lgurt -#include "gurt/telemetry_common.h" -#include "gurt/telemetry_consumer.h" +#include +#include +#include +#include + +static int +rm_ephemeral_dir(const char *path) +{ + return d_tm_del_ephemeral_dir(path); +} */ import "C" @@ -25,12 +33,17 @@ import ( "io" "os" "path/filepath" + "sort" "strings" "sync" "time" "unsafe" "github.com/pkg/errors" + + "github.com/daos-stack/daos/src/control/common" + "github.com/daos-stack/daos/src/control/lib/daos" + "github.com/daos-stack/daos/src/control/logging" ) type MetricType int @@ -46,6 +59,11 @@ const ( MetricTypeDirectory MetricType = C.D_TM_DIRECTORY MetricTypeLink MetricType = C.D_TM_LINK + ClientJobRootID = C.DC_TM_JOB_ROOT_ID + ClientJobMax = 1024 + ClientMetricsEnabledEnv = C.DAOS_CLIENT_METRICS_ENABLE + ClientMetricsRetainEnv = C.DAOS_CLIENT_METRICS_RETAIN + BadUintVal = ^uint64(0) BadFloatVal = float64(BadUintVal) BadIntVal = int64(BadUintVal >> 1) @@ -80,7 +98,7 @@ type ( type ( handle struct { sync.RWMutex - idx uint32 + id uint32 rank *uint32 ctx *C.struct_d_tm_context root *C.struct_d_tm_node_t @@ -290,24 +308,43 @@ func collectGarbageLoop(ctx context.Context, ticker *time.Ticker) { } } +func initClientRoot(parent context.Context, shmID uint32) (context.Context, error) { + if parent == nil { + return nil, errors.New("nil parent context") + } + + shmSize := C.ulong(ClientJobMax * C.D_TM_METRIC_SIZE) + + rc := C.d_tm_init(C.int(shmID), shmSize, C.D_TM_OPEN_OR_CREATE|C.D_TM_OTHER_RW) + if rc != 0 { + return nil, errors.Errorf("failed to init client root: %s", daos.Status(rc)) + } + + return Init(parent, shmID) +} + +func InitClientRoot(ctx context.Context) (context.Context, error) { + return initClientRoot(ctx, ClientJobRootID) +} + // Init initializes the telemetry bindings -func Init(parent context.Context, idx uint32) (context.Context, error) { +func Init(parent context.Context, id uint32) 
(context.Context, error) { if parent == nil { return nil, errors.New("nil parent context") } - tmCtx := C.d_tm_open(C.int(idx)) + tmCtx := C.d_tm_open(C.int(id)) if tmCtx == nil { - return nil, errors.Errorf("no shared memory segment found for idx: %d", idx) + return nil, errors.Errorf("no shared memory segment found for key: %d", id) } root := C.d_tm_get_root(tmCtx) if root == nil { - return nil, errors.Errorf("no root node found in shared memory segment for idx: %d", idx) + return nil, errors.Errorf("no root node found in shared memory segment for key: %d", id) } handle := &handle{ - idx: idx, + id: id, ctx: tmCtx, root: root, } @@ -318,6 +355,11 @@ func Init(parent context.Context, idx uint32) (context.Context, error) { return newCtx, nil } +// Fini releases resources claimed by Init(). +func Fini() { + C.d_tm_fini() +} + // Detach detaches from the telemetry handle func Detach(ctx context.Context) { if hdl, err := getHandle(ctx); err == nil { @@ -408,10 +450,12 @@ func NewSchema() *Schema { } -func visit(hdl *handle, s *Schema, node *C.struct_d_tm_node_t, pathComps string, out chan<- Metric) { +type procNodeFn func(hdl *handle, id string, node *C.struct_d_tm_node_t) + +func visit(hdl *handle, node *C.struct_d_tm_node_t, pathComps string, procLinks bool, procNode procNodeFn) { var next *C.struct_d_tm_node_t - if node == nil { + if node == nil || procNode == nil { return } name := C.GoString(C.d_tm_get_name(hdl.ctx, node)) @@ -420,29 +464,30 @@ func visit(hdl *handle, s *Schema, node *C.struct_d_tm_node_t, pathComps string, id = name } - cType := node.dtn_type - switch cType { + switch node.dtn_type { case C.D_TM_DIRECTORY: next = C.d_tm_get_child(hdl.ctx, node) if next != nil { - visit(hdl, s, next, id, out) + visit(hdl, next, id, procLinks, procNode) } case C.D_TM_LINK: next = C.d_tm_follow_link(hdl.ctx, node) if next != nil { + if procLinks { + // Use next to get the linked shm key + procNode(hdl, id, next) + } + // link leads to a directory with the same name 
- visit(hdl, s, next, pathComps, out) + visit(hdl, next, pathComps, procLinks, procNode) } default: - m := s.Add(hdl, id, cType, node) - if m != nil { - out <- m - } + procNode(hdl, id, node) } next = C.d_tm_get_sibling(hdl.ctx, node) if next != nil && next != node { - visit(hdl, s, next, pathComps, out) + visit(hdl, next, pathComps, procLinks, procNode) } } @@ -460,8 +505,97 @@ func CollectMetrics(ctx context.Context, s *Schema, out chan<- Metric) error { return errors.New("invalid handle") } - node := hdl.root - visit(hdl, s, node, "", out) + procNode := func(hdl *handle, id string, node *C.struct_d_tm_node_t) { + m := s.Add(hdl, id, node.dtn_type, node) + if m != nil { + out <- m + } + } + + visit(hdl, hdl.root, "", false, procNode) + + return nil +} + +// PruneUnusedSegments removes shared memory segments associated with +// unused ephemeral subdirectories. +func PruneUnusedSegments(ctx context.Context, maxSegAge time.Duration) error { + log := logging.FromContext(ctx) + + hdl, err := getHandle(ctx) + if err != nil { + return err + } + hdl.Lock() + defer hdl.Unlock() + + if !hdl.isValid() { + return errors.New("invalid handle") + } + + var toPrune []string + procNode := func(hdl *handle, id string, node *C.struct_d_tm_node_t) { + if node == nil || node.dtn_type != C.D_TM_DIRECTORY { + return + } + + path := id + comps := strings.SplitN(path, string(PathSep), 2) + if strings.HasPrefix(comps[0], "ID:") && len(comps) > 1 { + path = comps[1] + } + + st, err := shmStatKey(node.dtn_shmem_key) + if err != nil { + log.Errorf("failed to shmStat(%s): %s", path, err) + return + } + + log.Tracef("path:%s shmid:%d spid:%d cpid:%d lpid:%d age:%s", + path, st.id, os.Getpid(), st.Cpid(), st.Lpid(), time.Since(st.Ctime())) + + // if the creator process is still around, don't mess with it. 
+ if _, err := common.GetProcName(st.Cpid()); err == nil { + return + } + + if time.Since(st.Ctime()) <= maxSegAge { + return + } + + log.Tracef("adding %s to prune list", path) + toPrune = append(toPrune, path) + } + + visit(hdl, hdl.root, "", true, procNode) + + sort.Sort(sort.Reverse(sort.StringSlice(toPrune))) + for _, path := range toPrune { + log.Tracef("pruning %s", path) + if err := removeLink(hdl, path); err != nil { + log.Errorf("failed to prune %s: %s", path, err) + } + } + + return nil +} + +func removeLink(hdl *handle, path string) error { + _, err := findNode(hdl, path) + if err != nil { + return err + } + + cPath := C.CString(path) + defer C.free(unsafe.Pointer(cPath)) + rc := C.rm_ephemeral_dir(cPath) + if rc != 0 { + return errors.Wrapf(daos.Status(rc), "failed to remove link %q", path) + } + + if _, err := findNode(hdl, path); err == nil { + return errors.Errorf("failed to remove %s", path) + } return nil } diff --git a/src/control/lib/telemetry/telemetry_test.go b/src/control/lib/telemetry/telemetry_test.go index a645f0e60e4..bc63cc81399 100644 --- a/src/control/lib/telemetry/telemetry_test.go +++ b/src/control/lib/telemetry/telemetry_test.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2021-2022 Intel Corporation. +// (C) Copyright 2021-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -9,6 +9,9 @@ package telemetry import ( "context" "fmt" + "os" + "os/exec" + "strconv" "sync" "testing" "time" @@ -16,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/daos-stack/daos/src/control/common/test" + "github.com/daos-stack/daos/src/control/logging" ) func TestTelemetry_Init(t *testing.T) { @@ -50,7 +54,7 @@ func TestTelemetry_Init(t *testing.T) { t.Fatalf("can't get handle from result ctx: %v", err) } - test.AssertEqual(t, uint32(producerID), hdl.idx, "handle.idx doesn't match shmem ID") + test.AssertEqual(t, uint32(producerID), hdl.id, "handle.idx doesn't match shmem ID") hdl.RLock() defer hdl.RUnlock() @@ -179,6 +183,106 @@ func TestTelemetry_GetRank(t *testing.T) { } } +func childErrExit(err error) { + if err == nil { + err = errors.New("unknown error") + } + fmt.Fprintf(os.Stderr, "CHILD ERROR: %s\n", err) + os.Exit(1) +} + +const ( + childModeEnvVar = "TEST_CHILD_MODE" + childModeLinkTest = "CHILD_MODE_LINK_TEST" + childShmIDEnvVar = "TEST_CHILD_SHM_ID" +) + +func TestMain(m *testing.M) { + mode := os.Getenv(childModeEnvVar) + switch mode { + case "": + // default; run the test binary + os.Exit(m.Run()) + case childModeLinkTest: + runChildTelemProc() + default: + childErrExit(errors.Errorf("Unknown child mode: %q", mode)) + } +} + +func runChildTelemProc() { + pid := os.Getpid() + shmID, err := strconv.Atoi(os.Getenv(childShmIDEnvVar)) + if err != nil { + childErrExit(err) + } + + jobDir := TestMetricsMap{ + MetricTypeDirectory: &TestMetric{ + Name: "job", + }, + } + pidLink := TestMetricsMap{ + MetricTypeLink: &TestMetric{ + Name: fmt.Sprintf("job/%d", pid), + }, + } + startedAt := TestMetricsMap{ + MetricTypeTimestamp: &TestMetric{ + Name: fmt.Sprintf("job/%d/started_at", pid), + }, + } + + t := &testing.T{} + + InitTestMetricsProducer(t, shmID, 1024) + + AddTestMetrics(t, jobDir) + AddTestMetrics(t, pidLink) + AddTestMetrics(t, startedAt) + + if t.Failed() { + 
childErrExit(errors.New("test failed")) + } +} + +func TestTelemetry_PruneSegments(t *testing.T) { + shmID := uint32(NextTestID()) + + cmd := exec.Command(os.Args[0]) + cmd.Env = append(os.Environ(), + fmt.Sprintf("%s=%s", childModeEnvVar, childModeLinkTest), + fmt.Sprintf("%s=%d", childShmIDEnvVar, shmID), + ) + if out, err := cmd.CombinedOutput(); err != nil { + t.Errorf("child failed: %s", out) + t.Fatal(err) + } + + log, buf := logging.NewTestLogger(t.Name()) + defer test.ShowBufferOnFailure(t, buf) + + ctx, err := initClientRoot(test.MustLogContext(t, log), shmID) + if err != nil { + t.Fatal(err) + } + defer func() { + Fini() + }() + + path := fmt.Sprintf("job/%d/started_at", cmd.Process.Pid) + _, err = GetTimestamp(ctx, path) + test.CmpErr(t, nil, err) + + err = PruneUnusedSegments(ctx, time.Nanosecond) + test.CmpErr(t, nil, err) + + _, err = GetTimestamp(ctx, path) + if err == nil { + t.Fatal("expected GetTimestamp() to fail after prune") + } +} + func TestTelemetry_CollectMetrics(t *testing.T) { testMetrics := TestMetricsMap{ MetricTypeCounter: &TestMetric{ diff --git a/src/control/server/telemetry.go b/src/control/server/telemetry.go index f7f094ffe7e..4b2f624aff2 100644 --- a/src/control/server/telemetry.go +++ b/src/control/server/telemetry.go @@ -1,5 +1,5 @@ // -// (C) Copyright 2018-2022 Intel Corporation. +// (C) Copyright 2018-2024 Intel Corporation. 
// // SPDX-License-Identifier: BSD-2-Clause-Patent // @@ -8,13 +8,9 @@ package server import ( "context" - "fmt" - "net/http" - "time" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/daos-stack/daos/src/control/lib/ranklist" "github.com/daos-stack/daos/src/control/lib/telemetry/promexp" @@ -27,7 +23,7 @@ func regPromEngineSources(ctx context.Context, log logging.Logger, engines []Eng return nil } - c, err := promexp.NewCollector(log, &promexp.CollectorOpts{}) + c, err := promexp.NewEngineCollector(log, &promexp.CollectorOpts{}) if err != nil { return err } @@ -73,45 +69,13 @@ func regPromEngineSources(ctx context.Context, log logging.Logger, engines []Eng } func startPrometheusExporter(ctx context.Context, log logging.Logger, port int, engines []Engine) (func(), error) { - if err := regPromEngineSources(ctx, log, engines); err != nil { - return nil, err + expCfg := &promexp.ExporterConfig{ + Port: port, + Title: "DAOS Engine Telemetry", + Register: func(ctx context.Context, log logging.Logger) error { + return regPromEngineSources(ctx, log, engines) + }, } - listenAddress := fmt.Sprintf("0.0.0.0:%d", port) - - srv := http.Server{Addr: listenAddress} - http.Handle("/metrics", promhttp.HandlerFor( - prometheus.DefaultGatherer, promhttp.HandlerOpts{}, - )) - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - num, err := w.Write([]byte(` - DAOS Exporter - -

DAOS Exporter

-

Metrics

- - `)) - if err != nil { - log.Errorf("%d: %s", num, err) - } - }) - - // http listener is a blocking call - go func() { - log.Infof("Listening on %s", listenAddress) - err := srv.ListenAndServe() - log.Infof("Prometheus web exporter stopped: %s", err.Error()) - }() - - return func() { - log.Debug("Shutting down Prometheus web exporter") - - // When this cleanup function is called, the original context - // will probably have already been canceled. - timedCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - if err := srv.Shutdown(timedCtx); err != nil { - log.Noticef("HTTP server didn't shut down within timeout: %s", err.Error()) - } - }, nil + return promexp.StartExporter(ctx, log, expCfg) } diff --git a/src/gurt/telemetry.c b/src/gurt/telemetry.c index 7e776404f4c..4eafb5638db 100644 --- a/src/gurt/telemetry.c +++ b/src/gurt/telemetry.c @@ -74,10 +74,11 @@ static struct d_tm_shmem { struct d_tm_context *ctx; /** context for the producer */ struct d_tm_node_t *root; /** root node of shmem */ pthread_mutex_t add_lock; /** for synchronized access */ - uint32_t retain:1, /* retain shmem region during exit */ - sync_access:1, - retain_non_empty:1, /** retain shmem region if it is not empty */ - multiple_writer_lock:1; /** lock for multiple writer */ + uint32_t retain : 1, /** retain shmem region during exit */ + sync_access : 1, /** enable sync access to shmem */ + retain_non_empty : 1, /** retain shmem region if it is not empty */ + multiple_writer_lock : 1, /** lock for multiple writer */ + other_rw : 1; /** allow rw access to other */ int id; /** Instance ID */ } tm_shmem; @@ -216,10 +217,16 @@ static int new_shmem(key_t key, size_t size, struct d_tm_shmem_hdr **shmem) { int rc; + int flags = IPC_CREAT; + + if (tm_shmem.other_rw == 1) + flags |= 0666; + else + flags |= 0660; D_INFO("creating new shared memory segment, key=0x%x, size=%lu\n", key, size); - rc = attach_shmem(key, size, IPC_CREAT | 0660, shmem); + rc = 
attach_shmem(key, size, flags, shmem); if (rc < 0) D_ERROR("failed to create shared memory segment, key=0x%x: "DF_RC"\n", key, DP_RC(rc)); @@ -793,9 +800,8 @@ d_tm_init(int id, uint64_t mem_size, int flags) memset(&tm_shmem, 0, sizeof(tm_shmem)); - if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM | - D_TM_RETAIN_SHMEM_IF_NON_EMPTY | D_TM_OPEN_OR_CREATE | - D_TM_MULTIPLE_WRITER_LOCK)) != 0) { + if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM | D_TM_RETAIN_SHMEM_IF_NON_EMPTY | + D_TM_OPEN_OR_CREATE | D_TM_MULTIPLE_WRITER_LOCK | D_TM_OTHER_RW)) != 0) { D_ERROR("Invalid flags 0x%x\n", flags); rc = -DER_INVAL; goto failure; @@ -821,6 +827,11 @@ d_tm_init(int id, uint64_t mem_size, int flags) D_INFO("Require multiple write protection for id %d\n", id); } + if (flags & D_TM_OTHER_RW) { + tm_shmem.other_rw = 1; + D_INFO("Allowing read/write access to Other\n"); + } + tm_shmem.id = id; snprintf(tmp, sizeof(tmp), "ID: %d", id); key = d_tm_get_srv_key(id); @@ -2584,7 +2595,6 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, new_node = d_tm_find_metric(ctx, path); if (new_node != NULL) { - D_ERROR("metric [%s] already exists\n", path); D_GOTO(fail_unlock, rc = -DER_EXIST); } @@ -2646,8 +2656,8 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, if (tm_shmem.multiple_writer_lock) D_MUTEX_UNLOCK(&ctx->shmem_root->sh_multiple_writer_lock); fail: - D_ERROR("Failed to add ephemeral dir [%s]: " DF_RC "\n", path, - DP_RC(rc)); + if (rc != -DER_EXIST) + DL_ERROR(rc, "Failed to add ephemeral dir [%s]", path); return rc; } @@ -2717,7 +2727,7 @@ rm_ephemeral_dir(struct d_tm_context *ctx, struct d_tm_node_t *link) head = &shmem->sh_subregions; for (cur = conv_ptr(shmem, head->next); cur != head; cur = conv_ptr(shmem, cur->next)) { curr = d_list_entry(cur, __typeof__(*curr), rl_link); - rc = rm_ephemeral_dir(ctx, curr->rl_link_node); + rc = rm_ephemeral_dir(ctx, conv_ptr(shmem, curr->rl_link_node)); if (rc != 0) /* nothing much we can do 
to recover here */ D_ERROR("error removing tmp dir [%s]: "DF_RC"\n", link->dtn_name, DP_RC(rc)); diff --git a/src/include/gurt/telemetry_common.h b/src/include/gurt/telemetry_common.h index ce592326e34..81620e4a932 100644 --- a/src/include/gurt/telemetry_common.h +++ b/src/include/gurt/telemetry_common.h @@ -152,12 +152,13 @@ enum { }; enum { - D_TM_SERVER_PROCESS = 0x000, - D_TM_SERIALIZATION = 0x001, - D_TM_RETAIN_SHMEM = 0x002, - D_TM_RETAIN_SHMEM_IF_NON_EMPTY = 0x004, - D_TM_OPEN_OR_CREATE = 0x008, - D_TM_MULTIPLE_WRITER_LOCK = 0x010, + D_TM_SERVER_PROCESS = 0x000, + D_TM_SERIALIZATION = 0x001, + D_TM_RETAIN_SHMEM = 0x002, + D_TM_RETAIN_SHMEM_IF_NON_EMPTY = 0x004, + D_TM_OPEN_OR_CREATE = 0x008, + D_TM_MULTIPLE_WRITER_LOCK = 0x010, + D_TM_OTHER_RW = 0x020, }; /** Output formats */ diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 46f065f2191..2f098fb465f 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -161,22 +161,28 @@ void * obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) { struct obj_pool_metrics *metrics; + char tgt_path[32]; uint32_t opc; int rc; D_ASSERT(tgt_id >= 0); + if (server) + snprintf(tgt_path, sizeof(tgt_path), "/tgt_%u", tgt_id); + else + tgt_path[0] = '\0'; D_ALLOC_PTR(metrics); - if (metrics == NULL) + if (metrics == NULL) { + D_ERROR("failed to alloc object metrics"); return NULL; + } /** register different per-opcode counters */ for (opc = 0; opc < OBJ_PROTO_CLI_COUNT; opc++) { /** Then the total number of requests, of type counter */ rc = d_tm_add_metric(&metrics->opm_total[opc], D_TM_COUNTER, - "total number of processed object RPCs", - "ops", "%s/ops/%s/%s%u", path, - obj_opc_to_str(opc), server ? 
"tgt_" : "", tgt_id); + "total number of processed object RPCs", "ops", "%s/ops/%s%s", + path, obj_opc_to_str(opc), tgt_path); if (rc) D_WARN("Failed to create total counter: "DF_RC"\n", DP_RC(rc)); @@ -184,31 +190,31 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) /** Total number of silently restarted updates, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_restart, D_TM_COUNTER, - "total number of restarted update ops", "updates", - "%s/restarted/%s%u", path, server ? "tgt_" : "", tgt_id); + "total number of restarted update ops", "updates", "%s/restarted%s", + path, tgt_path); if (rc) D_WARN("Failed to create restarted counter: "DF_RC"\n", DP_RC(rc)); /** Total number of resent updates, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_resent, D_TM_COUNTER, - "total number of resent update RPCs", "updates", - "%s/resent/%s%u", path, server ? "tgt_" : "", tgt_id); + "total number of resent update RPCs", "updates", "%s/resent%s", path, + tgt_path); if (rc) D_WARN("Failed to create resent counter: "DF_RC"\n", DP_RC(rc)); /** Total number of retry updates locally, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_retry, D_TM_COUNTER, - "total number of retried update RPCs", "updates", - "%s/retry/%s%u", path, server ? "tgt_" : "", tgt_id); + "total number of retried update RPCs", "updates", "%s/retry%s", path, + tgt_path); if (rc) D_WARN("Failed to create retry cnt sensor: "DF_RC"\n", DP_RC(rc)); /** Total bytes read */ rc = d_tm_add_metric(&metrics->opm_fetch_bytes, D_TM_COUNTER, - "total number of bytes fetched/read", "bytes", - "%s/xferred/fetch/%s%u", path, server ? 
"tgt_" : "", tgt_id); + "total number of bytes fetched/read", "bytes", "%s/xferred/fetch%s", + path, tgt_path); if (rc) D_WARN("Failed to create bytes fetch counter: "DF_RC"\n", DP_RC(rc)); @@ -216,23 +222,23 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server) /** Total bytes written */ rc = d_tm_add_metric(&metrics->opm_update_bytes, D_TM_COUNTER, "total number of bytes updated/written", "bytes", - "%s/xferred/update/%s%u", path, server ? "tgt_" : "", tgt_id); + "%s/xferred/update%s", path, tgt_path); if (rc) D_WARN("Failed to create bytes update counter: "DF_RC"\n", DP_RC(rc)); /** Total number of EC full-stripe update operations, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_ec_full, D_TM_COUNTER, - "total number of EC sull-stripe updates", "updates", - "%s/EC_update/full_stripe/%s%u", path, server ? "tgt_" : "", tgt_id); + "total number of EC full-stripe updates", "updates", + "%s/EC_update/full_stripe%s", path, tgt_path); if (rc) D_WARN("Failed to create EC full stripe update counter: "DF_RC"\n", DP_RC(rc)); /** Total number of EC partial update operations, of type counter */ rc = d_tm_add_metric(&metrics->opm_update_ec_partial, D_TM_COUNTER, - "total number of EC sull-partial updates", "updates", - "%s/EC_update/partial/%s%u", path, server ? 
"tgt_" : "", tgt_id); + "total number of EC partial updates", "updates", + "%s/EC_update/partial%s", path, tgt_path); if (rc) D_WARN("Failed to create EC partial update counter: "DF_RC"\n", DP_RC(rc)); diff --git a/src/object/srv_mod.c b/src/object/srv_mod.c index ba63a66ab62..9c5c42370e4 100644 --- a/src/object/srv_mod.c +++ b/src/object/srv_mod.c @@ -449,7 +449,7 @@ static struct dss_module_ops ds_obj_mod_ops = { static void * obj_metrics_alloc(const char *path, int tgt_id) { - return obj_metrics_alloc_internal(path, tgt_id, false); + return obj_metrics_alloc_internal(path, tgt_id, true); } struct daos_module_metrics obj_metrics = { diff --git a/src/tests/ftest/util/agent_utils_params.py b/src/tests/ftest/util/agent_utils_params.py index 46b793f31ef..0b669d95483 100644 --- a/src/tests/ftest/util/agent_utils_params.py +++ b/src/tests/ftest/util/agent_utils_params.py @@ -57,10 +57,16 @@ def __init__(self, filename, common_yaml): # Specifies the log level for agent logs. # - exclude_fabric_ifaces: , Ignore a subset of fabric interfaces when selecting # an interface for client applications. + # - telemetry_port: , e.g. 9192 + # Enable Prometheus endpoint for client telemetry. + # - telemetry_retain: , e.g. 5m + # Time to retain per-client telemetry data. self.runtime_dir = BasicParameter(None, "/var/run/daos_agent") self.log_file = LogParameter(log_dir, None, "daos_agent.log") self.control_log_mask = BasicParameter(None, "debug") self.exclude_fabric_ifaces = BasicParameter(None) + self.telemetry_port = BasicParameter(None) + self.telemetry_retain = BasicParameter(None) def update_log_file(self, name): """Update the log file name for the daos agent. diff --git a/utils/config/daos_agent.yml b/utils/config/daos_agent.yml index 3656d486268..3172bffc5d7 100644 --- a/utils/config/daos_agent.yml +++ b/utils/config/daos_agent.yml @@ -26,6 +26,20 @@ # default: 10001 #port: 10001 +## Enable HTTP endpoint for remote telemetry collection. 
+# Note that enabling the endpoint automatically enables +# client telemetry collection. +# +## default endpoint state: disabled +## default endpoint port: 9192 +#telemetry_port: 9192 + +## Retain client telemetry for a period of time after the client +# process exits. +# +## default 0 (do not retain telemetry after client exit) +#telemetry_retain: 1m + ## Transport Credentials Specifying certificates to secure communications # #transport_config: