diff --git a/DESCRIPTION b/DESCRIPTION index c298d121d..bd6ea1135 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.7.2.9027 +Version: 0.7.2.9028 Description: Lightweight parallel code execution, local or distributed across the network. Designed for simplicity, a 'mirai' evaluates an arbitrary expression asynchronously, resolving automatically upon completion. diff --git a/NEWS.md b/NEWS.md index 1546bbf2c..0afc5edf3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# mirai 0.7.2.9027 (development) +# mirai 0.7.2.9028 (development) * mirai 0.8.0 is a major feature release. Special thanks to @wlandau for suggestions, discussion and testing for many of the new capabilities. * Compute profiles have been introduced through a new `.compute` argument in `daemons()` and `mirai()` for sending tasks with heterogeneous compute requirements. diff --git a/R/mirai.R b/R/mirai.R index 8ec0afd81..bbd713526 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -90,7 +90,7 @@ server <- function(url, nodes = NULL, idletime = Inf, walltime = Inf, tasklimit vectorised <- length(url) == nodes + 2L seq_nodes <- seq_len(nodes) servernames <- character(nodes) - complete <- assigned <- integer(nodes) + activestore <- complete <- assigned <- integer(nodes) serverfree <- !integer(nodes) servers <- queue <- vector(mode = "list", length = nodes) @@ -140,8 +140,11 @@ server <- function(url, nodes = NULL, idletime = Inf, walltime = Inf, tasklimit while (count < tasklimit && mclock() - start < walltime && if (idle) mclock() - idle < idletime else TRUE) { activevec <- as.integer(unlist(lapply(servers, stat, "pipes"))) - assigned <- activevec * assigned - complete <- activevec * complete + newcon <- as.logical(pmax.int(activevec - activestore, 0L)) + activestore <- activevec + assigned[newcon] <- 0L + complete[newcon] <- 0L + active <- sum(activevec) free <- which(serverfree & activevec) if (length(free) == active) { @@ -379,7 +382,7 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau #' running a passive queue.} #' \item{\code{nodes}} {- a matrix of URL, active (connected) and busy #' status, as well as cumulative tasks assigned and completed (reset if a -#' node disconnects), or else NA if not running an active queue.} +#' node re-connects), or else NA if not running an active queue.} #' } #' #' @details Use \code{daemons(0)} to reset all daemon connections at any time. diff --git a/man/daemons.Rd b/man/daemons.Rd index 4c70a03f4..1d8dcd4fa 100644 --- a/man/daemons.Rd +++ b/man/daemons.Rd @@ -39,7 +39,7 @@ Setting daemons: integer number of daemons set, or the character running a passive queue.} \item{\code{nodes}} {- a matrix of URL, active (connected) and busy status, as well as cumulative tasks assigned and completed (reset if a - node disconnects), or else NA if not running an active queue.} + node re-connects), or else NA if not running an active queue.} } } \description{