Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #1663 Request/Reply Protocol Throughput and Scalability #1726

Merged
merged 1 commit into from
Dec 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions docs/man/nng_req.7.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
= nng_req(7)
//
// Copyright 2018 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This document is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -101,9 +101,26 @@ The following protocol-specific option is available.
When a new request is started, a timer of this duration is also started.
If no reply is received before this timer expires, then the request will
be resent.
(Requests are also automatically resent if the peer to whom
the original request was sent disconnects, or if a peer becomes available
while the requester is waiting for an available peer.)
+
(Requests are also automatically resent if the peer to whom
the original request was sent disconnects, or if a peer becomes available
while the requester is waiting for an available peer.)
+
Resending may be deferred up to the value of the `NNG_OPT_RESENDTICK` parameter.

((`NNG_OPT_REQ_RESENDTICK`))::

(xref:nng_duration.5.adoc[`nng_duration`])
This is the granularity of the clock that is used to check for resending.
The default is a second. Setting this to a higher rate will allow for
more timely resending to occur, but may incur significant additional
overhead when the socket has many outstanding requests (contexts).
+
When there are no requests outstanding that have a resend set, then
the clock does not tick at all.
+
This option is shared for all contexts on a socket, and is only available for the socket itself.


=== Protocol Headers

Expand Down
1 change: 1 addition & 0 deletions include/nng/protocol/reqrep0/req.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ NNG_DECL int nng_req0_open_raw(nng_socket *);
#define NNG_REQ0_PEER_NAME "rep"

#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
#define NNG_OPT_REQ_RESENDTICK "req:resend-tick"

#ifdef __cplusplus
}
Expand Down
7 changes: 7 additions & 0 deletions src/compat/nanomsg/nn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,13 @@ nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
nn_seterror(rv);
return (-1);
}
if ((nnlevel == NN_REQ) && (nnopt == NN_REQ_RESEND_IVL)) {
// Only one context here, so it won't be too bad to tick
// as quickly as this, and it avoids some possible friction
// (e.g. with legacy tests).
(void) nng_socket_set_ms(sid, NNG_OPT_REQ_RESENDTICK, 10);
}

return (0);
}

Expand Down
167 changes: 118 additions & 49 deletions src/sp/protocol/reqrep0/req.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,49 @@ typedef struct req0_ctx req0_ctx;

static void req0_run_send_queue(req0_sock *, nni_aio_completions *);
static void req0_ctx_reset(req0_ctx *);
static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
static void req0_ctx_fini(void *);
static void req0_ctx_init(void *, void *);
static void req0_retry_cb(void *);

// A req0_ctx is a "context" for the request. It uses most of the
// socket, but keeps track of its own outstanding replays, the request ID,
// and so forth.
struct req0_ctx {
req0_sock *sock;
nni_list_node sock_node; // node on the socket context list
nni_list_node send_node; // node on the send_queue
nni_list_node pipe_node; // node on the pipe list
uint32_t request_id; // request ID, without high bit set
nni_aio *recv_aio; // user aio waiting to recv - only one!
nni_aio *send_aio; // user aio waiting to send
nng_msg *req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg *rep_msg; // reply message
nni_timer_node timer;
nni_duration retry;
bool conn_reset; // sent message w/o retry, peer disconnect
req0_sock *sock;
nni_list_node sock_node; // node on the socket context list
nni_list_node send_node; // node on the send_queue
nni_list_node pipe_node; // node on the pipe list
nni_list_node retry_node; // node on the socket retry list
uint32_t request_id; // request ID, without high bit set
nni_aio *recv_aio; // user aio waiting to recv - only one!
nni_aio *send_aio; // user aio waiting to send
nng_msg *req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg *rep_msg; // reply message
nni_duration retry;
nni_time retry_time; // retry after this expires
bool conn_reset; // sent message w/o retry, peer disconnect
};

// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
nni_duration retry;
bool closed;
bool retry_active; // true if retry aio running
nni_atomic_int ttl;
req0_ctx master; // base socket master
nni_list ready_pipes;
nni_list busy_pipes;
nni_list stop_pipes;
nni_list contexts;
nni_list send_queue; // contexts waiting to send.
nni_id_map requests; // contexts by request ID
nni_list retry_queue;
nni_aio retry_aio; // retry timer
nni_id_map requests; // contexts by request ID
nni_pollable readable;
nni_pollable writable;
nni_duration retry_tick; // clock interval for retry timer
nni_mtx mtx;
};

