Skip to content

Commit

Permalink
HG: add HG_Diag_dump_counters() to dump diagnostic counters
Browse files Browse the repository at this point in the history
Add rpc_req_recv_active_count and rpc_multi_recv_copy_count counters

Add HG_Class_get_counters() to retrieve internal counters
  • Loading branch information
soumagne committed Aug 26, 2024
1 parent 6951751 commit 5770325
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 8 deletions.
25 changes: 25 additions & 0 deletions src/mercury.c
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,31 @@ HG_Set_log_stream(const char *level, FILE *stream)
}
}

/*---------------------------------------------------------------------------*/
void
HG_Diag_dump_counters(void)
{
#ifndef _WIN32
hg_log_dump_counters(&HG_LOG_OUTLET(hg_diag));
#endif
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Class_get_counters(
const hg_class_t *hg_class, struct hg_diag_counters *diag_counters)
{
hg_return_t ret;

HG_CHECK_SUBSYS_ERROR(
cls, hg_class == NULL, error, ret, HG_INVALID_ARG, "NULL HG class");

return HG_Core_class_get_counters(hg_class->core_class, diag_counters);

error:
return ret;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Class_set_handle_create_callback(hg_class_t *hg_class,
Expand Down
19 changes: 19 additions & 0 deletions src/mercury.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ HG_Set_log_func(int (*log_func)(FILE *stream, const char *format, ...));
HG_PUBLIC void
HG_Set_log_stream(const char *level, FILE *stream);

/**
* Dump diagnostic counters into the existing log stream.
*/
HG_PUBLIC void
HG_Diag_dump_counters(void);

/**
* Obtain the name of the given class.
*
Expand Down Expand Up @@ -298,6 +304,19 @@ HG_Class_set_data(
static HG_INLINE void *
HG_Class_get_data(const hg_class_t *hg_class) HG_WARN_UNUSED_RESULT;

/**
* Get diagnostic counters associated to HG class.
* (Requires debug enabled build)
*
* \param hg_class [IN] pointer to HG class
* \param diag_counters [IN/OUT] pointer to counters struct
*
* \return HG_SUCCESS or corresponding HG error code
*/
HG_PUBLIC hg_return_t
HG_Class_get_counters(
const hg_class_t *hg_class, struct hg_diag_counters *diag_counters);

/**
* Set callback to be called on HG handle creation. Handles are created
* both on HG_Create() and HG_Context_create() calls. This allows upper layers
Expand Down
99 changes: 98 additions & 1 deletion src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ struct hg_core_counters {
hg_atomic_int64_t *rpc_resp_recv_count; /* RPC responses received */
hg_atomic_int64_t *rpc_req_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *rpc_resp_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *bulk_count; /* Bulk count */
hg_atomic_int64_t *rpc_req_recv_active_count; /* Currently active RPCs */
hg_atomic_int64_t *rpc_multi_recv_copy_count; /* RPCs requests received that
required a copy */
hg_atomic_int64_t *bulk_count; /* Bulk count */
};

/* HG class */
Expand Down Expand Up @@ -362,6 +365,9 @@ struct hg_core_private_handle {
uint8_t cookie; /* Cookie */
bool multi_recv_copy; /* Copy on multi-recv */
bool reuse; /* Re-use handle once ref_count is 0 */
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
bool active;
#endif
};

/* HG op id */
Expand Down Expand Up @@ -434,6 +440,13 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
static hg_return_t
hg_core_finalize(struct hg_core_private_class *hg_core_class);

/**
* Get counters.
*/
static void
hg_core_class_get_counters(const struct hg_core_counters *counters,
struct hg_diag_counters *diag_counters);

/**
* Create context.
*/
Expand Down Expand Up @@ -1077,6 +1090,10 @@ hg_core_counters_init(struct hg_core_counters *hg_core_counters)
* order */
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->bulk_count, "bulk_count",
"Bulk transfers (inc. extra bulks)");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_multi_recv_copy_count,
"rpc_multi_recv_copy_count", "RPC requests received requiring a copy");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_recv_active_count,
"rpc_req_recv_active_count", "RPC requests received still active");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_resp_extra_count,
"rpc_resp_extra_count", "RPCs with extra bulk response");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_extra_count,
Expand Down Expand Up @@ -1496,6 +1513,31 @@ hg_core_finalize(struct hg_core_private_class *hg_core_class)
return ret;
}

