diff --git a/DESCRIPTION b/DESCRIPTION index 112fed7b7..65e0e2e7a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.8.2.9001 +Version: 0.8.2.9002 Description: Lightweight parallel code execution, local or distributed across the network. Designed for simplicity, a 'mirai' evaluates an arbitrary expression asynchronously, resolving automatically upon completion. diff --git a/NAMESPACE b/NAMESPACE index d5f665c29..11e9a1ac5 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -16,6 +16,7 @@ export(is_mirai_error) export(is_mirai_interrupt) export(launch) export(mirai) +export(saisei) export(server) export(stop_mirai) export(unresolved) @@ -26,6 +27,7 @@ importFrom(nanonext,cv) importFrom(nanonext,cv_value) importFrom(nanonext,is_error_value) importFrom(nanonext,listen) +importFrom(nanonext,lock) importFrom(nanonext,mclock) importFrom(nanonext,msleep) importFrom(nanonext,opt) diff --git a/NEWS.md b/NEWS.md index 31ca5302f..95f1f463a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,7 @@ -# mirai 0.8.2.9001 (development) +# mirai 0.8.2.9002 (development) * `dispatcher()` gains the argument 'token' for appending a unique token to each URL the dispatcher listens at. +* `saisei()` implemented to regenerate the token used by a given dispatcher socket. * Internal performance enhancements. # mirai 0.8.2 diff --git a/R/mirai-package.R b/R/mirai-package.R index 79d2b5d3e..3ca4c1007 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -49,7 +49,7 @@ #' @author Charlie Gao \email{charlie.gao@@shikokuchuo.net} #' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID}) #' -#' @importFrom nanonext call_aio context cv cv_value is_error_value listen +#' @importFrom nanonext call_aio context cv cv_value is_error_value listen lock #' mclock msleep opt parse_url pipe_notify random recv recv_aio_signal #' request request_signal send sha1 socket stat stop_aio unresolved wait #' weakref<- diff --git a/R/mirai.R b/R/mirai.R index 29fa618ea..58def03bd 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -217,7 +217,7 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE, } for (i in seq_n) { - nurl <- if (auto) sprintf(.urlfmt, random()) else + nurl <- if (auto) sprintf(.urlfmt, if (token) i else random()) else if (vectorised) url[i] else if (is.null(ports)) sprintf("%s/%d", url, i) else sub(ports[1L], ports[i], url, fixed = TRUE) @@ -228,6 +228,7 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE, ncv <- cv() pipe_notify(nsock, cv = ncv, cv2 = cv, flag = FALSE) && stop() listen(nsock, url = nurl, error = TRUE) + if (token) lock(nsock, cv = ncv) if (i == 1L && !auto && parse_url(opt(attr(nsock, "listener")[[1L]], "url"))[["port"]] == "0") { realport <- opt(attr(nsock, "listener")[[1L]], "tcp-bound-port") nurl <- sub("(?<=:)0(?![^/])", realport, nurl, perl = TRUE) @@ -273,12 +274,18 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE, ctrchannel && !unresolved(cmessage) && { i <- .subset2(cmessage, "data") if (i) { - close(servers[[i]]) - servers[[i]] <- socket(protocol = "req") - active[[i]] <- cv() - pipe_notify(servers[[i]], cv = active[[i]], cv2 = cv, flag = FALSE) && stop() - data <- servernames[i] <- append_token(basenames[i]) - listen(servers[[i]], url = data, error = TRUE) + if (i > 0 && i <= n) { + close(servers[[i]]) + servers[[i]] <- socket(protocol = "req") + active[[i]] <- cv() + pipe_notify(servers[[i]], cv = active[[i]], cv2 = cv, flag = FALSE) && stop() + data <- servernames[i] <- append_token(basenames[i]) + listen(servers[[i]], url = data, error = TRUE) + if (token) lock(servers[[i]], cv = active[[i]]) + } else { + data <- NULL + } + } else { data <- `attributes<-`(c(activevec, assigned - complete, assigned, complete, instance), list(dim = c(n, 5L), dimnames = list(servernames, statnames))) @@ -452,8 +459,8 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau #' @param ... additional arguments passed through to \code{\link{dispatcher}} if #' using active dispatch and/or \code{\link{server}} if launching local daemons. #' @param .compute (optional) character compute profile to use for creating the -#' daemons (each compute profile can have its own set of daemons for -#' connecting to different resources). +#' daemons (each compute profile has its own set of daemons for connecting +#' to different resources). #' #' @return Setting daemons: integer number of daemons set, or the character #' client URL. @@ -643,7 +650,7 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, ..., .compute = "default") missing(n) && missing(url) && return(list(connections = if (length(..[[.compute]][["sock"]])) stat(..[[.compute]][["sock"]], "pipes") else 0, - daemons = if (length(..[[.compute]][["sockc"]])) query_nodes(..[[.compute]][["sockc"]], dispatcher) else + daemons = if (length(..[[.compute]][["sockc"]])) query_nodes(..[[.compute]][["sockc"]], 0L) else if (length(..[[.compute]][["proc"]])) ..[[.compute]][["proc"]] else 0L)) if (is.null(..[[.compute]])) `[[<-`(.., .compute, new.env(hash = FALSE, parent = environment(daemons))) @@ -974,10 +981,48 @@ print.miraiInterrupt <- function(x, ...) { launch <- function(args) system2(command = .command, args = c("-e", shQuote(args)), stdout = NULL, stderr = NULL, wait = FALSE) +#' Saisei - Regenerate Token +#' +#' When using daemons with a local dispatcher service, replaces an existing +#' socket at the dispatcher with a new one, listening to a URL incorporating +#' a newly-generated token. +#' +#' @param i integer \code{i}th daemon to replace. +#' @param .compute (optional) character compute profile to use (each compute +#' profile has its own set of daemons for connecting to different resources). +#' +#' @return The regenerated character URL upon success, or else NULL. +#' +#' @details Care should be taken when calling this function as the specified +#' socket will be closed and replaced immediately without waiting for any +#' task in progress to finish. +#' +#' @examples +#' if (interactive()) { +#' # Only run examples in interactive R sessions +#' +#' daemons(1, token = TRUE) +#' daemons() +#' saisei(i = 1L) +#' daemons() +#' +#' daemons(0) +#' +#' } +#' +#' @export +#' +saisei <- function(i, .compute = "default") { + + length(..[[.compute]][["sockc"]]) || return() + query_nodes(..[[.compute]][["sockc"]], as.integer(i)) + +} + # internals -------------------------------------------------------------------- query_nodes <- function(sock, command) { - send(sock, data = if (is.integer(command)) command else 0L, mode = 2L) + send(sock, data = command, mode = 2L) recv(sock, mode = 1L, block = 1000L) } diff --git a/man/daemons.Rd b/man/daemons.Rd index 8873976f2..1c076b4b2 100644 --- a/man/daemons.Rd +++ b/man/daemons.Rd @@ -22,8 +22,8 @@ the client and ensures FIFO scheduling, queueing tasks if necessary using active dispatch and/or \code{\link{server}} if launching local daemons.} \item{.compute}{(optional) character compute profile to use for creating the -daemons (each compute profile can have its own set of daemons for -connecting to different resources).} +daemons (each compute profile has its own set of daemons for connecting +to different resources).} } \value{ Setting daemons: integer number of daemons set, or the character diff --git a/man/saisei.Rd b/man/saisei.Rd new file mode 100644 index 000000000..7f58f5e37 --- /dev/null +++ b/man/saisei.Rd @@ -0,0 +1,41 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/mirai.R +\name{saisei} +\alias{saisei} +\title{Saisei - Regenerate Token} +\usage{ +saisei(i, .compute = "default") +} +\arguments{ +\item{i}{integer \code{i}th daemon to replace.} + +\item{.compute}{(optional) character compute profile to use (each compute +profile has its own set of daemons for connecting to different resources).} +} +\value{ +The regenerated character URL upon success, or else NULL. +} +\description{ +When using daemons with a local dispatcher service, replaces an existing + socket at the dispatcher with a new one, listening to a URL incorporating + a newly-generated token. +} +\details{ +Care should be taken when calling this function as the specified + socket will be closed and replaced immediately without waiting for any + task in progress to finish. +} +\examples{ +if (interactive()) { +# Only run examples in interactive R sessions + +daemons(1, token = TRUE) +daemons() +saisei(i = 1L) +daemons() + +daemons(0) + +} + +} diff --git a/tests/tests.R b/tests/tests.R index 884e21be0..cfc827460 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -101,7 +101,10 @@ if (Sys.getenv("NOT_CRAN") == "true") { nanotest(is.matrix(status <- daemons()[["daemons"]])) nanotesto(status[, "status_online"]) nanotesto(status[, "instance #"]) - nanotest(is.character(daemons(,,1L)$daemons)) + nanotest(is.character(saisei(i = 1L))) + nanotest(is.matrix(status <- daemons()[["daemons"]])) + nanotestz(status[, "status_online"]) + nanotestz(status[, "instance #"]) nanotestz(daemons(0)) Sys.sleep(1L) }