Interactive async communication with mirai and nanonext #90
Replies: 4 comments 7 replies
-
Thanks! And fantastic to see such creative usage. I don't see anything overtly wrong with the example. I'll point out a couple of things:

```r
myipc <- mirai:::auto_tokenized_url()
```

The above would be a more efficient constructor, and uses the best supported local transport (so for Linux it should be an `abstract://` URL rather than `ipc://`). I could consider exporting it if useful.

This part is a bit of an anti-pattern though, unless you're actually doing work in the loop rather than just sleeping:

```r
# Print in the main process
message("Waiting for all jobs to finish...\n")
while (any(vapply(jobs, unresolved, logical(1)))) {
  msleep(20)
}
```

The most straightforward solution here is to move this line up to replace the above block:

```r
values <- lapply(jobs, \(x) call_aio_(x)$data)
```

As for signalling the end of an interaction, there is no canonical way - it is probably a little more efficient to just drop the connection and use pipe events.

Also if you ultimately decide to go down the {coro} or {async} route, do let me know your thinking - ideas for improvement are most welcome.

Updated code block:

```r
library(mirai)
library(nanonext)

test_interactive_async <- function() {
  # Create a single primary daemon to dispatch work to other daemons and
  # call back to this parent process for user input.
  daemons(1, dispatcher = FALSE, .compute = "prime", autoexit = tools::SIGTERM,
          output = TRUE)
  # Create a socket/context over which we communicate with the primary daemon.
  # It is 'rep' type in that the daemon will make a request and get an
  # interactive reply.
  myipc <- mirai:::auto_tokenized_url()
  sockA <- socket("rep", listen = myipc)
  on.exit({close(sockA); daemons(0, .compute = "prime")}, add = TRUE)
  # Send a task to the primary daemon to send a request back to this process
  m <- mirai({
    library(nanonext)
    library(mirai)
    # Create the secondary daemons that will take non-interactive tasks
    daemons(2, dispatcher = TRUE, .compute = "workers", autoexit = tools::SIGTERM)
    # Create the other end of the socket/context
    sockB <- socket("req", dial = myipc)
    on.exit({close(sockB); daemons(0, .compute = "workers")}, add = TRUE)
    # Send requests to workers
    jobs <- replicate(
      5,
      mirai({
        x <- rnorm(1)
        Sys.sleep(1)
        attr(x, "pid") <- Sys.getpid()
        x
      }, .compute = "workers")
    )
    on.exit(lapply(jobs, stop_mirai), add = TRUE, after = FALSE)
    # Send the questions to the interactive process
    questions <- c("Please enter your first name: ",
                   "Please enter your last name: ",
                   "done")
    replies <- lapply(questions, function(q) {
      request(context(sockB), q)
    })
    # Wait for all jobs to resolve
    message("Waiting for all jobs to finish...\n")
    values <- lapply(jobs, \(x) call_aio_(x)$data)
    message("Jobs are done!")
    # Extract the results and summarize
    names <- vapply(replies, \(x) call_aio_(x)$data, character(1))
    pids <- unique(vapply(values, \(x) attr(x, "pid"), integer(1)))
    prime_pid <- Sys.getpid()
    # Return it all together
    greeting <- paste0("Hello, ", names[1], " ", names[2], "! Total value summed across ",
                       length(values), " jobs is ", sum(unlist(values)), ".",
                       " Run on workers ", paste(pids, collapse = ", "),
                       " and primary daemon ", prime_pid, ".")
    greeting
  }, .args = list(myipc = myipc), .compute = "prime")
  # If the user interrupts interaction, the process will continue unless stopped
  on.exit(stop_mirai(m), add = TRUE)
  signal <- ""
  # Wait for and then reply to the requests from the primary daemon until done
  while (signal != "done") {
    reply(context(sockA), function(x) {
      if (x != "done") {
        return(readline(x))
      } else {
        signal <<- x
        return("Over and out!")
      }
    })
  }
  # Output the results
  call_mirai_(m)$data
}

test_interactive_async()
```
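To make the "anti-pattern" point above concrete without needing running daemons, here is a base-R-only sketch contrasting a busy-wait polling loop with a blocking collect. `make_job`, `unresolved` and `collect` are hypothetical stand-ins simulating an async job; in real code the blocking step is `call_aio_(x)$data`.

```r
# Hypothetical illustration (base R only): contrast busy-wait polling with a
# blocking collect. make_job() simulates an async job resolving after a delay.
make_job <- function(delay, value) {
  done_at <- Sys.time() + delay
  list(
    unresolved = function() Sys.time() < done_at,
    collect    = function() {            # blocks until the job resolves
      while (Sys.time() < done_at) Sys.sleep(0.01)
      value
    }
  )
}

jobs <- list(make_job(0.1, 1), make_job(0.2, 2))

# Anti-pattern: poll all jobs in a loop, sleeping between checks.
while (any(vapply(jobs, function(j) j$unresolved(), logical(1)))) Sys.sleep(0.02)

# Preferred shape: block on each job directly, mirroring
# values <- lapply(jobs, \(x) call_aio_(x)$data)
values <- lapply(jobs, function(j) j$collect())
sum(unlist(values))  # 3
```

The blocking form does the same waiting with no wake-up/sleep churn, and returns the results in one pass.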
-
As another experiment, I dropped direct
Not quite requesting these as features, just thinking out loud as I go... :)
-
At a high level, I wonder if this problem would be easier to solve with a custom user interface rather than a custom network topology. For example, if a task needs input from the user, maybe it could quit early and defer the rest of the work to a later downstream task, and the downstream task could start after the user writes to a stream other than stdin.
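As a rough sketch of that "defer and resume" idea, here is a hypothetical base-R version: the task saves its partial state and returns early, and a downstream task resumes only once input appears somewhere other than stdin (here, a plain file; `input_path`, `state_path`, `start_task` and `resume_task` are all illustrative names, not anything from {mirai}).

```r
# Hypothetical sketch: a task needing user input quits early; a downstream
# task resumes once the input appears on a non-stdin stream (a file here).
input_path <- tempfile("user-input-")
state_path <- tempfile("task-state-")

start_task <- function() {
  saveRDS(list(partial = 21), state_path)  # do the work that needs no input
  invisible(NULL)                          # quit early, deferring the rest
}

resume_task <- function() {
  if (!file.exists(input_path)) return(NULL)   # input not provided yet
  state <- readRDS(state_path)
  n <- as.numeric(readLines(input_path))
  state$partial * n
}

start_task()
resume_task()                 # NULL: still waiting on the user
writeLines("2", input_path)   # the "user" writes to a non-stdin stream
resume_task()                 # 42
```

The coordinator never blocks on `readline()`; it just schedules `resume_task` whenever the input stream changes.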
-
I think with the latest addition of 'asyncdial' back to
-
A bit of praise for {mirai} and {nanonext} here. I am working on a project where there is a graph of async tasks to complete, some of which request input from the user. I realized that in R, getting input from the user (via `readline()`) is blocking. This means that while the async coordinator is awaiting user input, other coordination is halted. Background processes running on daemons would continue, but the main evaluation loop could not, so after background tasks finished, they would not kick off new tasks.

I decided to try a single background worker as the main async coordinator, which itself would send async tasks to other workers, or back to the main thread when user input was required. {mirai} and {nanonext} do this super well! Here's a proof of concept.

And I got:

But if I type quickly, I also get:

And watching `htop`, I briefly had 3 workers and a dispatcher node running, and then they all shut down 👍.

Anything in this approach to improve, @shikokuchuo? (Or is there anything extraneous?) It took me a bit to realize I needed a different context for each request-reply pair to avoid them clobbering each other. I was also wondering if there was a canonical way for one end of the process to signal to the other that the interaction is finished, as I did with `signal = "done"` here.

I'm still figuring out the async model to use. (I'm leaning towards @lionel-'s {coro}.) But for pretty much any of them I think it should be easy enough to convert the request to the user into a promise/future/deferred value, using `as.promise.mirai` as a template.