Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow daemons to raise a signal upon autoexit #88

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/check-standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ jobs:
http-user-agent: ${{ matrix.config.http-user-agent }}
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::rcmdcheck
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/pkgdown.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ jobs:
with:
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::pkgdown, local::.
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/test-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ jobs:
with:
use-public-rspm: true

- name: Add r-universe to repos
run: |
cat("\noptions(repos=c(DEV='https://shikokuchuo.r-universe.dev',CRAN ='https://cloud.r-project.org'))\n", file = "~/.Rprofile", append = TRUE)
shell: Rscript {0}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::covr
Expand Down
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Encoding: UTF-8
Depends:
R (>= 3.5)
Imports:
nanonext (>= 0.11.0)
nanonext (>= 0.11.0.9000)
Enhances:
parallel,
promises
Expand Down
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# mirai 0.11.3.9000 (development)

* Allows supplying a signal value for the 'autoexit' argument of daemon() to raise it upon exit.
* Requires nanonext >= [0.11.0.9000].

# mirai 0.11.3

* Implements `serialization()` for registering custom serialization and unserialization functions when using daemons.
Expand All @@ -13,7 +16,7 @@
* Cluster node failures during load balanced operations now rely on the 'parallel' mechanism to error and no longer fail early or automatically stop the cluster.
* Fixes regression since 0.11.0 which prevented dispatcher exiting in a timely manner when tasks are backlogged (thanks @wlandau #86).
* Improved memory efficiency and stability at dispatcher.
* No longer loads the 'promises' package if not already loaded (but makes the 'mirai' method available via a hook function).
* No longer loads the 'promises' package if not already loaded (but makes the 'mirai' method available via a hook function).
* Requires nanonext >= 0.11.0.

# mirai 0.11.2
Expand Down
17 changes: 12 additions & 5 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param autoexit [default TRUE] logical value, whether the daemon should
#' exit automatically when its socket connection ends (see 'Persistence'
#' exit automatically when its socket connection ends, or else an integer
#' signal value to additionally raise this upon exit (see 'Persistence'
#' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore loaded packages and options to an
Expand Down Expand Up @@ -70,15 +71,21 @@
#' @section Persistence:
#'
#' The 'autoexit' argument governs persistence settings for the daemon. The
#' default TRUE ensures that it will exit cleanly under all circumstances
#' once its socket connection has ended.
#' default TRUE ensures that it will exit cleanly once its socket connection
#' has ended.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when
#' there is no longer a socket connection. This allows a host session to end
#' and a new session to connect at the URL where the daemon is dialled in.
#' Daemons must be terminated with \code{daemons(NULL)} in this case, which
#' sends an exit signal to all connected daemons.
#'
#' Supplying a signal from the \pkg{tools} package, e.g. \code{tools::SIGINT},
#' or an equivalent integer value, sets this signal to be raised when the
#' socket connection ends. As an example, supplying SIGINT allows a
#' potentially more immediate exit by interrupting any ongoing evaluation
#' rather than letting it complete.
#'
#' Persistence also implies that dials are performed asynchronously, which
#' means retries are attempted (indefinitely) if not immediately successful.
#' This is resilient behaviour but can mask potential connection issues.
Expand Down Expand Up @@ -107,7 +114,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)

Expand Down Expand Up @@ -145,7 +152,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
error = mk_mirai_error, interrupt = mk_interrupt_error)
count <- count + 1L

