Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-8331 client: Export client metrics via agent #13545

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/client/api/metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ bool daos_client_metric_retain;
int
dc_tm_init(void)
{
int metrics_tag;
pid_t pid;
int rc;
struct d_tm_node_t *started_at;
int metrics_tag;
pid_t pid;
int rc;

d_getenv_bool(DAOS_CLIENT_METRICS_ENABLE, &daos_client_metric);
if (!daos_client_metric)
Expand Down Expand Up @@ -64,10 +65,18 @@ dc_tm_init(void)
rc = d_tm_add_ephemeral_dir(NULL, MAX_IDS_SIZE(INIT_JOB_NUM), "%s/%u",
dc_jobid, pid);
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u failed.\n", dc_jobid, pid);
DL_ERROR(rc, "add metric %s/%u failed.", dc_jobid, pid);
D_GOTO(out, rc);
}

rc = d_tm_add_metric(&started_at, D_TM_TIMESTAMP, "Timestamp of client startup", NULL,
"%s/%u/%s", dc_jobid, pid, "started_at");
if (rc != 0) {
DL_ERROR(rc, "add metric %s/%u/started_at failed.", dc_jobid, pid);
D_GOTO(out, rc);
}

d_tm_record_timestamp(started_at);
out:
if (rc)
d_tm_fini();
Expand Down
1 change: 1 addition & 0 deletions src/common/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ COMMON_FILES = ['debug.c', 'mem.c', 'fail_loc.c', 'lru.c',
'cipher.c', 'cipher_isal.c', 'qat.c', 'fault_domain.c',
'tls.c', 'metrics.c']


def build_daos_common(denv, client):
""" Building non-pmem version for client's common lib"""
benv = denv.Clone()
Expand Down
13 changes: 12 additions & 1 deletion src/control/cmd/daos_agent/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -54,6 +54,13 @@ type Config struct {
DisableAutoEvict bool `yaml:"disable_auto_evict,omitempty"`
ExcludeFabricIfaces common.StringSet `yaml:"exclude_fabric_ifaces,omitempty"`
FabricInterfaces []*NUMAFabricConfig `yaml:"fabric_ifaces,omitempty"`
TelemetryPort int `yaml:"telemetry_port,omitempty"`
TelemetryRetain time.Duration `yaml:"telemetry_retain,omitempty"`
}

// TelemetryEnabled reports whether client telemetry collection/export is
// enabled, i.e. whether a positive telemetry port has been configured.
func (c *Config) TelemetryEnabled() bool {
	port := c.TelemetryPort
	return port >= 1
}

// NUMAFabricConfig defines a list of fabric interfaces that belong to a NUMA
Expand Down Expand Up @@ -88,6 +95,10 @@ func LoadConfig(cfgPath string) (*Config, error) {
return nil, fmt.Errorf("invalid system name: %s", cfg.SystemName)
}

if cfg.TelemetryRetain > 0 && cfg.TelemetryPort == 0 {
mjmac marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("telemetry_retain requires telemetry_port")
}

return cfg, nil
}

Expand Down
75 changes: 59 additions & 16 deletions src/control/cmd/daos_agent/infocache.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,6 +8,7 @@ package main

import (
"context"
"fmt"
"net"
"strings"
"sync"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/hardware/hwprov"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
)