Expand Down Expand Up @@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock)
NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node);
NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node);
NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node);

// this is "semi random" start for request IDs.
s->retry = NNI_SECOND * 60;
s->retry = NNI_SECOND * 60;
s->retry_tick = NNI_SECOND; // how often we check for retries

req0_ctx_init(&s->master, s);

nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);

nni_aio_init(&s->retry_aio, req0_retry_cb, s);

nni_atomic_init(&s->ttl);
nni_atomic_set(&s->ttl, 8);
}
Expand All @@ -130,6 +139,7 @@ req0_sock_fini(void *arg)
{
req0_sock *s = arg;

nni_aio_stop(&s->retry_aio);
nni_mtx_lock(&s->mtx);
NNI_ASSERT(nni_list_empty(&s->busy_pipes));
NNI_ASSERT(nni_list_empty(&s->stop_pipes));
Expand All @@ -140,6 +150,7 @@ req0_sock_fini(void *arg)
nni_pollable_fini(&s->readable);
nni_pollable_fini(&s->writable);
nni_id_map_fini(&s->requests);
nni_aio_fini(&s->retry_aio);
nni_mtx_fini(&s->mtx);
}

Expand Down Expand Up @@ -235,13 +246,17 @@ req0_pipe_close(void *arg)
req0_ctx_reset(ctx);
ctx->conn_reset = true;
}
} else {
// Reset the timer on this so it expires immediately.
// This is actually easier than canceling the timer and
// running the send_queue separately. (In particular,
// it avoids a potential deadlock on cancelling the
// timer.)
nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
} else if (ctx->req_msg != NULL) {
// Reset the retry time to make it expire immediately.
// Also move this immediately to the resend queue.
// The timer should still be firing, so we don't need
// to restart or reschedule that.
ctx->retry_time = nni_clock() + ctx->retry;

if (!nni_list_node_active(&ctx->send_node)) {
nni_list_append(&s->send_queue, ctx);
req0_run_send_queue(s, NULL);
}
}
}
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -363,16 +378,41 @@ req0_recv_cb(void *arg)
}

static void
req0_ctx_timeout(void *arg)
req0_retry_cb(void *arg)
{
req0_ctx *ctx = arg;
req0_sock *s = ctx->sock;

req0_sock *s = arg;
req0_ctx *ctx;
nni_time now;
bool reschedule = false;

// The design of this is that retries are infrequent, because
// we should normally be succeeding. We also hope that we are not
// executing this linear scan of all requests too often, once
// per clock tick is all we want.
now = nni_clock();
nni_mtx_lock(&s->mtx);
if ((ctx->req_msg != NULL) && (!s->closed)) {
if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) {
nni_mtx_unlock(&s->mtx);
return;
}

NNI_LIST_FOREACH (&s->retry_queue, ctx) {
if (ctx->retry_time > now || (ctx->req_msg == NULL)) {
continue;
}
if (!nni_list_node_active(&ctx->send_node)) {
nni_list_append(&s->send_queue, ctx);
}
reschedule = true;
}
if (!nni_list_empty(&s->retry_queue)) {
// if there are still jobs in the queue waiting to be
// retried, do them.
nni_sleep_aio(s->retry_tick, &s->retry_aio);
} else {
s->retry_active = false;
}
if (reschedule) {
req0_run_send_queue(s, NULL);
}
nni_mtx_unlock(&s->mtx);
Expand All @@ -384,8 +424,6 @@ req0_ctx_init(void *arg, void *sock)
req0_sock *s = sock;
req0_ctx *ctx = arg;

nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);

nni_mtx_lock(&s->mtx);
ctx->sock = s;
ctx->recv_aio = NULL;
Expand Down Expand Up @@ -415,9 +453,6 @@ req0_ctx_fini(void *arg)
req0_ctx_reset(ctx);
nni_list_remove(&s->contexts, ctx);
nni_mtx_unlock(&s->mtx);

nni_timer_cancel(&ctx->timer);
nni_timer_fini(&ctx->timer);
}