(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
{ count >= maxtasks || count > timerstart && mclock() - start >= walltime } && {
next_config(mark = TRUE)
send(ctx, data = data, mode = 3L)
aio <- recv_aio_signal(ctx, cv = cv, mode = 8L)
Expand Down
47 changes: 12 additions & 35 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,19 @@
#'
#' By default \code{dispatcher = TRUE}. This launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on
#' behalf of the host and queues tasks until a daemon is able to begin
#' immediate execution of that task, ensuring FIFO scheduling. Dispatcher
#' uses synchronisation primitives from \code{nanonext}, waiting rather than
#' polling for tasks, which is efficient both in terms of consuming no
#' resources while waiting, and also being fully synchronised with events
#' (having no latency).
#' behalf of the host and ensures FIFO scheduling of tasks. Dispatcher uses
#' synchronisation primitives from \pkg{nanonext}, waiting rather than
#' polling for tasks, which is both efficient (no resource usage) and fully
#' event-driven (having no latency).
#'
#' By specifying \code{dispatcher = FALSE}, daemons connect to the host
#' directly rather than through dispatcher. The host sends tasks to
#' connected daemons immediately in an evenly-distributed fashion. However,
#' optimal scheduling is not guaranteed as the duration of tasks cannot be
#' known \emph{a priori}, such that tasks can be queued at a daemon behind
#' a long-running task while other daemons remain idle. Nevertheless, this
#' provides a resource-light approach suited to working with similar-length
#' tasks, or where concurrent tasks typically do not exceed available
#' daemons.
#' known \emph{a priori}, such that tasks can be queued at one daemon while
#' other daemons remain idle. Nevertheless, this provides a resource-light
#' approach suited to working with similar-length tasks, or where concurrent
#' tasks typically do not exceed available daemons.
#'
#' @section Distributed Computing:
#'
Expand Down Expand Up @@ -167,8 +164,8 @@
#' numbers / paths. In this case it is optional to supply 'n' as this can
#' be inferred by the length of vector supplied.
#'
#' Individual daemons then dial in to each of these host URLs, and at most
#' one daemon should be dialled into each URL at any given time.
#' Individual daemons then dial in to each of these host URLs. At most one
#' daemon can be dialled into each URL at any given time.
#'
#' Dispatcher automatically adjusts to the number of daemons actually
#' connected. Hence it is possible to dynamically scale up or down the
Expand Down Expand Up @@ -203,9 +200,9 @@
#'
#' \strong{local / remote} daemons may be set with a host URL and specifying
#' '.compute' as 'remote', which creates a new compute profile. Subsequent
#' mirai calls may then be sent for local computation by not specifying its
#' mirai calls may then be sent for local computation by not specifying the
#' '.compute' argument, or for remote computation to connected daemons by
#' specifying its '.compute' argument as 'remote'.
#' specifying the '.compute' argument as 'remote'.
#'
#' \strong{cpu / gpu} some tasks may require access to different types of
#' daemon, such as those with GPUs. In this case, \code{daemons()} may be
Expand All @@ -217,26 +214,6 @@
#' Note: further actions such as resetting daemons via \code{daemons(0)}
#' should be carried out with the desired '.compute' argument specified.
#'
#' @section Everywhere:
#'
#' \code{\link{everywhere}} evaluates an expression on all connected daemons
#' and persists the resultant state. This is designed for setting up the
#' evaluation environment, with particular packages loaded, or common
#' resources made available, etc.
#'
#' @section Timeouts:
#'
#' Specifying the \code{.timeout} argument in \code{\link{mirai}} will
#' ensure that the 'mirai' always resolves.
#'
#' However, the task may not have completed and still be ongoing in the
#' daemon process. In such situations, dispatcher ensures that queued tasks
#' are not assigned to the busy process, however overall performance may
#' still be degraded if they remain in use. If a process hangs and cannot be
#' restarted manually, \code{\link{saisei}} specifying \code{force = TRUE}
#' may be used to cancel the task and regenerate any particular URL for a
#' new \code{\link{daemon}} to connect to.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
Expand Down
13 changes: 13 additions & 0 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
#' task that consistently hangs or crashes to prevent it from failing
#' repeatedly when new daemons connect.
#'
#' @section Timeouts:
#'
#' Specifying the '.timeout' argument to \code{\link{mirai}} ensures that
#' the 'mirai' always resolves. However, the task may not have completed and
#' still be ongoing in the daemon process. In such situations, dispatcher
#' ensures that queued tasks are not assigned to the busy process, however
#' overall performance may still be degraded if they remain in use.
#'
#' If a process hangs and cannot be restarted otherwise, \code{saisei}
#' specifying \code{force = TRUE} may be used to cancel the task and
#' regenerate any particular URL for a new \code{\link{daemon}} to connect
#' to.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
Expand Down
13 changes: 10 additions & 3 deletions man/daemon.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 12 additions & 39 deletions man/daemons.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions man/saisei.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ if (connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN")
nanotestn(unlist(serialization(NULL)))
Sys.sleep(1L)
option <- 15L
nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option))
nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT))
Sys.sleep(1L)
mq <- mirai("daemon", .timeout = 1000)
nanotest(call_mirai(mq)$data == "daemon" || is_error_value(mq$data))
Expand Down
Loading