Skip to content

Commit

Permalink
Merge pull request #88 from shikokuchuo/signal
Browse files Browse the repository at this point in the history
Allow daemons to raise a signal upon autoexit
  • Loading branch information
shikokuchuo authored Dec 12, 2023
2 parents 3b6b2a2 + 2d365d8 commit 041b99a
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 85 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/check-standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ jobs:
http-user-agent: ${{ matrix.config.http-user-agent }}
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::rcmdcheck
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/pkgdown.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ jobs:
with:
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::pkgdown, local::.
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ jobs:
with:
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::covr
Expand Down
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Encoding: UTF-8
Depends:
R (>= 3.5)
Imports:
nanonext (>= 0.11.0)
nanonext (>= 0.11.0.9000)
Enhances:
parallel,
promises
Expand Down
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# mirai 0.11.3.9000 (development)

* Allows supplying a signal value for the 'autoexit' argument of daemon() to raise it upon exit.
* Requires nanonext >= [0.11.0.9000].

# mirai 0.11.3

* Implements `serialization()` for registering custom serialization and unserialization functions when using daemons.
Expand All @@ -13,7 +16,7 @@
* Cluster node failures during load balanced operations now rely on the 'parallel' mechanism to error and no longer fail early or automatically stop the cluster.
* Fixes regression since 0.11.0 which prevented dispatcher exiting in a timely manner when tasks are backlogged (thanks @wlandau #86).
* Improved memory efficiency and stability at dispatcher.
* No longer loads the 'promises' package if not already loaded (but makes the 'mirai' method available via a hook function).
* No longer loads the 'promises' package if not already loaded (but makes the 'mirai' method available via a hook function).
* Requires nanonext >= 0.11.0.

# mirai 0.11.2
Expand Down
17 changes: 12 additions & 5 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param autoexit [default TRUE] logical value, whether the daemon should
#' exit automatically when its socket connection ends (see 'Persistence'
#' exit automatically when its socket connection ends, or else an integer
#' signal value to additionally raise this upon exit (see 'Persistence'
#' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore loaded packages and options to an
Expand Down Expand Up @@ -70,15 +71,21 @@
#' @section Persistence:
#'
#' The 'autoexit' argument governs persistence settings for the daemon. The
#' default TRUE ensures that it will exit cleanly under all circumstances
#' once its socket connection has ended.
#' default TRUE ensures that it will exit cleanly once its socket connection
#' has ended.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when
#' there is no longer a socket connection. This allows a host session to end
#' and a new session to connect at the URL where the daemon is dialled in.
#' Daemons must be terminated with \code{daemons(NULL)} in this case, which
#' sends an exit signal to all connected daemons.
#'
#' Supplying a signal from the \pkg{tools} package, e.g. \code{tools::SIGINT},
#' or an equivalent integer value, sets this signal to be raised when the
#' socket connection ends. As an example, supplying SIGINT allows a
#' potentially more immediate exit by interrupting any ongoing evaluation
#' rather than letting it complete.
#'
#' Persistence also implies that dials are performed asynchronously, which
#' means retries are attempted (indefinitely) if not immediately successful.
#' This is resilient behaviour but can mask potential connection issues.
Expand Down Expand Up @@ -107,7 +114,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)

Expand Down Expand Up @@ -145,7 +152,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
error = mk_mirai_error, interrupt = mk_interrupt_error)
count <- count + 1L

