From 4bb4622643855f09995ebb3a48d6e135a22cdd8a Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Mon, 1 Apr 2024 19:01:30 -0700 Subject: [PATCH 01/10] Add later dependency --- DESCRIPTION | 4 ++++ NAMESPACE | 1 + R/nanonext-package.R | 1 + src/later_shim.cpp | 5 +++++ src/later_shim.h | 7 +++++++ 5 files changed, 18 insertions(+) create mode 100644 src/later_shim.cpp create mode 100644 src/later_shim.h diff --git a/DESCRIPTION b/DESCRIPTION index fc3f85077..8ff2069de 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,6 +30,10 @@ SystemRequirements: 'libnng' >= 1.5 and 'libmbedtls' >= 2.5, or 'cmake' to compile NNG and/or Mbed TLS included in package sources Depends: R (>= 3.5) +Imports: + later +LinkingTo: + later Suggests: knitr, markdown diff --git a/NAMESPACE b/NAMESPACE index a9f83b67f..d630e2792 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -96,6 +96,7 @@ export(until_) export(wait) export(wait_) export(write_cert) +importFrom(later,later) importFrom(stats,start) importFrom(tools,md5sum) importFrom(utils,.DollarNames) diff --git a/R/nanonext-package.R b/R/nanonext-package.R index fff6f714a..d39594d39 100644 --- a/R/nanonext-package.R +++ b/R/nanonext-package.R @@ -95,6 +95,7 @@ #' @importFrom stats start #' @importFrom tools md5sum #' @importFrom utils .DollarNames +#' @importFrom later later #' @useDynLib nanonext, .registration = TRUE #' "_PACKAGE" diff --git a/src/later_shim.cpp b/src/later_shim.cpp new file mode 100644 index 000000000..62e90e173 --- /dev/null +++ b/src/later_shim.cpp @@ -0,0 +1,5 @@ +#include + +extern "C" void later2(void (*func)(void*), void* data, double secs) { + later::later(func, data, secs); +} diff --git a/src/later_shim.h b/src/later_shim.h new file mode 100644 index 000000000..7a9345b47 --- /dev/null +++ b/src/later_shim.h @@ -0,0 +1,7 @@ +#ifndef LATER_SHIM_H +#define LATER_SHIM_H + +// This is simply a shim so that later::later can be accessed from C, not C++ +void later2(void (*func)(void*), void* data, double secs); + +#endif From ce12e73621c2d0c08015e805b25d247cd451f6d6 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 2 Apr 2024 11:00:12 -0700 Subject: [PATCH 02/10] Implement promises for recv_aio[_signal] --- NAMESPACE | 2 ++ R/aio.R | 62 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/aio.c | 34 +++++++++++++++++++++++---- src/nanonext.h | 5 ++-- 4 files changed, 94 insertions(+), 9 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index d630e2792..385c06dcc 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -33,6 +33,8 @@ S3method(print,recvAio) S3method(print,sendAio) S3method(print,tlsConfig) S3method(print,unresolvedValue) +S3method(promises::as.promise,recvAio) +S3method(promises::is.promising,recvAio) S3method(start,nanoDialer) S3method(start,nanoListener) export("%~>%") diff --git a/R/aio.R b/R/aio.R index b0a610412..6a8119f53 100644 --- a/R/aio.R +++ b/R/aio.R @@ -118,7 +118,14 @@ recv_aio <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment()) + data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(), + function() { + cb <- data$callback + if (!is.null(cb)) { + cb(data) + } + } + ) #' Receive Async and Signal a Condition #' @@ -157,7 +164,58 @@ recv_aio_signal <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment()) + data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(), + function() { + cb <- data$callback + if (!is.null(cb)) { + cb(data) + } + } + ) + +#' @exportS3Method promises::is.promising +is.promising.recvAio <- function(x) { + TRUE +} + +#' @exportS3Method promises::as.promise +as.promise.recvAio <- function(x) { + prom <- x$promise + + if (is.null(prom)) { + prom <- promises::promise(function(resolve, reject) { + assign("callback", function(...) { + + # WARNING: x$data is heavily side-effecty! + value <- x$data + + if (is_error_value(value)) { + reject(simpleError(nng_error(value))) + } else { + resolve(value) + } + }, x) + }) + + # WARNING: x$data is heavily side-effecty! + value <- x$data + + if (!inherits(value, "unresolvedValue")) { + if (is_error_value(value)) { + prom <- promises::promise_reject(simpleError(nng_error(value))) + } else { + prom <- promises::promise_resolve(value) + } + } + + # Save for next time. This is not just an optimization but essential for + # correct behavior if as.promise is called multiple times, because only one + # `callback` can exist on the recvAio object at a time. + assign("promise", prom, x) + } + + prom +} # Core aio functions ----------------------------------------------------------- diff --git a/src/aio.c b/src/aio.c index 97606f517..eb80a7823 100644 --- a/src/aio.c +++ b/src/aio.c @@ -20,6 +20,7 @@ #define NANONEXT_SUPPLEMENTALS #define NANONEXT_SIGNALS #include "nanonext.h" +#include "later_shim.h" // internals ------------------------------------------------------------------- @@ -195,6 +196,21 @@ static void isaio_complete(void *arg) { } + +static void raio_invoke_cb(void* arg) { + nano_aio *raio = (nano_aio *) arg; + if (raio->cb == NULL || Rf_isNull(raio->cb)) return; + SEXP func = (SEXP)raio->cb; + SEXP callExpr, result; + if (!Rf_isNull(func)) { + PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call + PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call + + UNPROTECT(2); + R_ReleaseObject(func); + } +} + static void raio_complete(void *arg) { nano_aio *raio = (nano_aio *) arg; @@ -210,6 +226,7 @@ static void raio_complete(void *arg) { raio->result = res - !res; #endif + later2(raio_invoke_cb, arg, 0); } static void raio_complete_signal(void *arg) { @@ -229,6 +246,7 @@ static void raio_complete_signal(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); + later2(raio_invoke_cb, arg, 0); } static void request_complete_signal(void *arg) { @@ -709,7 +727,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { } SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, - const SEXP bytes, const SEXP clo, nano_cv *ncv) { + const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) { const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); const int signal = ncv != NULL; @@ -725,6 +743,12 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, raio->next = ncv; raio->type = RECVAIO; raio->mode = mod; + if (Rf_isNull(cb)) { + raio->cb = NULL; + } else { + R_PreserveObject(cb); + raio->cb = (void*)cb; + } if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio))) goto exitlevel1; @@ -791,19 +815,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, } -SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { +SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL); } -SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { +SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { if (R_ExternalPtrTag(cvar) != nano_CvSymbol) Rf_error("'cv' is not a valid Condition Variable"); nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv); } diff --git a/src/nanonext.h b/src/nanonext.h index 74c43e633..24a868975 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -88,6 +88,7 @@ typedef struct nano_aio_s { int result; void *data; void *next; + void *cb; } nano_aio; typedef struct nano_cv_s { @@ -250,8 +251,8 @@ SEXP rnng_protocol_open(SEXP, SEXP); SEXP rnng_random(SEXP, SEXP); SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); From 7e3284c1a48ed18555604f97fead1f5e58984b8e Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 11 Apr 2024 15:37:09 +0100 Subject: [PATCH 03/10] simplifications --- DESCRIPTION | 3 ++- R/aio.R | 26 ++++++++++++++------------ src/aio.c | 24 ++++++------------------ src/init.c | 4 ++-- 4 files changed, 24 insertions(+), 33 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 8ff2069de..1a6380379 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -36,6 +36,7 @@ LinkingTo: later Suggests: knitr, - markdown + markdown, + promises VignetteBuilder: knitr RoxygenNote: 7.3.1 diff --git a/R/aio.R b/R/aio.R index 6a8119f53..c1db76198 100644 --- a/R/aio.R +++ b/R/aio.R @@ -120,8 +120,8 @@ recv_aio <- function(con, n = 65536L) data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(), function() { - cb <- data$callback - if (!is.null(cb)) { + cb <- .subset2(data, "callback") + if (is.function(cb)) { cb(data) } } @@ -166,8 +166,8 @@ recv_aio_signal <- function(con, n = 65536L) data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(), function() { - cb <- data$callback - if (!is.null(cb)) { + cb <- .subset2(data, "callback") + if (is.function(cb)) { cb(data) } } @@ -180,14 +180,15 @@ is.promising.recvAio <- function(x) { #' @exportS3Method promises::as.promise as.promise.recvAio <- function(x) { - prom <- x$promise - if (is.null(prom)) { - prom <- promises::promise(function(resolve, reject) { + promise <- .subset2(x, "promise") + + if (is.null(promise)) { + promise <- promises::promise(function(resolve, reject) { assign("callback", function(...) { # WARNING: x$data is heavily side-effecty! - value <- x$data + value <- .subset2(x, "data") if (is_error_value(value)) { reject(simpleError(nng_error(value))) @@ -202,19 +203,20 @@ as.promise.recvAio <- function(x) { if (!inherits(value, "unresolvedValue")) { if (is_error_value(value)) { - prom <- promises::promise_reject(simpleError(nng_error(value))) + promise <- promises::promise_reject(simpleError(nng_error(value))) } else { - prom <- promises::promise_resolve(value) + promise <- promises::promise_resolve(value) } } # Save for next time. This is not just an optimization but essential for # correct behavior if as.promise is called multiple times, because only one # `callback` can exist on the recvAio object at a time. - assign("promise", prom, x) + assign("promise", promise, x) } - prom + promise + } # Core aio functions ----------------------------------------------------------- diff --git a/src/aio.c b/src/aio.c index eb80a7823..6f332dbe7 100644 --- a/src/aio.c +++ b/src/aio.c @@ -196,19 +196,12 @@ static void isaio_complete(void *arg) { } - -static void raio_invoke_cb(void* arg) { +static void raio_invoke_cb(void *arg) { nano_aio *raio = (nano_aio *) arg; - if (raio->cb == NULL || Rf_isNull(raio->cb)) return; - SEXP func = (SEXP)raio->cb; - SEXP callExpr, result; - if (!Rf_isNull(func)) { - PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call - PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call - - UNPROTECT(2); - R_ReleaseObject(func); - } + SEXP callExpr; + PROTECT(callExpr = Rf_lcons((SEXP) raio->cb, R_NilValue)); + (void) Rf_eval(callExpr, R_GlobalEnv); + UNPROTECT(1); } static void raio_complete(void *arg) { @@ -743,12 +736,7 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, raio->next = ncv; raio->type = RECVAIO; raio->mode = mod; - if (Rf_isNull(cb)) { - raio->cb = NULL; - } else { - R_PreserveObject(cb); - raio->cb = (void*)cb; - } + raio->cb = cb; if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio))) goto exitlevel1; diff --git a/src/init.c b/src/init.c index 9bf9645f0..165b6ca6a 100644 --- a/src/init.c +++ b/src/init.c @@ -155,8 +155,8 @@ static const R_CallMethodDef callMethods[] = { {"rnng_random", (DL_FUNC) &rnng_random, 2}, {"rnng_reap", (DL_FUNC) &rnng_reap, 1}, {"rnng_recv", (DL_FUNC) &rnng_recv, 4}, - {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 5}, - {"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 6}, + {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6}, + {"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 7}, {"rnng_request", (DL_FUNC) &rnng_request, 6}, {"rnng_request_signal", (DL_FUNC) &rnng_request_signal, 7}, {"rnng_send", (DL_FUNC) &rnng_send, 4}, From 2ccc4d4df70bea72043b3a3cfcc9dffa94d6d4dc Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 14:27:46 +0100 Subject: [PATCH 04/10] implements request_promise() --- DESCRIPTION | 3 +- NAMESPACE | 3 +- R/aio.R | 64 +-------------------------- R/context.R | 16 +++++++ R/nanonext-package.R | 2 +- man/request.Rd | 11 +++++ src/aio.c | 100 ++++++++++++++++++++++++++++++++++--------- src/init.c | 5 ++- src/nanonext.h | 5 ++- 9 files changed, 117 insertions(+), 92 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 1a6380379..8ff2069de 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -36,7 +36,6 @@ LinkingTo: later Suggests: knitr, - markdown, - promises + markdown VignetteBuilder: knitr RoxygenNote: 7.3.1 diff --git a/NAMESPACE b/NAMESPACE index 385c06dcc..add71dece 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -33,8 +33,6 @@ S3method(print,recvAio) S3method(print,sendAio) S3method(print,tlsConfig) S3method(print,unresolvedValue) -S3method(promises::as.promise,recvAio) -S3method(promises::is.promising,recvAio) S3method(start,nanoDialer) S3method(start,nanoListener) export("%~>%") @@ -77,6 +75,7 @@ export(recv_aio) export(recv_aio_signal) export(reply) export(request) +export(request_promise) export(request_signal) export(send) export(send_aio) diff --git a/R/aio.R b/R/aio.R index c1db76198..b0a610412 100644 --- a/R/aio.R +++ b/R/aio.R @@ -118,14 +118,7 @@ recv_aio <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(), - function() { - cb <- .subset2(data, "callback") - if (is.function(cb)) { - cb(data) - } - } - ) + data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment()) #' Receive Async and Signal a Condition #' @@ -164,60 +157,7 @@ recv_aio_signal <- function(con, "integer", "logical", "numeric", "raw", "string"), timeout = NULL, n = 65536L) - data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(), - function() { - cb <- .subset2(data, "callback") - if (is.function(cb)) { - cb(data) - } - } - ) - -#' @exportS3Method promises::is.promising -is.promising.recvAio <- function(x) { - TRUE -} - -#' @exportS3Method promises::as.promise -as.promise.recvAio <- function(x) { - - promise <- .subset2(x, "promise") - - if (is.null(promise)) { - promise <- promises::promise(function(resolve, reject) { - assign("callback", function(...) { - - # WARNING: x$data is heavily side-effecty! - value <- .subset2(x, "data") - - if (is_error_value(value)) { - reject(simpleError(nng_error(value))) - } else { - resolve(value) - } - }, x) - }) - - # WARNING: x$data is heavily side-effecty! - value <- x$data - - if (!inherits(value, "unresolvedValue")) { - if (is_error_value(value)) { - promise <- promises::promise_reject(simpleError(nng_error(value))) - } else { - promise <- promises::promise_resolve(value) - } - } - - # Save for next time. This is not just an optimization but essential for - # correct behavior if as.promise is called multiple times, because only one - # `callback` can exist on the recvAio object at a time. - assign("promise", promise, x) - } - - promise - -} + data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment()) # Core aio functions ----------------------------------------------------------- diff --git a/R/context.R b/R/context.R index 0a70df53d..9f3285479 100644 --- a/R/context.R +++ b/R/context.R @@ -269,3 +269,19 @@ request_signal <- function(context, "integer", "logical", "numeric", "raw", "string"), timeout = NULL) data <- .Call(rnng_request_signal, context, data, cv, send_mode, recv_mode, timeout, environment()) + +#' @rdname request +#' @export +#' +request_promise <- function(context, + data, + cv, + send_mode = c("serial", "raw", "next"), + recv_mode = c("serial", "character", "complex", "double", + "integer", "logical", "numeric", "raw", "string"), + timeout = NULL) + data <- .Call(rnng_request_promise, context, data, cv, send_mode, recv_mode, timeout, environment(), + function() { + cb <- .subset2(data, "callback") + if (is.function(cb)) cb(data) + }) diff --git a/R/nanonext-package.R b/R/nanonext-package.R index d39594d39..c5cf6c148 100644 --- a/R/nanonext-package.R +++ b/R/nanonext-package.R @@ -92,10 +92,10 @@ #' @author Charlie Gao \email{charlie.gao@@shikokuchuo.net} #' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID}) #' +#' @importFrom later later #' @importFrom stats start #' @importFrom tools md5sum #' @importFrom utils .DollarNames -#' @importFrom later later #' @useDynLib nanonext, .registration = TRUE #' "_PACKAGE" diff --git a/man/request.Rd b/man/request.Rd index dccf3fca7..61fb84056 100644 --- a/man/request.Rd +++ b/man/request.Rd @@ -3,6 +3,7 @@ \name{request} \alias{request} \alias{request_signal} +\alias{request_promise} \title{Request over Context (RPC Client for Req/Rep Protocol)} \usage{ request( @@ -23,6 +24,16 @@ request_signal( "numeric", "raw", "string"), timeout = NULL ) + +request_promise( + context, + data, + cv, + send_mode = c("serial", "raw", "next"), + recv_mode = c("serial", "character", "complex", "double", "integer", "logical", + "numeric", "raw", "string"), + timeout = NULL +) } \arguments{ \item{context}{a Context.} diff --git a/src/aio.c b/src/aio.c index 6f332dbe7..2149ed139 100644 --- a/src/aio.c +++ b/src/aio.c @@ -196,14 +196,6 @@ static void isaio_complete(void *arg) { } -static void raio_invoke_cb(void *arg) { - nano_aio *raio = (nano_aio *) arg; - SEXP callExpr; - PROTECT(callExpr = Rf_lcons((SEXP) raio->cb, R_NilValue)); - (void) Rf_eval(callExpr, R_GlobalEnv); - UNPROTECT(1); -} - static void raio_complete(void *arg) { nano_aio *raio = (nano_aio *) arg; @@ -219,7 +211,6 @@ static void raio_complete(void *arg) { raio->result = res - !res; #endif - later2(raio_invoke_cb, arg, 0); } static void raio_complete_signal(void *arg) { @@ -239,7 +230,6 @@ static void raio_complete_signal(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); - later2(raio_invoke_cb, arg, 0); } static void request_complete_signal(void *arg) { @@ -262,6 +252,55 @@ static void request_complete_signal(void *arg) { } +static void raio_invoke_cb(void *arg) { + SEXP callExpr; + PROTECT(callExpr = Rf_lcons((SEXP) arg, R_NilValue)); + (void) Rf_eval(callExpr, R_GlobalEnv); + UNPROTECT(1); + R_ReleaseObject(arg); +} + +static void raio_complete_cb(void *arg) { + + nano_aio *raio = (nano_aio *) arg; + const int res = nng_aio_result(raio->aio); + if (res == 0) + raio->data = nng_aio_get_msg(raio->aio); + +#ifdef NANONEXT_LEGACY_NNG + nng_mtx_lock(shr_mtx); + raio->result = res - !res; + nng_mtx_unlock(shr_mtx); +#else + raio->result = res - !res; +#endif + + later2(raio_invoke_cb, raio->cb, 0); + +} + +static void request_complete_cb(void *arg) { + + nano_aio *raio = (nano_aio *) arg; + nano_aio *saio = (nano_aio *) raio->next; + nano_cv *ncv = (nano_cv *) saio->next; + nng_cv *cv = ncv->cv; + nng_mtx *mtx = ncv->mtx; + + const int res = nng_aio_result(raio->aio); + if (res == 0) + raio->data = nng_aio_get_msg(raio->aio); + + nng_mtx_lock(mtx); + raio->result = res - !res; + ncv->condition++; + nng_cv_wake(cv); + nng_mtx_unlock(mtx); + + later2(raio_invoke_cb, raio->cb, 0); + +} + static void iraio_complete(void *arg) { nano_aio *iaio = (nano_aio *) arg; @@ -720,7 +759,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { } SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, - const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) { + const SEXP bytes, const SEXP clo, nano_cv *ncv) { const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); const int signal = ncv != NULL; @@ -736,7 +775,6 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, raio->next = ncv; raio->type = RECVAIO; raio->mode = mod; - raio->cb = cb; if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio))) goto exitlevel1; @@ -803,19 +841,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, } -SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { +SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL); } -SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { +SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { if (R_ExternalPtrTag(cvar) != nano_CvSymbol) Rf_error("'cv' is not a valid Condition Variable"); nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); - return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv); + return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv); } @@ -1224,11 +1262,13 @@ SEXP rnng_ncurl_session_close(SEXP session) { // request --------------------------------------------------------------------- SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, - const SEXP recvmode, const SEXP timeout, const SEXP clo, nano_cv *ncv) { + const SEXP recvmode, const SEXP timeout, const SEXP clo, + nano_cv *ncv, const SEXP cb) { const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); const int mod = nano_matcharg(recvmode); const int signal = ncv != NULL; + const int promises = cb != NULL; nng_ctx *ctx = (nng_ctx *) R_ExternalPtrAddr(con); SEXP aio, env, fun; nano_buf buf; @@ -1267,8 +1307,15 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, raio->type = RECVAIO; raio->mode = mod; raio->next = saio; - - if ((xc = nng_aio_alloc(&raio->aio, signal ? request_complete_signal : raio_complete, raio))) + if (promises) + R_PreserveObject(cb); + raio->cb = cb; + + if ((xc = nng_aio_alloc(&raio->aio, + promises ? + (signal ? request_complete_cb : raio_complete_cb) : + (signal ? request_complete_signal : raio_complete), + raio))) goto exitlevel2; nng_aio_set_timeout(raio->aio, dur); @@ -1306,7 +1353,7 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou if (R_ExternalPtrTag(con) != nano_ContextSymbol) Rf_error("'con' is not a valid Context"); - return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL); + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL, NULL); } @@ -1318,7 +1365,18 @@ SEXP rnng_request_signal(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP rec Rf_error("'cv' is not a valid Condition Variable"); nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); - return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv); + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, NULL); + +} + +SEXP rnng_request_promise(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo, SEXP cb) { + + if (R_ExternalPtrTag(con) != nano_ContextSymbol) + Rf_error("'con' is not a valid Context"); + + nano_cv *ncv = R_ExternalPtrTag(cvar) == nano_CvSymbol ? (nano_cv *) R_ExternalPtrAddr(cvar) : NULL; + + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, cb); } diff --git a/src/init.c b/src/init.c index 165b6ca6a..50c4c90da 100644 --- a/src/init.c +++ b/src/init.c @@ -155,9 +155,10 @@ static const R_CallMethodDef callMethods[] = { {"rnng_random", (DL_FUNC) &rnng_random, 2}, {"rnng_reap", (DL_FUNC) &rnng_reap, 1}, {"rnng_recv", (DL_FUNC) &rnng_recv, 4}, - {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6}, - {"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 7}, + {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 5}, + {"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 6}, {"rnng_request", (DL_FUNC) &rnng_request, 6}, + {"rnng_request_promise", (DL_FUNC) &rnng_request_promise, 8}, {"rnng_request_signal", (DL_FUNC) &rnng_request_signal, 7}, {"rnng_send", (DL_FUNC) &rnng_send, 4}, {"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5}, diff --git a/src/nanonext.h b/src/nanonext.h index 24a868975..000ff9a50 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -251,9 +251,10 @@ SEXP rnng_protocol_open(SEXP, SEXP); SEXP rnng_random(SEXP, SEXP); SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_request_promise(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP); From b0533856a57529cfd7a5183b60f159f08f0366b0 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 15:05:57 +0100 Subject: [PATCH 05/10] R_UnwindProtect eval --- src/aio.c | 13 +++++++++---- src/core.c | 2 +- src/nanonext.h | 1 + 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/aio.c b/src/aio.c index 2149ed139..1ec1b9088 100644 --- a/src/aio.c +++ b/src/aio.c @@ -252,12 +252,17 @@ static void request_complete_signal(void *arg) { } +static void release_object(void *data, Rboolean jump) { + if (jump) + R_ReleaseObject((SEXP) data); +} + static void raio_invoke_cb(void *arg) { - SEXP callExpr; - PROTECT(callExpr = Rf_lcons((SEXP) arg, R_NilValue)); - (void) Rf_eval(callExpr, R_GlobalEnv); + SEXP call, data = (SEXP) arg; + PROTECT(call = Rf_lcons(data, R_NilValue)); + (void) R_UnwindProtect(eval_safe, call, release_object, data, NULL); UNPROTECT(1); - R_ReleaseObject(arg); + R_ReleaseObject(data); } static void raio_complete_cb(void *arg) { diff --git a/src/core.c b/src/core.c index c6723da7b..07899210e 100644 --- a/src/core.c +++ b/src/core.c @@ -42,7 +42,7 @@ SEXP mk_error(const int xc) { } -static SEXP eval_safe (void *call) { +SEXP eval_safe (void *call) { return Rf_eval((SEXP) call, R_GlobalEnv); } diff --git a/src/nanonext.h b/src/nanonext.h index 000ff9a50..eac75336a 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -192,6 +192,7 @@ typedef struct nano_buf_s { SEXP mk_error(const int); SEXP mk_error_ncurl(const int); +SEXP eval_safe(void *); nano_buf nano_char_buf(const SEXP); SEXP nano_decode(unsigned char *, const size_t, const int); void nano_encode(nano_buf *, const SEXP); From 0af45c36c58519991ba47dea19de7629eef3f7ce Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 15:21:36 +0100 Subject: [PATCH 06/10] add credit --- README.Rmd | 1 + README.md | 3 +++ 2 files changed, 4 insertions(+) diff --git a/README.Rmd b/README.Rmd index 381fba95f..730cd8ea3 100644 --- a/README.Rmd +++ b/README.Rmd @@ -169,6 +169,7 @@ We would like to acknowledge in particular: - [Garrett D'Amore](https://github.com/gdamore), author of the NNG library, for generous advice and for implementing a feature request specifically for a more efficient 'aio' implementation in `nanonext`. - The [R Consortium](https://www.r-consortium.org/) for funding the development of the secure TLS capabilities in the package, and [Henrik Bengtsson](https://github.com/HenrikBengtsson) and [Will Landau](https://github.com/wlandau/)'s roles in making this possible. +- [Joe Cheng](https://github.com/jcheng5/) for prototyping the integration of `nanonext` with `later` to support the next generation of completely event-driven promises in `mirai`. - [R Core](https://www.r-project.org/contributors.html) for various auxiliary functions for serialisation and raw / character conversion, which have been adopted by the package. - [Luke Tierney](https://github.com/ltierney/) and [Mike Cheng](https://github.com/coolbutuseless) for meticulous documentation of the R serialization mechanism, which led to the package's own implementation of a low-level interface to R serialization. - [Jeroen Ooms](https://github.com/jeroen) - for his 'Anticonf (tm)' configure script, on which our original 'configure' was based, although much modified since. diff --git a/README.md b/README.md index 7533e483d..8408c6d50 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,9 @@ We would like to acknowledge in particular: development of the secure TLS capabilities in the package, and [Henrik Bengtsson](https://github.com/HenrikBengtsson) and [Will Landau](https://github.com/wlandau/)’s roles in making this possible. +- [Joe Cheng](https://github.com/jcheng5/) for prototyping the + integration of `nanonext` with `later` to support the next generation + of completely event-driven promises in `mirai`. - [R Core](https://www.r-project.org/contributors.html) for various auxiliary functions for serialisation and raw / character conversion, which have been adopted by the package. From f379cb0386fadb71459bb7cc754bd6113429c6db Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 21:57:35 +0100 Subject: [PATCH 07/10] simplify --- R/context.R | 6 +----- src/aio.c | 35 +++++++++++++++++++---------------- src/init.c | 2 +- src/nanonext.h | 2 +- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/R/context.R b/R/context.R index 9f3285479..525cfb6d3 100644 --- a/R/context.R +++ b/R/context.R @@ -280,8 +280,4 @@ request_promise <- function(context, recv_mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric", "raw", "string"), timeout = NULL) - data <- .Call(rnng_request_promise, context, data, cv, send_mode, recv_mode, timeout, environment(), - function() { - cb <- .subset2(data, "callback") - if (is.function(cb)) cb(data) - }) + data <- .Call(rnng_request_promise, context, data, cv, send_mode, recv_mode, timeout, environment()) diff --git a/src/aio.c b/src/aio.c index 1ec1b9088..fa68226c2 100644 --- a/src/aio.c +++ b/src/aio.c @@ -258,11 +258,11 @@ static void release_object(void *data, Rboolean jump) { } static void raio_invoke_cb(void *arg) { - SEXP call, data = (SEXP) arg; - PROTECT(call = Rf_lcons(data, R_NilValue)); - (void) R_UnwindProtect(eval_safe, call, release_object, data, NULL); + SEXP call, env = (SEXP) arg; + PROTECT(call = Rf_lcons(CADR(ATTRIB(env)), R_NilValue)); + (void) R_UnwindProtect(eval_safe, call, release_object, env, NULL); UNPROTECT(1); - R_ReleaseObject(data); + R_ReleaseObject(env); } static void raio_complete_cb(void *arg) { @@ -280,7 +280,8 @@ static void raio_complete_cb(void *arg) { raio->result = res - !res; #endif - later2(raio_invoke_cb, raio->cb, 0); + if (CADR(ATTRIB(raio->cb)) != R_NilValue) + later2(raio_invoke_cb, raio->cb, 0); } @@ -302,7 +303,8 @@ static void request_complete_cb(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); - later2(raio_invoke_cb, raio->cb, 0); + if (CADR(ATTRIB(raio->cb)) != R_NilValue) + later2(raio_invoke_cb, raio->cb, 0); } @@ -1268,12 +1270,11 @@ SEXP rnng_ncurl_session_close(SEXP session) { SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, const SEXP recvmode, const SEXP timeout, const SEXP clo, - nano_cv *ncv, const SEXP cb) { + nano_cv *ncv, const int promises) { const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); const int mod = nano_matcharg(recvmode); const int signal = ncv != NULL; - const int promises = cb != NULL; nng_ctx *ctx = (nng_ctx *) R_ExternalPtrAddr(con); SEXP aio, env, fun; nano_buf buf; @@ -1309,12 +1310,14 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, nng_ctx_send(*ctx, saio->aio); raio = R_Calloc(1, nano_aio); + PROTECT(env = Rf_allocSExp(ENVSXP)); raio->type = RECVAIO; raio->mode = mod; raio->next = saio; - if (promises) - R_PreserveObject(cb); - raio->cb = cb; + if (promises) { + R_PreserveObject(env); + raio->cb = env; + } if ((xc = nng_aio_alloc(&raio->aio, promises ? @@ -1330,7 +1333,6 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, PROTECT(aio = R_MakeExternalPtr(raio, nano_AioSymbol, R_NilValue)); R_RegisterCFinalizerEx(aio, request_finalizer, TRUE); - PROTECT(env = Rf_allocSExp(ENVSXP)); NANO_CLASS(env, "recvAio"); Rf_defineVar(nano_AioSymbol, aio, env); @@ -1344,6 +1346,7 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, return env; exitlevel2: + UNPROTECT(1); R_Free(raio); nng_aio_free(saio->aio); exitlevel1: @@ -1358,7 +1361,7 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou if (R_ExternalPtrTag(con) != nano_ContextSymbol) Rf_error("'con' is not a valid Context"); - return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL, NULL); + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL, 0); } @@ -1370,18 +1373,18 @@ SEXP rnng_request_signal(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP rec Rf_error("'cv' is not a valid Condition Variable"); nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); - return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, NULL); + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, 0); } -SEXP rnng_request_promise(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo, SEXP cb) { +SEXP rnng_request_promise(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo) { if (R_ExternalPtrTag(con) != nano_ContextSymbol) Rf_error("'con' is not a valid Context"); nano_cv *ncv = R_ExternalPtrTag(cvar) == nano_CvSymbol ? (nano_cv *) R_ExternalPtrAddr(cvar) : NULL; - return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, cb); + return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv, 1); } diff --git a/src/init.c b/src/init.c index 50c4c90da..f13e0cf33 100644 --- a/src/init.c +++ b/src/init.c @@ -158,7 +158,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 5}, {"rnng_recv_aio_signal", (DL_FUNC) &rnng_recv_aio_signal, 6}, {"rnng_request", (DL_FUNC) &rnng_request, 6}, - {"rnng_request_promise", (DL_FUNC) &rnng_request_promise, 8}, + {"rnng_request_promise", (DL_FUNC) &rnng_request_promise, 7}, {"rnng_request_signal", (DL_FUNC) &rnng_request_signal, 7}, {"rnng_send", (DL_FUNC) &rnng_send, 4}, {"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5}, diff --git a/src/nanonext.h b/src/nanonext.h index eac75336a..2a36bbfa5 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -255,7 +255,7 @@ SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP rnng_request_promise(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_request_promise(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP); From 3233638e3d96e47bd188ad787eba9aea517bd46e Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 18 Apr 2024 08:59:42 +0100 Subject: [PATCH 08/10] document --- DESCRIPTION | 2 +- NAMESPACE | 2 +- NEWS.md | 5 ++++- R/context.R | 20 +++++++++++++------- man/request.Rd | 10 +++++++--- 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 8ff2069de..6653bea49 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 0.13.6.9000 +Version: 0.13.6.9002 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NAMESPACE b/NAMESPACE index add71dece..f66fa746b 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -75,7 +75,7 @@ export(recv_aio) export(recv_aio_signal) export(reply) export(request) -export(request_promise) +export(request2) export(request_signal) export(send) export(send_aio) diff --git a/NEWS.md b/NEWS.md index 032f74ee3..4e12142ae 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,7 @@ -# nanonext 0.13.6 (development) +# nanonext 0.13.6.9002 (development) + +* Integrates with the `later` package to provide the foundation for truly event-driven (non-polling) promises (thanks @jcheng5 for the initial prototype in #28), where side-effects are enacted asynchronously upon aio completion. +* Adds `request2()` for creating a request that may be turned into an event-driven promise. # nanonext 0.13.6 diff --git a/R/context.R b/R/context.R index 525cfb6d3..6aa92551d 100644 --- a/R/context.R +++ b/R/context.R @@ -270,14 +270,20 @@ request_signal <- function(context, timeout = NULL) data <- .Call(rnng_request_signal, context, data, cv, send_mode, recv_mode, timeout, environment()) +#' Request2 (RPC Client for Req/Rep Protocol) +#' +#' \strong{request2} is the next generation request function that optionally +#' takes a condition variable for signalling, and supports event-driven +#' promises. +#' #' @rdname request #' @export #' -request_promise <- function(context, - data, - cv, - send_mode = c("serial", "raw", "next"), - recv_mode = c("serial", "character", "complex", "double", - "integer", "logical", "numeric", "raw", "string"), - timeout = NULL) +request2 <- function(context, + data, + cv = NULL, + send_mode = c("serial", "raw", "next"), + recv_mode = c("serial", "character", "complex", "double", + "integer", "logical", "numeric", "raw", "string"), + timeout = NULL) data <- .Call(rnng_request_promise, context, data, cv, send_mode, recv_mode, timeout, environment()) diff --git a/man/request.Rd b/man/request.Rd index 61fb84056..c1c1c475c 100644 --- a/man/request.Rd +++ b/man/request.Rd @@ -3,7 +3,7 @@ \name{request} \alias{request} \alias{request_signal} -\alias{request_promise} +\alias{request2} \title{Request over Context (RPC Client for Req/Rep Protocol)} \usage{ request( @@ -25,10 +25,10 @@ request_signal( timeout = NULL ) -request_promise( +request2( context, data, - cv, + cv = NULL, send_mode = c("serial", "raw", "next"), recv_mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric", "raw", "string"), @@ -68,6 +68,10 @@ Implements a caller/client for the req node of the req/rep protocol. Sends A signalling version of the function takes a 'conditionVariable' as an additional argument and signals it when the async receive is complete. + +\strong{request2} is the next generation request function that optionally + takes a condition variable for signalling, and supports event-driven + promises. } \details{ Sending the request and receiving the result are both performed From b8ed2360e13e7c32e22bb089442d3851be43a283 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 18 Apr 2024 09:35:07 +0100 Subject: [PATCH 09/10] more efficient aio structure; remove legacy nng compat; requires nng >= 1.6.0 --- DESCRIPTION | 2 +- NEWS.md | 1 + README.Rmd | 2 +- README.md | 2 +- configure | 4 +-- src/aio.c | 83 ++++---------------------------------------------- src/core.c | 52 ------------------------------- src/init.c | 10 ------ src/nanonext.h | 8 ----- 9 files changed, 12 insertions(+), 152 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 6653bea49..06c3155d5 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,7 +26,7 @@ License: GPL (>= 3) BugReports: https://github.com/shikokuchuo/nanonext/issues URL: https://shikokuchuo.net/nanonext/, https://github.com/shikokuchuo/nanonext/ Encoding: UTF-8 -SystemRequirements: 'libnng' >= 1.5 and 'libmbedtls' >= 2.5, or 'cmake' to +SystemRequirements: 'libnng' >= 1.6 and 'libmbedtls' >= 2.5, or 'cmake' to compile NNG and/or Mbed TLS included in package sources Depends: R (>= 3.5) diff --git a/NEWS.md b/NEWS.md index 4e12142ae..817704246 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,6 +2,7 @@ * Integrates with the `later` package to provide the foundation for truly event-driven (non-polling) promises (thanks @jcheng5 for the initial prototype in #28), where side-effects are enacted asynchronously upon aio completion. * Adds `request2()` for creating a request that may be turned into an event-driven promise. +* Updates minimum 'libnng' version requirement to v1.6.0 (if a suitable system-installed version is not found, the bundled version is compiled from source). # nanonext 0.13.6 diff --git a/README.Rmd b/README.Rmd index 730cd8ea3..55312b09c 100644 --- a/README.Rmd +++ b/README.Rmd @@ -149,7 +149,7 @@ vignette("nanonext", package = "nanonext") #### Linux / Mac / Solaris -Installation from source requires 'libnng' >= v1.5.0 and 'libmbedtls' >= 2.5.0 (suitable installations are automatically detected), or else 'cmake' to compile 'libnng' v1.7.3 and 'libmbedtls' v3.6.0 LTS included within the package sources. +Installation from source requires 'libnng' >= v1.6.0 and 'libmbedtls' >= 2.5.0 (suitable installations are automatically detected), or else 'cmake' to compile 'libnng' v1.7.3 and 'libmbedtls' v3.6.0 LTS included within the package sources. **It is recommended for optimal performance and stability to let the package automatically compile bundled versions of 'libmbedtls' and 'libnng' during installation.** To ensure the libraries are compiled from source even if system installations are present, set the `NANONEXT_LIBS` environment variable prior to installation e.g. by `Sys.setenv(NANONEXT_LIBS = 1)`. diff --git a/README.md b/README.md index 8408c6d50..995103e9f 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ vignette("nanonext", package = "nanonext") #### Linux / Mac / Solaris -Installation from source requires ‘libnng’ \>= v1.5.0 and ‘libmbedtls’ +Installation from source requires ‘libnng’ \>= v1.6.0 and ‘libmbedtls’ \>= 2.5.0 (suitable installations are automatically detected), or else ‘cmake’ to compile ‘libnng’ v1.7.3 and ‘libmbedtls’ v3.6.0 LTS included within the package sources. diff --git a/configure b/configure index 88a9e1be4..5c8ab0809 100755 --- a/configure +++ b/configure @@ -128,7 +128,7 @@ fi echo "#include int main() { -#if NNG_MAJOR_VERSION < 1 || NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 5 +#if NNG_MAJOR_VERSION < 1 || NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6 *(void *) 0 = 0; #endif }" | ${CC} ${NNG_CFLAGS} -xc - -o /dev/null > /dev/null 2>&1 @@ -140,7 +140,7 @@ fi if [ $? -ne 0 ] then - echo "No existing 'libnng' >= 1.5 found" + echo "No existing 'libnng' >= 1.6 found" echo "Detecting 'cmake'..." which cmake if [ $? -ne 0 ] diff --git a/src/aio.c b/src/aio.c index fa68226c2..d86e96adf 100644 --- a/src/aio.c +++ b/src/aio.c @@ -160,14 +160,7 @@ static void saio_complete(void *arg) { const int res = nng_aio_result(saio->aio); if (res) nng_msg_free(nng_aio_get_msg(saio->aio)); - -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_lock(shr_mtx); - saio->result = res - !res; - nng_mtx_unlock(shr_mtx); -#else saio->result = res - !res; -#endif } @@ -185,14 +178,7 @@ static void isaio_complete(void *arg) { const int res = nng_aio_result(iaio->aio); if (iaio->data != NULL) R_Free(iaio->data); - -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_lock(shr_mtx); iaio->result = res - !res; - nng_mtx_unlock(shr_mtx); -#else - iaio->result = res - !res; -#endif } @@ -203,13 +189,7 @@ static void raio_complete(void *arg) { if (res == 0) raio->data = nng_aio_get_msg(raio->aio); -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_lock(shr_mtx); - raio->result = res - !res; - nng_mtx_unlock(shr_mtx); -#else raio->result = res - !res; -#endif } @@ -271,17 +251,11 @@ static void raio_complete_cb(void *arg) { const int res = nng_aio_result(raio->aio); if (res == 0) raio->data = nng_aio_get_msg(raio->aio); - -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_lock(shr_mtx); raio->result = res - !res; - nng_mtx_unlock(shr_mtx); -#else - raio->result = res - !res; -#endif - if (CADR(ATTRIB(raio->cb)) != R_NilValue) - later2(raio_invoke_cb, raio->cb, 0); + nano_aio *saio = (nano_aio *) raio->next; + if (CADR(ATTRIB((SEXP) saio->data)) != R_NilValue) + later2(raio_invoke_cb, saio->data, 0); } @@ -303,8 +277,8 @@ static void request_complete_cb(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); - if (CADR(ATTRIB(raio->cb)) != R_NilValue) - later2(raio_invoke_cb, raio->cb, 0); + if (CADR(ATTRIB((SEXP) saio->data)) != R_NilValue) + later2(raio_invoke_cb, saio->data, 0); } @@ -312,14 +286,7 @@ static void iraio_complete(void *arg) { nano_aio *iaio = (nano_aio *) arg; const int res = nng_aio_result(iaio->aio); - -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_lock(shr_mtx); - iaio->result = res - !res; - nng_mtx_unlock(shr_mtx); -#else iaio->result = res - !res; -#endif } @@ -367,9 +334,6 @@ static void request_finalizer(SEXP xptr) { if (R_ExternalPtrAddr(xptr) == NULL) return; nano_aio *xp = (nano_aio *) R_ExternalPtrAddr(xptr); nano_aio *saio = (nano_aio *) xp->next; -#ifdef NANONEXT_LEGACY_NNG - nng_ctx_close(*(nng_ctx *) saio->data); -#endif nng_aio_free(saio->aio); nng_aio_free(xp->aio); if (xp->data != NULL) @@ -447,15 +411,7 @@ SEXP rnng_aio_result(SEXP env) { nano_aio *saio = (nano_aio *) R_ExternalPtrAddr(aio); -#ifdef NANONEXT_LEGACY_NNG - int res; - nng_mtx_lock(shr_mtx); - res = saio->result; - nng_mtx_unlock(shr_mtx); - if (res == 0) -#else if (nng_aio_busy(saio->aio)) -#endif return nano_unresolved; if (saio->result > 0) @@ -479,15 +435,7 @@ SEXP rnng_aio_get_msg(SEXP env) { nano_aio *raio = (nano_aio *) R_ExternalPtrAddr(aio); -#ifdef NANONEXT_LEGACY_NNG - int res; - nng_mtx_lock(shr_mtx); - res = raio->result; - nng_mtx_unlock(shr_mtx); - if (res == 0) -#else if (nng_aio_busy(raio->aio)) -#endif return nano_unresolved; if (raio->result > 0) @@ -652,15 +600,7 @@ SEXP rnng_unresolved2(SEXP aio) { nano_aio *aiop = (nano_aio *) R_ExternalPtrAddr(coreaio); -#ifdef NANONEXT_LEGACY_NNG - int res; - nng_mtx_lock(shr_mtx); - res = aiop->result; - nng_mtx_unlock(shr_mtx); - return Rf_ScalarLogical(!res); -#else return Rf_ScalarLogical(nng_aio_busy(aiop->aio)); -#endif } @@ -1004,15 +944,7 @@ SEXP rnng_aio_http(SEXP env, SEXP response, SEXP type) { nano_aio *haio = (nano_aio *) R_ExternalPtrAddr(aio); -#ifdef NANONEXT_LEGACY_NNG - int res; - nng_mtx_lock(shr_mtx); - res = haio->result; - nng_mtx_unlock(shr_mtx); - if (res == 0) -#else if (nng_aio_busy(haio->aio)) -#endif return nano_unresolved; if (haio->result > 0) @@ -1292,9 +1224,6 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, } saio = R_Calloc(1, nano_aio); -#ifdef NANONEXT_LEGACY_NNG - saio->data = ctx; -#endif saio->next = ncv; if ((xc = nng_msg_alloc(&msg, 0))) @@ -1316,7 +1245,7 @@ SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode, raio->next = saio; if (promises) { R_PreserveObject(env); - raio->cb = env; + saio->data = env; } if ((xc = nng_aio_alloc(&raio->aio, diff --git a/src/core.c b/src/core.c index 07899210e..81d053e92 100644 --- a/src/core.c +++ b/src/core.c @@ -982,30 +982,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { nng_ctx *ctxp = (nng_ctx *) R_ExternalPtrAddr(con); nng_msg *msgp; -#ifdef NANONEXT_LEGACY_NNG - - nng_aio *aiop; - - if ((xc = nng_msg_alloc(&msgp, 0))) - goto exitlevel1; - - if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) || - (xc = nng_aio_alloc(&aiop, NULL, NULL))) { - nng_msg_free(msgp); - goto exitlevel1; - } - - nng_aio_set_msg(aiop, msgp); - nng_aio_set_timeout(aiop, flags < 0 ? 0 : flags > 0 ? flags : (*NANO_INTEGER(block) == 1) * NNG_DURATION_DEFAULT); - nng_ctx_send(*ctxp, aiop); - NANO_FREE(buf); - nng_aio_wait(aiop); - if ((xc = nng_aio_result(aiop))) - nng_msg_free(nng_aio_get_msg(aiop)); - nng_aio_free(aiop); - -#else - if (flags <= 0) { if ((xc = nng_msg_alloc(&msgp, 0))) @@ -1043,8 +1019,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { } -#endif - } else if (ptrtag == nano_StreamSymbol) { nano_encode(&buf, data); @@ -1136,30 +1110,6 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) { nng_ctx *ctxp = (nng_ctx *) R_ExternalPtrAddr(con); nng_msg *msgp; -#ifdef NANONEXT_LEGACY_NNG - - nng_aio *aiop; - - if ((xc = nng_aio_alloc(&aiop, NULL, NULL))) - goto exitlevel1; - nng_aio_set_timeout(aiop, flags < 0 ? 0 : flags > 0 ? flags : (*NANO_INTEGER(block) == 1) * NNG_DURATION_DEFAULT); - nng_ctx_recv(*ctxp, aiop); - - nng_aio_wait(aiop); - if ((xc = nng_aio_result(aiop))) { - nng_aio_free(aiop); - goto exitlevel1; - } - - msgp = nng_aio_get_msg(aiop); - nng_aio_free(aiop); - buf = nng_msg_body(msgp); - sz = nng_msg_len(msgp); - res = nano_decode(buf, sz, mod); - nng_msg_free(msgp); - -#else - if (flags <= 0) { xc = nng_ctx_recvmsg(*ctxp, &msgp, (flags < 0 || *NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK); @@ -1195,8 +1145,6 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) { } -#endif - } else if (ptrtag == nano_StreamSymbol) { mod = nano_matchargs(mode); diff --git a/src/init.c b/src/init.c index f13e0cf33..b2d4c26c0 100644 --- a/src/init.c +++ b/src/init.c @@ -50,10 +50,6 @@ SEXP nano_refHook; SEXP nano_success; SEXP nano_unresolved; -#ifdef NANONEXT_LEGACY_NNG -nng_mtx *shr_mtx; -#endif - static void RegisterSymbols(void) { nano_AioSymbol = Rf_install("aio"); nano_ContextSymbol = Rf_install("context"); @@ -193,9 +189,6 @@ static const R_ExternalMethodDef externalMethods[] = { void attribute_visible R_init_nanonext(DllInfo* dll) { RegisterSymbols(); PreserveObjects(); -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_alloc(&shr_mtx); -#endif R_registerRoutines(dll, NULL, callMethods, NULL, externalMethods); R_useDynamicSymbols(dll, FALSE); R_forceSymbols(dll, TRUE); @@ -203,7 +196,4 @@ void attribute_visible R_init_nanonext(DllInfo* dll) { void attribute_visible R_unload_nanonext(DllInfo *info) { ReleaseObjects(); -#ifdef NANONEXT_LEGACY_NNG - nng_mtx_free(shr_mtx); -#endif } diff --git a/src/nanonext.h b/src/nanonext.h index 2a36bbfa5..f0abad23d 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -21,10 +21,6 @@ #include -#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6 -#define NANONEXT_LEGACY_NNG -#endif - #ifdef NANONEXT_PROTOCOLS #include #include @@ -45,9 +41,6 @@ #ifdef NANONEXT_SUPPLEMENTALS #include #include -#ifdef NANONEXT_LEGACY_NNG -extern nng_mtx *shr_mtx; -#endif typedef struct nano_listener_s { nng_listener list; @@ -88,7 +81,6 @@ typedef struct nano_aio_s { int result; void *data; void *next; - void *cb; } nano_aio; typedef struct nano_cv_s { From e8226fbec713340860e485c18064bc81ffa52f4f Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 18 Apr 2024 10:06:03 +0100 Subject: [PATCH 10/10] safer and more efficient to release cb object in request finalizer --- NEWS.md | 5 +++++ src/aio.c | 24 +++++++++++------------- src/core.c | 5 +++-- src/nanonext.h | 1 - 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/NEWS.md b/NEWS.md index 817704246..4aeb74dd6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,12 @@ # nanonext 0.13.6.9002 (development) +#### New Features + * Integrates with the `later` package to provide the foundation for truly event-driven (non-polling) promises (thanks @jcheng5 for the initial prototype in #28), where side-effects are enacted asynchronously upon aio completion. * Adds `request2()` for creating a request that may be turned into an event-driven promise. + +#### Updates + * Updates minimum 'libnng' version requirement to v1.6.0 (if a suitable system-installed version is not found, the bundled version is compiled from source). # nanonext 0.13.6 diff --git a/src/aio.c b/src/aio.c index d86e96adf..d8dc8b2a1 100644 --- a/src/aio.c +++ b/src/aio.c @@ -232,17 +232,11 @@ static void request_complete_signal(void *arg) { } -static void release_object(void *data, Rboolean jump) { - if (jump) - R_ReleaseObject((SEXP) data); -} - static void raio_invoke_cb(void *arg) { - SEXP call, env = (SEXP) arg; - PROTECT(call = Rf_lcons(CADR(ATTRIB(env)), R_NilValue)); - (void) R_UnwindProtect(eval_safe, call, release_object, env, NULL); + SEXP call, cb = (SEXP) arg; + PROTECT(call = Rf_lcons(cb, R_NilValue)); + (void) Rf_eval(call, R_GlobalEnv); UNPROTECT(1); - R_ReleaseObject(env); } static void raio_complete_cb(void *arg) { @@ -254,8 +248,9 @@ static void raio_complete_cb(void *arg) { raio->result = res - !res; nano_aio *saio = (nano_aio *) raio->next; - if (CADR(ATTRIB((SEXP) saio->data)) != R_NilValue) - later2(raio_invoke_cb, saio->data, 0); + SEXP ax = CADR(ATTRIB((SEXP) saio->data)); + if (ax != R_NilValue) + later2(raio_invoke_cb, ax, 0); } @@ -277,8 +272,9 @@ static void request_complete_cb(void *arg) { nng_cv_wake(cv); nng_mtx_unlock(mtx); - if (CADR(ATTRIB((SEXP) saio->data)) != R_NilValue) - later2(raio_invoke_cb, saio->data, 0); + SEXP ax = CADR(ATTRIB((SEXP) saio->data)); + if (ax != R_NilValue) + later2(raio_invoke_cb, ax, 0); } @@ -338,6 +334,8 @@ static void request_finalizer(SEXP xptr) { nng_aio_free(xp->aio); if (xp->data != NULL) nng_msg_free((nng_msg *) xp->data); + if (saio->data != NULL) + R_ReleaseObject((SEXP) saio->data); R_Free(saio); R_Free(xp); diff --git a/src/core.c b/src/core.c index 81d053e92..73d44abcb 100644 --- a/src/core.c +++ b/src/core.c @@ -42,12 +42,13 @@ SEXP mk_error(const int xc) { } -SEXP eval_safe (void *call) { +static SEXP eval_safe (void *call) { return Rf_eval((SEXP) call, R_GlobalEnv); } static void rl_reset(void *data, Rboolean jump) { - if (jump && data == NULL) + (void) data; + if (jump) SET_TAG(nano_refHook, R_NilValue); } diff --git a/src/nanonext.h b/src/nanonext.h index f0abad23d..566ac8028 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -184,7 +184,6 @@ typedef struct nano_buf_s { SEXP mk_error(const int); SEXP mk_error_ncurl(const int); -SEXP eval_safe(void *); nano_buf nano_char_buf(const SEXP); SEXP nano_decode(unsigned char *, const size_t, const int); void nano_encode(nano_buf *, const SEXP);