Skip to content

Commit

Permalink
updates nextmode
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 29, 2023
1 parent ba2d8b8 commit 3a875d9
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 65 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 0.10.4.9017
Version: 0.10.4.9018
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# nanonext 0.10.4.9017 (development)
# nanonext 0.10.4.9018

*This is a major stability release bundling the 'libnng' v1.6.0 source code.*

#### New Features

Expand All @@ -12,6 +14,7 @@
* `nextmode()` improvements:
+ simplified function signature to take a 'refhook' argument comprising a list of serialization / unserialization functions.
+ registered 'refhook' functions apply to external pointer type objects only.
+ format changes make use with earlier package versions incompatible.
* `reap()` updated to always return either a zero on success or else an integer 'errorValue'.
* `pipe_notify()` arguments 'add', 'remove' and 'flag' now default to FALSE instead of TRUE for easier selective specification of the events to signal.
* Fixes regression in release 0.10.4 that caused a potential segfault using `ncurl()` with 'follow' set to TRUE when the server returns a missing or invalid relocation address.
Expand Down
30 changes: 15 additions & 15 deletions R/docs.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
#'
#' @section Bus (mesh networks):
#'
#' [protocol, bus] The bus protocol is useful for routing applications or for
#' building mesh networks where every peer is connected to every other peer.
#' In this protocol, each message sent by a node is sent to every one of its
#' directly connected peers. This socket may be used to send and receive
#' messages. Sending messages will attempt to deliver to each directly
#' connected peer.
#' [protocol, bus] The bus protocol is useful for routing applications or
#' for building mesh networks where every peer is connected to every other
#' peer. In this protocol, each message sent by a node is sent to every one
#' of its directly connected peers. This socket may be used to send and
#' receive messages. Sending messages will attempt to deliver to each
#' directly connected peer.
#'
#' Messages are only sent to directly connected peers. This means that in
#' the event that a peer is connected indirectly, it will not receive
Expand Down Expand Up @@ -58,10 +58,10 @@
#' In the pipeline pattern, pushers distribute messages to pullers, hence
#' useful for solving producer/consumer problems.
#'
#' If multiple peers are connected, the pattern attempts to distribute fairly.
#' Each message sent by a pusher will be sent to one of its peer pullers,
#' chosen in a round-robin fashion. This property makes this pattern useful
#' in load-balancing scenarios.
#' If multiple peers are connected, the pattern attempts to distribute
#' fairly. Each message sent by a pusher will be sent to one of its peer
#' pullers, chosen in a round-robin fashion. This property makes this
#' pattern useful in load-balancing scenarios.
#'
#' [protocol, push] The push protocol is one half of a pipeline pattern. The
#' other side is the pull protocol.
Expand Down Expand Up @@ -230,11 +230,11 @@ NULL
#' (::1) would be specified as tcp://[::1]:80.
#'
#' \item The special value of 0 (INADDR_ANY) can be used for a listener to
#' indicate that it should listen on all interfaces on the host. A short-hand
#' for this form is to either omit the address, or specify the asterisk (*)
#' character. For example, the following three URIs are all equivalent, and
#' could be used to listen to port 9999 on the host: (1) tcp://0.0.0.0:9999
#' (2) tcp://*:9999 (3) tcp://:9999
#' indicate that it should listen on all interfaces on the host. A
#' shorthand for this form is to either omit the address, or specify the
#' asterisk (*) character. For example, the following three URIs are all
#' equivalent, and could be used to listen to port 9999 on the host:
#' (1) tcp://0.0.0.0:9999 (2) tcp://*:9999 (3) tcp://:9999
#' }
#'
#' @section TLS:
Expand Down
20 changes: 10 additions & 10 deletions man/protocols.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions man/transports.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ static void saio_complete(void *arg) {
if (res)
nng_msg_free(nng_aio_get_msg(saio->aio));

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_lock(shr_mtx);
saio->result = res - !res;
nng_mtx_unlock(shr_mtx);
Expand All @@ -192,7 +192,7 @@ static void isaio_complete(void *arg) {
if (iaio->data != NULL)
R_Free(iaio->data);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_lock(shr_mtx);
iaio->result = res - !res;
nng_mtx_unlock(shr_mtx);
Expand All @@ -209,7 +209,7 @@ static void raio_complete(void *arg) {
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_lock(shr_mtx);
raio->result = res - !res;
nng_mtx_unlock(shr_mtx);
Expand Down Expand Up @@ -243,7 +243,7 @@ static void iraio_complete(void *arg) {
nano_aio *iaio = (nano_aio *) arg;
const int res = nng_aio_result(iaio->aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_lock(shr_mtx);
iaio->result = res - !res;
nng_mtx_unlock(shr_mtx);
Expand Down Expand Up @@ -346,7 +346,7 @@ static void reqsaio_finalizer(SEXP xptr) {
if (R_ExternalPtrAddr(xptr) == NULL)
return;
nano_aio *xp = (nano_aio *) R_ExternalPtrAddr(xptr);
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_ctx *ctx = (nng_ctx *) xp->data;
nng_ctx_close(*ctx);
#endif
Expand Down Expand Up @@ -647,7 +647,7 @@ SEXP rnng_aio_result(SEXP env) {

nano_aio *saio = (nano_aio *) R_ExternalPtrAddr(aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
int res;
nng_mtx_lock(shr_mtx);
res = saio->result;
Expand Down Expand Up @@ -679,7 +679,7 @@ SEXP rnng_aio_get_msg(SEXP env) {

nano_aio *raio = (nano_aio *) R_ExternalPtrAddr(aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
int res;
nng_mtx_lock(shr_mtx);
res = raio->result;
Expand Down Expand Up @@ -839,7 +839,7 @@ SEXP rnng_unresolved2(SEXP aio) {

nano_aio *aiop = (nano_aio *) R_ExternalPtrAddr(coreaio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
int res;
nng_mtx_lock(shr_mtx);
res = aiop->result;
Expand Down Expand Up @@ -1238,7 +1238,7 @@ SEXP rnng_aio_http(SEXP env, SEXP response, SEXP type) {

nano_aio *haio = (nano_aio *) R_ExternalPtrAddr(aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
int res;
nng_mtx_lock(shr_mtx);
res = haio->result;
Expand Down Expand Up @@ -1581,7 +1581,7 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
}

nano_aio *saio = R_Calloc(1, nano_aio);
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
saio->data = ctx;
#endif

Expand Down Expand Up @@ -1993,7 +1993,7 @@ SEXP rnng_cv_request(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmod
}

nano_aio *saio = R_Calloc(1, nano_aio);
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
saio->data = ctx;
#endif

Expand Down
36 changes: 19 additions & 17 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ void nano_serialize_next(nano_buf *buf, SEXP object) {

NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
buf->buf[0] = 7u;
buf->buf[2] = special_bit;
buf->cur += 8;
buf->buf[1] = special_bit;
buf->cur += 12;

struct R_outpstream_st output_stream;

Expand All @@ -222,7 +222,7 @@ void nano_serialize_next(nano_buf *buf, SEXP object) {
R_Serialize(object, &output_stream);

if (nano_refList != R_NilValue) {
*((uint32_t *) (buf->buf + 4)) = (uint32_t) buf->cur;
*((uint64_t *) (buf->buf + 4)) = (uint64_t) buf->cur;
SEXP call, out;
PROTECT(call = Rf_lcons(nano_refHookIn, Rf_cons(nano_refList, R_NilValue)));
PROTECT(out = Rf_eval(call, R_GlobalEnv));
Expand Down Expand Up @@ -278,19 +278,21 @@ SEXP nano_unserialize(unsigned char *buf, const size_t sz) {
offset = 0;
cur = 0;
break;
case 7: ;
SEXP raw, call;
offset = *(uint32_t *) (buf + 4);
if (offset) {
PROTECT(raw = Rf_allocVector(RAWSXP, sz - offset));
memcpy(STDVEC_DATAPTR(raw), buf + offset, sz - offset);
PROTECT(call = Rf_lcons(nano_refHookOut, Rf_cons(raw, R_NilValue)));
PROTECT(reflist = Rf_eval(call, R_GlobalEnv));
if (TYPEOF(reflist) != VECSXP)
Rf_error("unserialization refhook did not return a list");
case 7:
if (sz > 12 && (buf[12] == 66 || buf[12] == 88)) {
offset = *(uint64_t *) (buf + 4);
if (offset) {
SEXP raw, call;
PROTECT(raw = Rf_allocVector(RAWSXP, sz - offset));
memcpy(STDVEC_DATAPTR(raw), buf + offset, sz - offset);
PROTECT(call = Rf_lcons(nano_refHookOut, Rf_cons(raw, R_NilValue)));
PROTECT(reflist = Rf_eval(call, R_GlobalEnv));
if (TYPEOF(reflist) != VECSXP)
Rf_error("unserialization refhook did not return a list");
}
cur = 12;
break;
}
cur = 8;
break;
default:
Rf_warning("received data could not be unserialized");
return nano_decode(buf, sz, 8);
Expand Down Expand Up @@ -696,7 +698,7 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {

nng_msg *msgp;

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG

const nng_duration dur = block == R_NilValue ? NNG_DURATION_DEFAULT :
TYPEOF(block) == LGLSXP ? (LOGICAL(block)[0] == 1) * NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(block);
Expand Down Expand Up @@ -877,7 +879,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
const int mod = nano_matcharg(mode);
nng_msg *msgp;

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG

const nng_duration dur = block == R_NilValue ? NNG_DURATION_DEFAULT :
TYPEOF(block) == LGLSXP ? (LOGICAL(block)[0] == 1) * NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(block);
Expand Down
6 changes: 3 additions & 3 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ SEXP nano_sendAio;
SEXP nano_success;
SEXP nano_unresolved;

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx *shr_mtx;
#endif

Expand Down Expand Up @@ -221,7 +221,7 @@ static const R_ExternalMethodDef externalMethods[] = {
void attribute_visible R_init_nanonext(DllInfo* dll) {
RegisterSymbols();
PreserveObjects();
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_alloc(&shr_mtx);
#endif
R_registerRoutines(dll, NULL, callMethods, NULL, externalMethods);
Expand All @@ -231,7 +231,7 @@ void attribute_visible R_init_nanonext(DllInfo* dll) {

void attribute_visible R_unload_nanonext(DllInfo *info) {
ReleaseObjects();
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
nng_mtx_free(shr_mtx);
#endif
}
8 changes: 6 additions & 2 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

#include <nng/nng.h>

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#define NANONEXT_LEGACY_NNG
#endif

#ifdef NANONEXT_PROTOCOLS
#include <nng/protocol/bus0/bus.h>
#include <nng/protocol/pair0/pair.h>
Expand All @@ -38,7 +42,7 @@
#include <nng/supplemental/http/http.h>
#include <nng/supplemental/tls/tls.h>
#include <nng/supplemental/util/platform.h>
#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6
#ifdef NANONEXT_LEGACY_NNG
extern nng_mtx *shr_mtx;
#endif

Expand Down Expand Up @@ -117,7 +121,7 @@ typedef struct nano_cv_s {

#ifdef NANONEXT_KEYCERT
#include <mbedtls/version.h>
#if MBEDTLS_VERSION_MAJOR == 2
#if MBEDTLS_VERSION_MAJOR < 3
#include <mbedtls/config.h>
#endif
#include <mbedtls/platform.h>
Expand Down

0 comments on commit 3a875d9

Please sign in to comment.