Skip to content

Commit

Permalink
fixes nanomsg#284 want async demo
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Mar 13, 2018
1 parent 55e98ae commit e51a971
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 0 deletions.
53 changes: 53 additions & 0 deletions demo/async/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
= async

This is a simple asynchronous demo, that demonstrates use of the RAW
option with a server, along with async message handling, to obtain a
very high level of asynchronous operation, suitable for use in a highly
concurrent server application.

== Compiling

You can override the level of concurrency with the `PARALLEL`
define. This determines how many requests the server will accept
at a time, and keep outstanding. Note that for our toy
implementation, we create this many "logical" flows of execution
(these are _NOT_ threads), where a request is followed by a reply.

The value of `PARALLEL` must be at least one, and may be as large
as your memory will permit. (The default value is 32.)

On UNIX-style systems:

[source, bash]
----
% export CPPFLAGS="-D PARALLEL=32 -I /usr/local/include"
% export LDFLAGS="-L /usr/local/lib -lnng"
% export CC="cc"
% ${CC} ${CPPFLAGS} async.c -o async ${LDFLAGS}
----

== Running

To run the server, use the arguments `__url__ -s`.

To run the client, use the arguments `__url__ __msec__`.

The _msec_ is a "delay" time that server will wait before responding.
We have these delays so simulate long running work.

In the following example, all of the clients should complete within
2 seconds. (Assuming `PARALLEL` is defined to be large enough.)

[source,bash]
----
% export URL="tcp://127.0.0.1:55995"
# start the server
% ./async $URL -s &
# start a bunch of clients
# Note that these all run concurrently!
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
----
224 changes: 224 additions & 0 deletions demo/async/async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright 2018 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program serves as an example for how to write an async RPC service,
// using the RAW request/reply pattern and nn_poll. The server receives
// messages and keeps them on a list, replying to them.

// Our demonstration application layer protocol is simple. The client sends
// a number of milliseconds to wait before responding. The server just gives
// back an empty reply after waiting that long.

// To run this program, start the server as async_demo <url> -s
// Then connect to it with the client as async_client <url> <msec>.
//
// For example:
//
// % ./async tcp://127.0.0.1:5555 -s &
// % ./async tcp://127.0.0.1:5555 323
// Request took 324 milliseconds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/supplemental/util/platform.h>

// Parallel is the maximum number of outstanding requests we can handle.
// This is *NOT* the number of threads in use, but instead represents
// outstanding work items. Select a small number to reduce memory size.
// (Each one of these can be thought of as a request-reply loop.)
#ifndef PARALLEL
#define PARALLEL 32
#endif

// The server keeps a list of work items, sorted by expiration time,
// so that we can use this to set the timeout to the correct value for
// use in poll.
struct work {
enum { INIT, RECV, WAIT, SEND } state;
nng_aio * aio;
nng_socket sock;
nng_msg * msg;
};

void
fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}

void
server_cb(void *arg)
{
struct work *work = arg;
nng_msg * msg;
int rv;
uint32_t when;

switch (work->state) {
case INIT:
work->state = RECV;
nng_recv_aio(work->sock, work->aio);
break;
case RECV:
if ((rv = nng_aio_result(work->aio)) != 0) {
fatal("nng_recv_aio", rv);
}
msg = nng_aio_get_msg(work->aio);
if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
// bad message, just ignore it.
nng_msg_free(msg);
nng_recv_aio(work->sock, work->aio);
return;
}
work->msg = msg;
work->state = WAIT;
nng_sleep_aio(when, work->aio);
break;
case WAIT:
// We could add more data to the message here.
nng_aio_set_msg(work->aio, work->msg);
work->msg = NULL;
work->state = SEND;
nng_send_aio(work->sock, work->aio);
break;
case SEND:
if ((rv = nng_aio_result(work->aio)) != 0) {
nng_msg_free(work->msg);
fatal("nng_send_aio", rv);
}
work->state = RECV;
nng_recv_aio(work->sock, work->aio);
break;
default:
fatal("bad state!", NNG_ESTATE);
break;
}
}

struct work *
alloc_work(nng_socket sock)
{
struct work *w;
int rv;

if ((w = nng_alloc(sizeof(*w))) == NULL) {
fatal("nng_alloc", NNG_ENOMEM);
}
if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
fatal("nng_aio_alloc", rv);
}
w->state = INIT;
w->sock = sock;
return (w);
}

// The server runs forever.
int
server(const char *url)
{
nng_socket sock;
struct work *works[PARALLEL];
int rv;
int i;

/* Create the socket. */
rv = nng_rep0_open(&sock);
if (rv != 0) {
fatal("nng_rep0_open", rv);
}
if ((rv = nng_setopt_int(sock, NNG_OPT_RAW, 1)) != 0) {
fatal("nng_setopt_int", rv);
}

for (i = 0; i < PARALLEL; i++) {
works[i] = alloc_work(sock);
}

if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}

for (i = 0; i < PARALLEL; i++) {
server_cb(works[i]); // this starts them going (INIT state)
}

for (;;) {
nng_msleep(3600000); // neither pause() nor sleep() portable
}
}

/* The client runs just once, and then returns. */
int
client(const char *url, const char *msecstr)
{
nng_socket sock;
int rv;
nng_msg * msg;
nng_time start;
nng_time end;
unsigned msec;

msec = atoi(msecstr) * 1000;

if ((rv = nng_req0_open(&sock)) != 0) {
fatal("nng_req0_open", rv);
}

if ((rv = nng_dial(sock, url, NULL, 0)) < 0) {
fatal("nng_dial", rv);
}

start = nng_clock();

if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
fatal("nng_msg_alloc", rv);
}
if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
fatal("nng_msg_append_u32", rv);
}

if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
fatal("nng_send", rv);
}

if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
fatal("nng_recvmsg", rv);
}
end = nng_clock();
nng_msg_free(msg);
nng_close(sock);

printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
return (0);
}

int
main(int argc, char **argv)
{
int rc;

if (argc < 3) {
fprintf(stderr, "Usage: %s <url> [-s|<secs>]\n", argv[0]);
exit(EXIT_FAILURE);
}
if (strcmp(argv[2], "-s") == 0) {
rc = server(argv[1]);
} else {
rc = client(argv[1], argv[2]);
}
exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}

0 comments on commit e51a971

Please sign in to comment.