(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
{ count >= maxtasks || count > timerstart && mclock() - start >= walltime } && {
next_config(mark = TRUE)
send(ctx, data = data, mode = 3L)
aio <- recv_aio_signal(ctx, cv = cv, mode = 8L)
Expand Down
47 changes: 12 additions & 35 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,19 @@
#'
#' By default \code{dispatcher = TRUE}. This launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on
#' behalf of the host and queues tasks until a daemon is able to begin
#' immediate execution of that task, ensuring FIFO scheduling. Dispatcher
#' uses synchronisation primitives from \code{nanonext}, waiting rather than
#' polling for tasks, which is efficient both in terms of consuming no
#' resources while waiting, and also being fully synchronised with events
#' (having no latency).
#' behalf of the host and ensures FIFO scheduling of tasks. Dispatcher uses
#' synchronisation primitives from \pkg{nanonext}, waiting rather than
#' polling for tasks, which is both efficient (no resource usage) and fully
#' event-driven (having no latency).
#'
#' By specifying \code{dispatcher = FALSE}, daemons connect to the host
#' directly rather than through dispatcher. The host sends tasks to
#' connected daemons immediately in an evenly-distributed fashion. However,
#' optimal scheduling is not guaranteed as the duration of tasks cannot be
#' known \emph{a priori}, such that tasks can be queued at a daemon behind
#' a long-running task while other daemons remain idle. Nevertheless, this
#' provides a resource-light approach suited to working with similar-length
#' tasks, or where concurrent tasks typically do not exceed available
#' daemons.
#' known \emph{a priori}, such that tasks can be queued at one daemon while
#' other daemons remain idle. Nevertheless, this provides a resource-light
#' approach suited to working with similar-length tasks, or where concurrent
#' tasks typically do not exceed available daemons.
#'
#' @section Distributed Computing:
#'
Expand Down Expand Up @@ -167,8 +164,8 @@
#' numbers / paths. In this case it is optional to supply 'n' as this can
#' be inferred by the length of vector supplied.
#'
#' Individual daemons then dial in to each of these host URLs, and at most
#' one daemon should be dialled into each URL at any given time.
#' Individual daemons then dial in to each of these host URLs. At most one
#' daemon can be dialled into each URL at any given time.
#'
#' Dispatcher automatically adjusts to the number of daemons actually
#' connected. Hence it is possible to dynamically scale up or down the
Expand Down Expand Up @@ -203,9 +200,9 @@
#'
#' \strong{local / remote} daemons may be set with a host URL and specifying
#' '.compute' as 'remote', which creates a new compute profile. Subsequent
#' mirai calls may then be sent for local computation by not specifying its
#' mirai calls may then be sent for local computation by not specifying the
#' '.compute' argument, or for remote computation to connected daemons by
#' specifying its '.compute' argument as 'remote'.
#' specifying the '.compute' argument as 'remote'.
#'
#' \strong{cpu / gpu} some tasks may require access to different types of
#' daemon, such as those with GPUs. In this case, \code{daemons()} may be
Expand All @@ -217,26 +214,6 @@
#' Note: further actions such as resetting daemons via \code{daemons(0)}
#' should be carried out with the desired '.compute' argument specified.
#'
#' @section Everywhere:
#'
#' \code{\link{everywhere}} evaluates an expression on all connected daemons
#' and persists the resultant state. This is designed for setting up the
#' evaluation environment, with particular packages loaded, or common
#' resources made available, etc.
#'
#' @section Timeouts:
#'
#' Specifying the \code{.timeout} argument in \code{\link{mirai}} will
#' ensure that the 'mirai' always resolves.
#'
#' However, the task may not have completed and still be ongoing in the
#' daemon process. In such situations, dispatcher ensures that queued tasks
#' are not assigned to the busy process, however overall performance may
#' still be degraded if they remain in use. If a process hangs and cannot be
#' restarted manually, \code{\link{saisei}} specifying \code{force = TRUE}
#' may be used to cancel the task and regenerate any particular URL for a
#' new \code{\link{daemon}} to connect to.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
Expand Down
13 changes: 13 additions & 0 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
#' task that consistently hangs or crashes to prevent it from failing
#' repeatedly when new daemons connect.
#'
#' @section Timeouts:
#'
#' Specifying the '.timeout' argument to \code{\link{mirai}} ensures that
#' the 'mirai' always resolves. However, the task may not have completed and
#' still be ongoing in the daemon process. In such situations, dispatcher
#' ensures that queued tasks are not assigned to the busy process, however
#' overall performance may still be degraded if they remain in use.
#'
#' If a process hangs and cannot be restarted otherwise, \code{saisei}
#' specifying \code{force = TRUE} may be used to cancel the task and
#' regenerate any particular URL for a new \code{\link{daemon}} to connect
#' to.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
Expand Down
13 changes: 10 additions & 3 deletions man/daemon.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 12 additions & 39 deletions man/daemons.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions man/saisei.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ if (connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN")
nanotestn(unlist(serialization(NULL)))
Sys.sleep(1L)
option <- 15L
nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option))
nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT))
Sys.sleep(1L)
mq <- mirai("daemon", .timeout = 1000)
nanotest(call_mirai(mq)$data == "daemon" || is_error_value(mq$data))
Expand Down

0 comments on commit 041b99a

Please sign in to comment.