Skip to content

Commit

Permalink
prevent backlogged workers
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Sep 19, 2023
1 parent 4c70007 commit 51ec228
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.10.0.9001
Version: 0.10.0.9002
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand All @@ -23,5 +23,5 @@ Encoding: UTF-8
Depends:
R (>= 3.5)
Imports:
nanonext (>= 0.10.0.9005)
nanonext (>= 0.10.0.9006)
RoxygenNote: 7.2.3
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# mirai 0.10.0.9001 (development)
# mirai 0.10.0.9002 (development)

* Ephemeral daemons now synchronise with the host process and exit as soon as the completed mirai task is received, without an 'exitlinger' period.
* Optimises scheduling at dispatcher - tasks are no longer assigned to a server if it is exiting due to specified time/task-outs.
* Deprecated use of alias `server()` for `daemon()` is retired.
* Requires nanonext >= [0.10.0.9005].

Expand Down
1 change: 1 addition & 0 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,5 @@ NULL
),
hash = TRUE
)
.seven <- as.raw(7L)
.timelimit <- 5000L
22 changes: 16 additions & 6 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ daemon <- function(url, asyncdial = FALSE, maxtasks = Inf, idletime = Inf,
se <- search()
count <- 0L
start <- mclock()
while (count < maxtasks && mclock() - start < walltime) {

repeat {

ctx <- .context(sock)
aio <- recv_aio_signal(ctx, cv = cv, mode = 1L, timeout = idletime)
Expand All @@ -120,14 +121,20 @@ daemon <- function(url, asyncdial = FALSE, maxtasks = Inf, idletime = Inf,
}
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
count <- count + 1L

if (count >= maxtasks || (count > timerstart && mclock() - start >= walltime)) {
send(ctx, data = data, mode = 3L)
break;
}

send(ctx, data = data, mode = 1L)

if (cleanup %% 2L) rm(list = (vars <- names(.GlobalEnv))[vars != ".Random.seed"], envir = .GlobalEnv)
if (clr & as.raw(2L)) lapply((new <- search())[!new %in% se], detach, unload = TRUE, character.only = TRUE)
if (clr & as.raw(4L)) options(op)
if (clr & as.raw(8L)) gc(verbose = FALSE)
if (count < timerstart) start <- mclock()
count <- count + 1L
if (count <= timerstart) start <- mclock()

}

Expand Down Expand Up @@ -311,8 +318,10 @@ dispatcher <- function(host, url = NULL, n = NULL, asyncdial = FALSE,
activevec <- cv_values %% 2L
changes <- (activevec - activestore) > 0L
activestore <- activevec
if (any(changes))
if (any(changes)) {
instance[changes] <- abs(instance[changes]) + 1L
serverfree <- serverfree | changes
}

ctrchannel && !unresolved(cmessage) && {
i <- .subset2(cmessage, "data")
Expand Down Expand Up @@ -348,9 +357,10 @@ dispatcher <- function(host, url = NULL, n = NULL, asyncdial = FALSE,

for (i in seq_n)
if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["res"]])) {
send(queue[[i]][["ctx"]], data = queue[[i]][["res"]], mode = 2L)
req <- queue[[i]][["res"]]
send(queue[[i]][["ctx"]], data = req, mode = 2L)
q <- queue[[i]][["daemon"]]
serverfree[[q]] <- TRUE
serverfree[[q]] <- parent.env(req)[["result"]][[1L]] != .seven
complete[[q]] <- complete[[q]] + 1L
ctx <- .context(sock)
req <- recv_aio_signal(ctx, cv = cv, mode = 8L)
Expand Down

0 comments on commit 51ec228

Please sign in to comment.