Skip to content

Commit

Permalink
implements saisei()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Apr 4, 2023
1 parent 16c0e46 commit c265c46
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 17 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.8.2.9001
Version: 0.8.2.9002
Description: Lightweight parallel code execution, local or distributed across
the network. Designed for simplicity, a 'mirai' evaluates an arbitrary
expression asynchronously, resolving automatically upon completion.
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export(is_mirai_error)
export(is_mirai_interrupt)
export(launch)
export(mirai)
export(saisei)
export(server)
export(stop_mirai)
export(unresolved)
Expand All @@ -26,6 +27,7 @@ importFrom(nanonext,cv)
importFrom(nanonext,cv_value)
importFrom(nanonext,is_error_value)
importFrom(nanonext,listen)
importFrom(nanonext,lock)
importFrom(nanonext,mclock)
importFrom(nanonext,msleep)
importFrom(nanonext,opt)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# mirai 0.8.2.9001 (development)
# mirai 0.8.2.9002 (development)

* `dispatcher()` gains the argument 'token' for appending a unique token to each URL the dispatcher listens at.
* `saisei()` implemented to regenerate the token used by a given dispatcher socket.
* Internal performance enhancements.

# mirai 0.8.2
Expand Down
2 changes: 1 addition & 1 deletion R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
#' @author Charlie Gao \email{charlie.gao@@shikokuchuo.net}
#' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID})
#'
#' @importFrom nanonext call_aio context cv cv_value is_error_value listen
#' @importFrom nanonext call_aio context cv cv_value is_error_value listen lock
#' mclock msleep opt parse_url pipe_notify random recv recv_aio_signal
#' request request_signal send sha1 socket stat stop_aio unresolved wait
#' weakref<-
Expand Down
67 changes: 56 additions & 11 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE,
}

for (i in seq_n) {
nurl <- if (auto) sprintf(.urlfmt, random()) else
nurl <- if (auto) sprintf(.urlfmt, if (token) i else random()) else
if (vectorised) url[i] else
if (is.null(ports)) sprintf("%s/%d", url, i) else
sub(ports[1L], ports[i], url, fixed = TRUE)
Expand All @@ -228,6 +228,7 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE,
ncv <- cv()
pipe_notify(nsock, cv = ncv, cv2 = cv, flag = FALSE) && stop()
listen(nsock, url = nurl, error = TRUE)
if (token) lock(nsock, cv = ncv)
if (i == 1L && !auto && parse_url(opt(attr(nsock, "listener")[[1L]], "url"))[["port"]] == "0") {
realport <- opt(attr(nsock, "listener")[[1L]], "tcp-bound-port")
nurl <- sub("(?<=:)0(?![^/])", realport, nurl, perl = TRUE)
Expand Down Expand Up @@ -273,12 +274,18 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = TRUE,
ctrchannel && !unresolved(cmessage) && {
i <- .subset2(cmessage, "data")
if (i) {
close(servers[[i]])
servers[[i]] <- socket(protocol = "req")
active[[i]] <- cv()
pipe_notify(servers[[i]], cv = active[[i]], cv2 = cv, flag = FALSE) && stop()
data <- servernames[i] <- append_token(basenames[i])
listen(servers[[i]], url = data, error = TRUE)
if (i > 0 && i <= n) {
close(servers[[i]])
servers[[i]] <- socket(protocol = "req")
active[[i]] <- cv()
pipe_notify(servers[[i]], cv = active[[i]], cv2 = cv, flag = FALSE) && stop()
data <- servernames[i] <- append_token(basenames[i])
listen(servers[[i]], url = data, error = TRUE)
if (token) lock(servers[[i]], cv = active[[i]])
} else {
data <- NULL
}

} else {
data <- `attributes<-`(c(activevec, assigned - complete, assigned, complete, instance),
list(dim = c(n, 5L), dimnames = list(servernames, statnames)))
Expand Down Expand Up @@ -452,8 +459,8 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
#' @param ... additional arguments passed through to \code{\link{dispatcher}} if
#' using active dispatch and/or \code{\link{server}} if launching local daemons.
#' @param .compute (optional) character compute profile to use for creating the
#' daemons (each compute profile can have its own set of daemons for
#' connecting to different resources).
#' daemons (each compute profile has its own set of daemons for connecting
#' to different resources).
#'
#' @return Setting daemons: integer number of daemons set, or the character
#' client URL.
Expand Down Expand Up @@ -643,7 +650,7 @@ daemons <- function(n, url = NULL, dispatcher = TRUE, ..., .compute = "default")

missing(n) && missing(url) &&
return(list(connections = if (length(..[[.compute]][["sock"]])) stat(..[[.compute]][["sock"]], "pipes") else 0,
daemons = if (length(..[[.compute]][["sockc"]])) query_nodes(..[[.compute]][["sockc"]], dispatcher) else
daemons = if (length(..[[.compute]][["sockc"]])) query_nodes(..[[.compute]][["sockc"]], 0L) else
if (length(..[[.compute]][["proc"]])) ..[[.compute]][["proc"]] else 0L))

if (is.null(..[[.compute]])) `[[<-`(.., .compute, new.env(hash = FALSE, parent = environment(daemons)))
Expand Down Expand Up @@ -974,10 +981,48 @@ print.miraiInterrupt <- function(x, ...) {
launch <- function(args)
system2(command = .command, args = c("-e", shQuote(args)), stdout = NULL, stderr = NULL, wait = FALSE)

#' Saisei - Regenerate Token
#'
#' When using daemons with a local dispatcher service, replaces an existing
#' socket at the dispatcher with a new one, listening to a URL incorporating
#' a newly-generated token.
#'
#' @param i integer \code{i}th daemon to replace.
#' @param .compute (optional) character compute profile to use (each compute
#' profile has its own set of daemons for connecting to different resources).
#'
#' @return The regenerated character URL upon success, or else NULL.
#'
#' @details Care should be taken when calling this function as the specified
#' socket will be closed and replaced immediately without waiting for any
#' task in progress to finish.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(1, token = TRUE)
#' daemons()
#' saisei(i = 1L)
#' daemons()
#'
#' daemons(0)
#'
#' }
#'
#' @export
#'
saisei <- function(i, .compute = "default") {

length(..[[.compute]][["sockc"]]) || return()
query_nodes(..[[.compute]][["sockc"]], as.integer(i))

}

# internals --------------------------------------------------------------------

query_nodes <- function(sock, command) {
send(sock, data = if (is.integer(command)) command else 0L, mode = 2L)
send(sock, data = command, mode = 2L)
recv(sock, mode = 1L, block = 1000L)
}

Expand Down
4 changes: 2 additions & 2 deletions man/daemons.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions man/saisei.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ if (Sys.getenv("NOT_CRAN") == "true") {
nanotest(is.matrix(status <- daemons()[["daemons"]]))
nanotesto(status[, "status_online"])
nanotesto(status[, "instance #"])
nanotest(is.character(daemons(,,1L)$daemons))
nanotest(is.character(saisei(i = 1L)))
nanotest(is.matrix(status <- daemons()[["daemons"]]))
nanotestz(status[, "status_online"])
nanotestz(status[, "instance #"])
nanotestz(daemons(0))
Sys.sleep(1L)
}
Expand Down

0 comments on commit c265c46

Please sign in to comment.