Skip to content

Commit

Permalink
DAOS-16693 telemetry: Avoid race between init/read (#15306) (#15322)
Browse files Browse the repository at this point in the history
In rare cases, a reader may attempt to access a telemetry
node after it has been added to the tree, but before it
has been fully initialized. Use an atomic to prevent
reads before the initialization has completed. Unlucky
readers will get a -DER_AGAIN instead of crashing.

Signed-off-by: Michael MacDonald <[email protected]>
  • Loading branch information
mjmac authored Oct 22, 2024
1 parent b913d3e commit ffa1c9d
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 47 deletions.
20 changes: 12 additions & 8 deletions src/control/lib/telemetry/counter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2022 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -37,18 +37,22 @@ func (c *Counter) FloatValue() float64 {
}

func (c *Counter) Value() uint64 {
ctrVal := BadUintVal
if c.handle == nil || c.node == nil {
return BadUintVal
return ctrVal
}

var val C.uint64_t

res := C.d_tm_get_counter(c.handle.ctx, &val, c.node)
if res == C.DER_SUCCESS {
return uint64(val)
fetch := func() C.int {
var val C.uint64_t
res := C.d_tm_get_counter(c.handle.ctx, &val, c.node)
if res == C.DER_SUCCESS {
ctrVal = uint64(val)
}
return res
}
c.fetchValWithRetry(fetch)

return BadUintVal
return ctrVal
}

func newCounter(hdl *handle, path string, name *string, node *C.struct_d_tm_node_t) *Counter {
Expand Down
20 changes: 12 additions & 8 deletions src/control/lib/telemetry/duration.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2022 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -34,18 +34,22 @@ func (d *Duration) Type() MetricType {
}

func (d *Duration) Value() time.Duration {
durValue := BadDuration
if d.handle == nil || d.node == nil {
return BadDuration
return durValue
}

var tms C.struct_timespec

res := C.d_tm_get_duration(d.handle.ctx, &tms, &d.stats, d.node)
if res == C.DER_SUCCESS {
return time.Duration(tms.tv_sec)*time.Second + time.Duration(tms.tv_nsec)*time.Nanosecond
fetch := func() C.int {
var tms C.struct_timespec
res := C.d_tm_get_duration(d.handle.ctx, &tms, &d.stats, d.node)
if res == C.DER_SUCCESS {
durValue = time.Duration(tms.tv_sec)*time.Second + time.Duration(tms.tv_nsec)*time.Nanosecond
}
return res
}
d.fetchValWithRetry(fetch)

return BadDuration
return durValue
}

func (d *Duration) FloatValue() float64 {
Expand Down
38 changes: 23 additions & 15 deletions src/control/lib/telemetry/gauge.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2022 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -41,18 +41,22 @@ func (g *Gauge) FloatValue() float64 {

// Value returns the value as an unsigned integer.
func (g *Gauge) Value() uint64 {
gaugeVal := BadUintVal
if g.handle == nil || g.node == nil {
return BadUintVal
return gaugeVal
}

var val C.uint64_t

res := C.d_tm_get_gauge(g.handle.ctx, &val, nil, g.node)
if res == C.DER_SUCCESS {
return uint64(val)
fetch := func() C.int {
var val C.uint64_t
res := C.d_tm_get_gauge(g.handle.ctx, &val, nil, g.node)
if res == C.DER_SUCCESS {
gaugeVal = uint64(val)
}
return res
}
g.fetchValWithRetry(fetch)

return BadUintVal
return gaugeVal
}

func newGauge(hdl *handle, path string, name *string, node *C.struct_d_tm_node_t) *Gauge {
Expand Down Expand Up @@ -103,18 +107,22 @@ func (g *StatsGauge) FloatValue() float64 {

// Value returns the gauge value as an unsigned integer.
func (g *StatsGauge) Value() uint64 {
gaugeVal := BadUintVal
if g.handle == nil || g.node == nil {
return BadUintVal
return gaugeVal
}

var val C.uint64_t

res := C.d_tm_get_gauge(g.handle.ctx, &val, &g.stats, g.node)
if res == C.DER_SUCCESS {
return uint64(val)
fetch := func() C.int {
var val C.uint64_t
res := C.d_tm_get_gauge(g.handle.ctx, &val, &g.stats, g.node)
if res == C.DER_SUCCESS {
gaugeVal = uint64(val)
}
return res
}
g.fetchValWithRetry(fetch)

return BadUintVal
return gaugeVal
}

func newStatsGauge(hdl *handle, path string, name *string, node *C.struct_d_tm_node_t) *StatsGauge {
Expand Down
20 changes: 12 additions & 8 deletions src/control/lib/telemetry/snapshot.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2022 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -34,18 +34,22 @@ func (s *Snapshot) Type() MetricType {
}

func (s *Snapshot) Value() time.Time {
timeVal := time.Time{} // zero val
if s.handle == nil || s.node == nil {
return time.Time{}
return timeVal
}

var tms C.struct_timespec

res := C.d_tm_get_timer_snapshot(s.handle.ctx, &tms, s.node)
if res == C.DER_SUCCESS {
return time.Unix(int64(tms.tv_sec), int64(tms.tv_nsec))
fetch := func() C.int {
var tms C.struct_timespec
res := C.d_tm_get_timer_snapshot(s.handle.ctx, &tms, s.node)
if res == C.DER_SUCCESS {
timeVal = time.Unix(int64(tms.tv_sec), int64(tms.tv_nsec))
}
return res
}
s.fetchValWithRetry(fetch)

return time.Time{}
return timeVal
}

func (s *Snapshot) FloatValue() float64 {
Expand Down
12 changes: 12 additions & 0 deletions src/control/lib/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
BadDuration = time.Duration(BadIntVal)

PathSep = filepath.Separator

maxFetchRetries = 1
)

type (
Expand Down Expand Up @@ -304,6 +306,16 @@ func (mb *metricBase) String() string {
return strings.TrimSpace(string(buf[:bytes.Index(buf, []byte{0})]))
}

func (mb *metricBase) fetchValWithRetry(fetchFn func() C.int) C.int {
var rc C.int
for i := 0; i < maxFetchRetries; i++ {
if rc = fetchFn(); rc == C.DER_SUCCESS {
return rc
}
}
return rc
}

func (sm *statsMetric) Min() uint64 {
return uint64(sm.stats.dtm_min)
}
Expand Down
22 changes: 14 additions & 8 deletions src/control/lib/telemetry/timestamp.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// (C) Copyright 2021-2022 Intel Corporation.
// (C) Copyright 2021-2024 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
Expand Down Expand Up @@ -34,16 +34,22 @@ func (t *Timestamp) Type() MetricType {
}

func (t *Timestamp) Value() time.Time {
zero := time.Time{}
timeVal := time.Time{} // zero val
if t.handle == nil || t.node == nil {
return zero
return timeVal
}
var clk C.time_t
res := C.d_tm_get_timestamp(t.handle.ctx, &clk, t.node)
if res == C.DER_SUCCESS {
return time.Unix(int64(clk), 0)

fetch := func() C.int {
var clk C.time_t
res := C.d_tm_get_timestamp(t.handle.ctx, &clk, t.node)
if res == C.DER_SUCCESS {
timeVal = time.Unix(int64(clk), 0)
}
return res
}
return zero
t.fetchValWithRetry(fetch)

return timeVal
}

// FloatValue converts the timestamp to time in seconds since the UNIX epoch.
Expand Down
41 changes: 41 additions & 0 deletions src/gurt/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <float.h>
#include <pthread.h>
#include <malloc.h>
#include <gurt/atomic.h>
#include <gurt/common.h>
#include <gurt/list.h>
#include <sys/shm.h>
Expand Down Expand Up @@ -624,6 +625,7 @@ alloc_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t **newnode,
goto out;
tmp->dtn_metric = NULL;
tmp->dtn_sibling = NULL;
atomic_store_relaxed(&tmp->dtn_readable, false);

*newnode = node;
out:
Expand Down Expand Up @@ -2409,6 +2411,9 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type,
pthread_mutexattr_destroy(&mattr);
temp->dtn_protect = true;
}

atomic_store_relaxed(&temp->dtn_readable, true);

if (node != NULL)
*node = temp;

Expand Down Expand Up @@ -3090,6 +3095,15 @@ d_tm_try_del_ephemeral_dir(const char *fmt, ...)
return rc;
}

static bool
node_is_readable(struct d_tm_node_t *node)
{
if (node == NULL)
return false;

return atomic_load_relaxed(&node->dtn_readable);
}

/**
* Creates histogram counters for the given node. It calculates the
* extents of each bucket and creates counters at the path specified that
Expand Down Expand Up @@ -3278,6 +3292,9 @@ d_tm_get_num_buckets(struct d_tm_context *ctx,
if (ctx == NULL || histogram == NULL || node == NULL)
return -DER_INVAL;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

rc = validate_node_ptr(ctx, node, &shmem);
if (rc != 0)
return rc;
Expand Down Expand Up @@ -3341,6 +3358,9 @@ d_tm_get_bucket_range(struct d_tm_context *ctx, struct d_tm_bucket_t *bucket,
if (rc != 0)
return rc;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

if (!has_stats(node))
return -DER_OP_NOT_PERMITTED;

Expand Down Expand Up @@ -3392,6 +3412,9 @@ d_tm_get_counter(struct d_tm_context *ctx, uint64_t *val,
if (node->dtn_type != D_TM_COUNTER)
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

/* "ctx == NULL" is server side fast version to read the counter. */
if (ctx == NULL) {
metric_data = node->dtn_metric;
Expand Down Expand Up @@ -3441,6 +3464,9 @@ d_tm_get_timestamp(struct d_tm_context *ctx, time_t *val,
if (node->dtn_type != D_TM_TIMESTAMP)
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data != NULL) {
d_tm_node_lock(node);
Expand Down Expand Up @@ -3470,6 +3496,9 @@ d_tm_get_meminfo(struct d_tm_context *ctx, struct d_tm_meminfo_t *meminfo,
if (node->dtn_type != D_TM_MEMINFO)
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data != NULL) {
d_tm_node_lock(node);
Expand Down Expand Up @@ -3513,6 +3542,9 @@ d_tm_get_timer_snapshot(struct d_tm_context *ctx, struct timespec *tms,
if (!(node->dtn_type & D_TM_TIMER_SNAPSHOT))
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data != NULL) {
d_tm_node_lock(node);
Expand Down Expand Up @@ -3563,6 +3595,9 @@ d_tm_get_duration(struct d_tm_context *ctx, struct timespec *tms,
if (!(node->dtn_type & D_TM_DURATION))
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data == NULL)
return -DER_METRIC_NOT_FOUND;
Expand Down Expand Up @@ -3628,6 +3663,9 @@ d_tm_get_gauge(struct d_tm_context *ctx, uint64_t *val,
if (!is_gauge(node))
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data != NULL) {
dtm_stats = conv_ptr(shmem, metric_data->dtm_stats);
Expand Down Expand Up @@ -3700,6 +3738,9 @@ int d_tm_get_metadata(struct d_tm_context *ctx, char **desc, char **units,
if (node->dtn_type == D_TM_DIRECTORY)
return -DER_OP_NOT_PERMITTED;

if (unlikely(!node_is_readable(node)))
return -DER_AGAIN;

metric_data = conv_ptr(shmem, node->dtn_metric);
if (metric_data != NULL) {
d_tm_node_lock(node);
Expand Down
3 changes: 3 additions & 0 deletions src/include/gurt/telemetry_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <gurt/common.h>

#include <stdatomic.h>

#define D_TM_VERSION 1
#define D_TM_MAX_NAME_LEN 256
#define D_TM_MAX_DESC_LEN 128
Expand Down Expand Up @@ -236,6 +238,7 @@ struct d_tm_node_t {
pthread_mutex_t dtn_lock; /** individual mutex */
struct d_tm_metric_t *dtn_metric; /** values */
bool dtn_protect; /** synchronized access */
_Atomic bool dtn_readable; /** fully initialized and ready for reads */
};

struct d_tm_nodeList_t {
Expand Down

0 comments on commit ffa1c9d

Please sign in to comment.