static int
Expand Down Expand Up @@ -448,20 +483,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
return;
}

// We have a place to send it, so do the send.
// We have a place to send it, so send it.
// If a sending error occurs that causes the message to
// be dropped, we rely on the resend timer to pick it up.
// We also notify the completion callback if this is the
// first send attempt.
nni_list_remove(&s->send_queue, ctx);

// Schedule a resubmit timer. We only do this if we got
// Schedule a retry. We only do this if we got
// a pipe to send to. Otherwise, we should get handled
// the next time that the send_queue is run. We don't do this
// if the retry is "disabled" with NNG_DURATION_INFINITE.
if (ctx->retry > 0) {
nni_timer_schedule(
&ctx->timer, nni_clock() + ctx->retry);
nni_list_node_remove(&ctx->retry_node);
nni_list_append(&s->retry_queue, ctx);
}

// Put us on the pipe list of active contexts.
Expand Down Expand Up @@ -489,7 +524,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
}

// At this point, we will never give this message back to
// to the user, so we don't have to worry about making it
// the user, so we don't have to worry about making it
// unique. We can freely clone it.
nni_msg_clone(ctx->req_msg);
nni_aio_set_msg(&p->aio_send, ctx->req_msg);
Expand All @@ -503,16 +538,7 @@ req0_ctx_reset(req0_ctx *ctx)
req0_sock *s = ctx->sock;
// Call with sock lock held!

// We cannot safely "wait" using nni_timer_cancel, but this removes
// any scheduled timer activation. If the timeout is already running
// concurrently, it will still run. It should do nothing, because
// we toss the request. There is still a very narrow race if the
// timeout fires, but doesn't actually start running before we
// both finish this function, *and* manage to reschedule another
// request. The consequence of that occurring is that the request
// will be emitted on the wire twice. This is not actually tragic.
nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);

nni_list_node_remove(&ctx->retry_node);
nni_list_node_remove(&ctx->pipe_node);
nni_list_node_remove(&ctx->send_node);
if (ctx->request_id != 0) {
Expand Down Expand Up @@ -561,7 +587,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
// entire state machine. This allows us to preserve the
// semantic of exactly one receive operation per send
// operation, and should be the least surprising for users. The
// main consequence is that if a receive operation is completed
// main consequence is that if the operation is completed
// (in error or otherwise), the user must submit a new send
// operation to restart the state machine.
req0_ctx_reset(ctx);
Expand Down Expand Up @@ -713,6 +739,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
ctx->send_aio = aio;
nni_aio_set_msg(aio, NULL);

if (ctx->retry > 0) {
ctx->retry_time = nni_clock() + ctx->retry;
nni_list_append(&s->retry_queue, ctx);
if (!s->retry_active) {
s->retry_active = true;
nni_sleep_aio(s->retry_tick, &s->retry_aio);
}
}

// Stick us on the send_queue list.
nni_list_append(&s->send_queue, ctx);

Expand Down Expand Up @@ -771,6 +806,34 @@ req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
return (req0_ctx_get_resend_time(&s->master, buf, szp, t));
}

static int
req0_sock_set_resend_tick(
void *arg, const void *buf, size_t sz, nni_opt_type t)
{
req0_sock *s = arg;
nng_duration tick;
int rv;

if ((rv = nni_copyin_ms(&tick, buf, sz, t)) == 0) {
nni_mtx_lock(&s->mtx);
s->retry_tick = tick;
nni_mtx_unlock(&s->mtx);
}
return (rv);
}

static int
req0_sock_get_resend_tick(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
req0_sock *s = arg;
nng_duration tick;

nni_mtx_lock(&s->mtx);
tick = s->retry_tick;
nni_mtx_unlock(&s->mtx);
return (nni_copyout_ms(tick, buf, szp, t));
}

static int
req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
Expand Down Expand Up @@ -846,6 +909,12 @@ static nni_option req0_sock_options[] = {
.o_name = NNG_OPT_SENDFD,
.o_get = req0_sock_get_send_fd,
},
{
.o_name = NNG_OPT_REQ_RESENDTICK,
.o_get = req0_sock_get_resend_tick,
.o_set = req0_sock_set_resend_tick,
},

// terminate list
{
.o_name = NULL,
Expand Down
Loading
Loading