/*---------------------------------------------------------------------------*/
static void
hg_core_class_get_counters(const struct hg_core_counters *counters,
struct hg_diag_counters *diag_counters)
{
*diag_counters = (struct hg_diag_counters){
.rpc_req_sent_count =
(uint64_t) hg_atomic_get64(counters->rpc_req_sent_count),
.rpc_req_recv_count =
(uint64_t) hg_atomic_get64(counters->rpc_req_recv_count),
.rpc_resp_sent_count =
(uint64_t) hg_atomic_get64(counters->rpc_resp_sent_count),
.rpc_resp_recv_count =
(uint64_t) hg_atomic_get64(counters->rpc_resp_recv_count),
.rpc_req_extra_count =
(uint64_t) hg_atomic_get64(counters->rpc_req_extra_count),
.rpc_resp_extra_count =
(uint64_t) hg_atomic_get64(counters->rpc_resp_extra_count),
.rpc_req_recv_active_count =
(uint64_t) hg_atomic_get64(counters->rpc_req_recv_active_count),
.rpc_multi_recv_copy_count =
(uint64_t) hg_atomic_get64(counters->rpc_multi_recv_copy_count),
.bulk_count = (uint64_t) hg_atomic_get64(counters->bulk_count)};
}

/*---------------------------------------------------------------------------*/
void
hg_core_bulk_incr(hg_core_class_t *hg_core_class)
Expand Down Expand Up @@ -3389,6 +3431,14 @@ hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
return HG_SUCCESS; /* Cannot free yet */
}

#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
if (hg_core_handle->active) {
hg_atomic_decr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = false;
}
#endif

