Skip to content

Commit

Permalink
Fix #108
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Nov 30, 2023
1 parent eee4423 commit 57341c0
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 7 deletions.
6 changes: 6 additions & 0 deletions R/crew_controller_group.R
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,16 @@ crew_class_controller_group <- R6::R6Class(
seconds_timeout,
scale
) {
if (sum(map_int(controllers, ~length(.x$tasks))) < 1L) {
return(FALSE)
}
envir <- new.env(parent = emptyenv())
envir$result <- FALSE
crew_retry(
fun = ~{
if (scale) {
walk(controllers, ~.x$scale())
}
if (self$relay$wait_unpopped(seconds_timeout = seconds_interval)) {
for (controller in controllers) {
if (controller$client$relay$wait_unpopped(seconds_timeout = 0)) {
Expand Down
6 changes: 3 additions & 3 deletions R/crew_relay.R
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ crew_class_relay <- R6::R6Class(
#' @return `NULL` (invisibly).
#' @param seconds_timeout Positive numeric of length 1,
#' Number of seconds to wait before timing out.
wait_condition = function(seconds_timeout = Inf) {
wait_condition = function(seconds_timeout = 1e9) {
timeout <- seconds_timeout * 1000
condition <- .subset2(self, "condition")
result <- FALSE
Expand All @@ -116,7 +116,7 @@ crew_class_relay <- R6::R6Class(
#' `FALSE` otherwise.
#' @param seconds_timeout Positive numeric of length 1,
#' Number of seconds to wait before timing out.
wait_unpopped = function(seconds_timeout = Inf) {
wait_unpopped = function(seconds_timeout = 1e9) {
if (.subset2(self, "unpopped") < 1L) {
.subset2(self, "wait_condition")(seconds_timeout = seconds_timeout)
}
Expand All @@ -131,7 +131,7 @@ crew_class_relay <- R6::R6Class(
#' Number of seconds to wait before timing out.
#' @param resolved Positive integer of length 1. This method waits
#' until the number of resolved tasks reaches this value or above.
wait_resolved = function(seconds_timeout = Inf, resolved = 1L) {
wait_resolved = function(seconds_timeout = 1e9, resolved = 1L) {
if (.subset2(self, "resolved")() < resolved) {
.subset2(self, "wait_condition")(seconds_timeout = seconds_timeout)
}
Expand Down
6 changes: 3 additions & 3 deletions man/crew_class_relay.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions tests/interactive/test-crew_relay.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
library(crew)
library(testthat)

crew_test("relay wait_condition() timeout condition", {
x <- crew_relay()
cv <- nanonext::cv()
Expand Down
118 changes: 118 additions & 0 deletions tests/interactive/test-wait.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
library(crew)
library(testthat)

crew_test("single controller, no tasks", {
for (group in c(FALSE, TRUE)) {
y <- crew_controller_local()
x <- if_any(group, crew_controller_group(y), y)
on.exit({
x$terminate()
crew_test_sleep()
})
x$start()
time <- system.time(
expect_true(
x$wait(
mode = "all",
seconds_interval = 5,
seconds_timeout = Inf
)
)
)["elapsed"]
expect_true(time < 1)
time <- system.time(
expect_false(
x$wait(
mode = "one",
seconds_interval = 5,
seconds_timeout = Inf
)
)
)["elapsed"]
expect_true(time < 1)
x$terminate()
crew_test_sleep()
}
})

crew_test("single controller, all tasks already done", {
for (group in c(FALSE, TRUE)) {
y <- crew_controller_local()
x <- if_any(group, crew_controller_group(y), y)
on.exit({
x$terminate()
crew_test_sleep()
})
x$start()
x$push(TRUE)
x$wait()
for (mode in c("all", "one")) {
time <- system.time(
expect_true(
x$wait(
mode = mode,
seconds_interval = 5,
seconds_timeout = Inf
)
)
)["elapsed"]
expect_true(time < 1)
}
x$terminate()
crew_test_sleep()
}
})

crew_test("single controller, one long task, time out", {
for (group in c(FALSE, TRUE)) {
y <- crew_controller_local()
x <- if_any(group, crew_controller_group(y), y)
on.exit({
x$terminate()
crew_test_sleep()
})
x$start()
x$push(Sys.sleep(10))
for (mode in c("all", "one")) {
time <- system.time(
expect_false(
x$wait(
mode = mode,
seconds_interval = 0.1,
seconds_timeout = 0.2
)
)
)["elapsed"]
expect_true(time < 1)
}
x$terminate()
crew_test_sleep()
}
})

crew_test("single controller, one long task, wait all, full wait", {
for (group in c(FALSE, TRUE)) {
for (mode in c("all", "one")) {
y <- crew_controller_local()
x <- if_any(group, crew_controller_group(y), y)
on.exit({
x$terminate()
crew_test_sleep()
})
x$start()
x$push(Sys.sleep(5))
time <- system.time(
expect_true(
x$wait(
mode = mode,
seconds_interval = 0.5,
seconds_timeout = Inf
)
)
)["elapsed"]
expect_true(time > 4)
x$terminate()
crew_test_sleep()
}
}
})
24 changes: 23 additions & 1 deletion tests/testthat/test-crew_controller_group.R
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ crew_test("crew_controller_group() wait one", {
crew_test_sleep()
})
x$start()
expect_false(x$wait(mode = "one", seconds_timeout = 30))
x$push(
command = "done",
name = "task_a",
Expand All @@ -332,7 +333,7 @@ crew_test("crew_controller_group() wait one", {
name = "task_a",
controller = "b"
)
x$wait(mode = "one", seconds_timeout = 30)
expect_true(x$wait(mode = "one", seconds_timeout = 30))
out <- x$pop()
expect_equal(out$result[[1L]], "done")
})
Expand Down Expand Up @@ -365,6 +366,27 @@ crew_test("crew_controller_group() wait all timeout", {
expect_false(x$wait(mode = "all", seconds_timeout = 0, seconds_interval = 0))
})

crew_test("controllers in groups must not already be started", {
skip_on_cran()
skip_on_os("windows")
a <- crew_controller_local(
name = "a",
seconds_idle = 360
)
b <- crew_controller_local(
name = "b",
seconds_idle = 360
)
on.exit({
b$terminate()
rm(b)
gc()
crew_test_sleep()
})
b$start()
expect_crew_error(crew_controller_group(a, b))
})

crew_test("crew_controller_group() deprecate collect()", {
skip_on_cran()
skip_on_os("windows")
Expand Down

0 comments on commit 57341c0

Please sign in to comment.