Expand All @@ -36,17 +38,20 @@ type fabricScanFn func(ctx context.Context, providers ...string) (*NUMAFabric, e
// NewInfoCache creates a new InfoCache with appropriate parameters set.
func NewInfoCache(ctx context.Context, log logging.Logger, client control.UnaryInvoker, cfg *Config) *InfoCache {
ic := &InfoCache{
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfo: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
log: log,
ignoreIfaces: cfg.ExcludeFabricIfaces,
client: client,
cache: cache.NewItemCache(log),
getAttachInfoCb: control.GetAttachInfo,
fabricScan: getFabricScanFn(log, cfg, hwprov.DefaultFabricScanner(log)),
netIfaces: net.Interfaces,
devClassGetter: hwprov.DefaultNetDevClassProvider(log),
devStateGetter: hwprov.DefaultNetDevStateProvider(log),
}

ic.clientTelemetryEnabled.Store(cfg.TelemetryEnabled())
ic.clientTelemetryRetain.Store(cfg.TelemetryRetain > 0)

if cfg.DisableCache {
ic.DisableAttachInfoCache()
ic.DisableFabricCache()
Expand Down Expand Up @@ -198,12 +203,14 @@ type InfoCache struct {
cache *cache.ItemCache
fabricCacheDisabled atm.Bool
attachInfoCacheDisabled atm.Bool
clientTelemetryEnabled atm.Bool
clientTelemetryRetain atm.Bool

getAttachInfo getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider
getAttachInfoCb getAttachInfoFn
fabricScan fabricScanFn
netIfaces func() ([]net.Interface, error)
devClassGetter hardware.NetDevClassProvider
devStateGetter hardware.NetDevStateProvider

client control.UnaryInvoker
attachInfoRefresh time.Duration
Expand Down Expand Up @@ -292,6 +299,41 @@ func (c *InfoCache) EnableStaticFabricCache(ctx context.Context, nf *NUMAFabric)
c.EnableFabricCache()
}

// getAttachInfo calls the configured attach info callback and, on success,
// passes the response through adjustAttachInfo before handing it back.
// It returns an error if the receiver or the callback is nil, or if the
// callback itself fails.
func (c *InfoCache) getAttachInfo(ctx context.Context, rpcClient control.UnaryInvoker, req *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
	// Cases are evaluated in order, so the callback is only inspected
	// once the receiver is known to be non-nil.
	switch {
	case c == nil:
		return nil, errors.New("InfoCache is nil")
	case c.getAttachInfoCb == nil:
		return nil, errors.New("getAttachInfoFn is nil")
	}

	attachInfo, fetchErr := c.getAttachInfoCb(ctx, rpcClient, req)
	if fetchErr != nil {
		return nil, fetchErr
	}

	c.adjustAttachInfo(attachInfo)
	return attachInfo, nil
}

// adjustAttachInfo performs any necessary adjustments to the attach info
// before returning it. The response is modified in place.
//
// When client telemetry is enabled, "<telemetry.ClientMetricsEnabledEnv>=1"
// is appended to resp.ClientNetHint.EnvVars; when telemetry retention is
// also enabled, "<telemetry.ClientMetricsRetainEnv>=1" is appended as well.
// A nil receiver or a nil response is a no-op.
func (c *InfoCache) adjustAttachInfo(resp *control.GetAttachInfoResp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We make other modifications to the GetAttachInfoResp struct in mgmt_rpc.go before returning to the client. Infocache has been caching only what the server sends back. I'm not exactly opposed to doing it this way, but it feels a little odd making our client-side modifications in two different layers.

if c == nil || resp == nil {
return
}

// Telemetry settings are passed along as extra environment variables.
if c.clientTelemetryEnabled.IsTrue() {
resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv),
)
// The retain variable is only added when telemetry itself is enabled.
if c.clientTelemetryRetain.IsTrue() {
resp.ClientNetHint.EnvVars = append(resp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv),
)
}
}
}

// GetAttachInfo fetches the attach info from the cache, and refreshes if necessary.
func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.GetAttachInfoResp, error) {
if c == nil {
Expand All @@ -308,7 +350,8 @@ func (c *InfoCache) GetAttachInfo(ctx context.Context, sys string) (*control.Get
}
createItem := func() (cache.Item, error) {
c.log.Debugf("cache miss for %s", sysAttachInfoKey(sys))
return newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo), nil
cai := newCachedAttachInfo(c.attachInfoRefresh, sys, c.client, c.getAttachInfo)
return cai, nil
}

item, release, err := c.cache.GetOrCreate(ctx, sysAttachInfoKey(sys), createItem)
Expand Down
86 changes: 68 additions & 18 deletions src/control/cmd/daos_agent/infocache_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2020-2023 Intel Corporation.
// (C) Copyright 2020-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand All @@ -8,20 +8,23 @@ package main

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"

"github.com/daos-stack/daos/src/control/build"
"github.com/daos-stack/daos/src/control/common"
"github.com/daos-stack/daos/src/control/common/test"
"github.com/daos-stack/daos/src/control/lib/cache"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/hardware"
"github.com/daos-stack/daos/src/control/lib/telemetry"
"github.com/daos-stack/daos/src/control/logging"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
)