/* Re-use handle if we were listening, otherwise destroy it */
if (hg_core_handle->reuse &&
!hg_atomic_get32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->unposting)) {
Expand Down Expand Up @@ -4466,6 +4516,12 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_thread_spin_lock(&hg_core_handle_pool->pending_list.lock);
LIST_REMOVE(hg_core_handle, pending);
hg_thread_spin_unlock(&hg_core_handle_pool->pending_list.lock);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif

if (callback_info->ret == NA_SUCCESS) {
/* Extend pool if all handles are being utilized */
Expand Down Expand Up @@ -4568,6 +4624,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
ret = hg_core_handle_pool_get(context->handle_pool, &hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(
rpc, error, ret, "Could not get handle from pool");
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif
hg_core_handle->multi_recv_op = multi_recv_op;
hg_atomic_incr32(&multi_recv_op->op_count);
hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV);
Expand Down Expand Up @@ -4619,6 +4681,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_CONTEXT_CLASS(context)
->counters.rpc_multi_recv_copy_count);
#endif

memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->core_handle.in_buf_used);
Expand Down Expand Up @@ -6059,6 +6127,35 @@ HG_Core_set_more_data_callback(struct hg_core_class *hg_core_class,
return ret;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Core_class_get_counters(const hg_core_class_t *hg_core_class,
struct hg_diag_counters *diag_counters)
{
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
const struct hg_core_private_class *private_class =
(const struct hg_core_private_class *) hg_core_class;
#endif
hg_return_t ret;

HG_CHECK_SUBSYS_ERROR(cls, hg_core_class == NULL, error, ret,
HG_INVALID_ARG, "NULL HG core class");
HG_CHECK_SUBSYS_ERROR(cls, diag_counters == NULL, error, ret,
HG_INVALID_ARG, "NULL pointer to diag_counters");
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
hg_core_class_get_counters(&private_class->counters, diag_counters);
#else
HG_LOG_SUBSYS_ERROR(cls, "Counters not supported in current build, please "
"build with MERCURY_ENABLE_DEBUG");
return HG_OPNOTSUPPORTED;
#endif

return HG_SUCCESS;

error:
return ret;
}

/*---------------------------------------------------------------------------*/
hg_core_context_t *
HG_Core_context_create(hg_core_class_t *hg_core_class)
Expand Down
13 changes: 13 additions & 0 deletions src/mercury_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,19 @@ static HG_INLINE void *
HG_Core_class_get_data(
const hg_core_class_t *hg_core_class) HG_WARN_UNUSED_RESULT;

/**
* Get diagnostic counters associated to HG core class.
* (Requires debug enabled build)
*
* \param hg_core_class [IN] pointer to HG core class
* \param diag_counters [IN/OUT] pointer to counters struct
*
* \return HG_SUCCESS or corresponding HG error code
*/
HG_PUBLIC hg_return_t
HG_Core_class_get_counters(const hg_core_class_t *hg_core_class,
struct hg_diag_counters *diag_counters);

/**
* Create a new context. Must be destroyed by calling HG_Core_context_destroy().
*
Expand Down
16 changes: 16 additions & 0 deletions src/mercury_core_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ typedef enum {
*/
#define HG_CORE_SM (1 << 0)

/**
* Counters.
*/
struct hg_diag_counters {
uint64_t rpc_req_sent_count; /* RPC requests sent */
uint64_t rpc_req_recv_count; /* RPC requests received */
uint64_t rpc_resp_sent_count; /* RPC responses sent */
uint64_t rpc_resp_recv_count; /* RPC responses received */
uint64_t rpc_req_extra_count; /* RPCs that required extra data */
uint64_t rpc_resp_extra_count; /* RPCs that required extra data */
uint64_t rpc_req_recv_active_count; /* Currently active RPCs */
uint64_t rpc_multi_recv_copy_count; /* RPCs requests received that
required a copy */
uint64_t bulk_count; /* Bulk transfer count */
};

/*****************/
/* Public Macros */
/*****************/
Expand Down
6 changes: 2 additions & 4 deletions src/util/mercury_dlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,11 @@ hg_dlog_dump_counters(struct hg_dlog *d,

if (!SLIST_EMPTY(&d->cnts32) || !SLIST_EMPTY(&d->cnts64)) {
log_func(stream,
"### ----------------------\n"
"### --------------------------\n"
"### (%s) counter log summary\n"
"### ----------------------\n",
"### --------------------------\n",
(d->dlog_magic + strlen(HG_DLOG_STDMAGIC)));

log_func(stream, "# Counters\n");
SLIST_FOREACH (dc32, &d->cnts32, l) {
log_func(stream, "# %s: %" PRId32 " [%s]\n", dc32->name,
hg_atomic_get32(&dc32->c), dc32->descr);
Expand All @@ -271,7 +270,6 @@ hg_dlog_dump_counters(struct hg_dlog *d,
log_func(stream, "# %s: %" PRId64 " [%s]\n", dc64->name,
hg_atomic_get64(&dc64->c), dc64->descr);
}
log_func(stream, "# -\n");
}

hg_thread_mutex_unlock(&d->dlock);
Expand Down
18 changes: 15 additions & 3 deletions src/util/mercury_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,20 @@ hg_log_outlet_deregister(struct hg_log_outlet *hg_log_outlet)
hg_log_outlet->registered = false;
}

/*---------------------------------------------------------------------------*/
void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet)
{
if (hg_log_outlet->debug_log &&
hg_log_outlet->level >= HG_LOG_LEVEL_MIN_DEBUG) {
FILE *stream = hg_log_streams_g[hg_log_outlet->level]
? hg_log_streams_g[hg_log_outlet->level]
: *hg_log_std_streams_g[hg_log_outlet->level];
hg_dlog_dump_counters(
hg_log_outlet->debug_log, hg_log_func_g, stream, 0);
}
}

/*---------------------------------------------------------------------------*/
void
hg_log_write(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level,
Expand Down Expand Up @@ -567,9 +581,7 @@ hg_log_vwrite(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level,
no_return ? "" : "\n", HG_LOG_RESET);
#else
/* Print using logging function */
hg_log_func_g(stream,
"# [%lf] %s->%s: [%s] %s%s%s:%d\n"
" # %s(): %s%s",
hg_log_func_g(stream, "# [%lf] %s->%s [%s] %s%s%s:%d %s() %s%s",
hg_time_to_double(tv), "mercury", hg_log_outlet->name, level_name,
module ? module : "", module ? ":" : "", file, line, func, buf,
no_return ? "" : "\n");
Expand Down
8 changes: 8 additions & 0 deletions src/util/mercury_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ hg_log_outlet_register(struct hg_log_outlet *outlet);
HG_UTIL_PUBLIC void
hg_log_outlet_deregister(struct hg_log_outlet *outlet);

/**
* Dump counters associated to log outlet.
*
* \param outlet [IN] log outlet
*/
HG_UTIL_PUBLIC void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet);

/**
* Write log.
*
Expand Down

0 comments on commit 5770325

Please sign in to comment.