type testInfoCacheParams struct {
Expand All @@ -32,6 +35,8 @@ type testInfoCacheParams struct {
mockNetDevStateGetter hardware.NetDevStateProvider
disableFabricCache bool
disableAttachInfoCache bool
enableClientTelemetry bool
retainClientTelemetry bool
ctlInvoker control.Invoker
cachedItems []cache.Item
}
Expand All @@ -43,16 +48,19 @@ func newTestInfoCache(t *testing.T, log logging.Logger, params testInfoCachePara
}

ic := &InfoCache{
log: log,
getAttachInfo: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
log: log,
getAttachInfoCb: params.mockGetAttachInfo,
fabricScan: params.mockScanFabric,
devClassGetter: params.mockNetDevClassGetter,
devStateGetter: params.mockNetDevStateGetter,
netIfaces: params.mockNetIfaces,
client: params.ctlInvoker,
cache: c,
}

ic.clientTelemetryEnabled.Store(params.enableClientTelemetry)
ic.clientTelemetryRetain.Store(params.retainClientTelemetry)

if ic.netIfaces == nil {
ic.netIfaces = func() ([]net.Interface, error) {
return []net.Interface{
Expand Down Expand Up @@ -714,6 +722,14 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
NetDevClass: uint32(hardware.Ether),
},
}
telemEnabledResp := copyGetAttachInfoResp(ctlResp)
telemEnabledResp.ClientNetHint.EnvVars = append(telemEnabledResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsEnabledEnv),
)
telemRetainedResp := copyGetAttachInfoResp(telemEnabledResp)
telemRetainedResp.ClientNetHint.EnvVars = append(telemRetainedResp.ClientNetHint.EnvVars,
fmt.Sprintf("%s=1", telemetry.ClientMetricsRetainEnv),
)

for name, tc := range map[string]struct {
getInfoCache func(logging.Logger) *InfoCache
Expand All @@ -734,7 +750,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
disableAttachInfoCache: true,
})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
},
Expand All @@ -748,11 +764,45 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
expErr: errors.New("mock remote"),
expRemote: true,
},
"cache disabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
disableAttachInfoCache: true,
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
},
"cache enabled; client telemetry enabled": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemEnabledResp,
expRemote: true,
expCached: true,
},
"cache enabled; client telemetry enabled; client telemetry retained": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{
enableClientTelemetry: true,
retainClientTelemetry: true,
})
},
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: telemRetainedResp,
expRemote: true,
expCached: true,
},
"enabled but empty": {
getInfoCache: func(l logging.Logger) *InfoCache {
return newTestInfoCache(t, l, testInfoCacheParams{})
},
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expRemote: true,
expCached: true,
Expand All @@ -772,7 +822,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: "test",
})
Expand All @@ -790,7 +840,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
fetch: func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
return nil, errors.New("shouldn't call cached remote")
},
lastResponse: ctlResp,
lastResponse: copyGetAttachInfoResp(ctlResp),
cacheItem: cacheItem{lastCached: time.Now()},
system: build.DefaultSystemName,
})
Expand All @@ -814,7 +864,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {
return ic
},
system: "somethingelse",
remoteResp: ctlResp,
remoteResp: copyGetAttachInfoResp(ctlResp),
expResp: ctlResp,
expCached: true,
expRemote: true,
Expand All @@ -831,7 +881,7 @@ func TestAgent_InfoCache_GetAttachInfo(t *testing.T) {

calledRemote := false
if ic != nil {
ic.getAttachInfo = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
ic.getAttachInfoCb = func(_ context.Context, _ control.UnaryInvoker, _ *control.GetAttachInfoReq) (*control.GetAttachInfoResp, error) {
calledRemote = true
return tc.remoteResp, tc.remoteErr
}
Expand Down
Loading
Loading