From f738b79c5262ee05ea18110be2928f3d05b46ebf Mon Sep 17 00:00:00 2001 From: wlandau-lilly Date: Tue, 30 Jan 2024 11:45:15 -0500 Subject: [PATCH] Sketch #1220 --- DESCRIPTION | 2 +- R/class_crew.R | 29 +++++++++++++++++++++-------- R/tar_make.R | 2 +- tests/testthat/test-class_crew.R | 12 ++++++------ tests/testthat/test-tar_make.R | 2 +- 5 files changed, 30 insertions(+), 17 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 82e4dac8..aabaa7c9 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -77,7 +77,7 @@ Suggests: arrow (>= 3.0.0), bs4Dash (>= 2.0.0), clustermq (>= 0.9.2), - crew (>= 0.8.0), + crew (>= 0.8.0.9003), curl (>= 4.3), DT (>= 0.14), dplyr (>= 1.0.0), diff --git a/R/class_crew.R b/R/class_crew.R index b9578846..f435d39b 100644 --- a/R/class_crew.R +++ b/R/class_crew.R @@ -126,6 +126,16 @@ crew_class <- R6::R6Class( list(common = common, globals = globals) }, run_worker = function(target) { + name <- target_get_name(target) + resources <- target$settings$resources$crew + name_controller <- resources$controller + # Covered in tests/hpc/test-crew_local.R + # nocov start + if (self$controller$saturated(controller = name_controller)) { + self$controller$push_backlog(name = name, controller = name_controller) + return() + } + # nocov end if (self$garbage_collection) { gc() } @@ -143,14 +153,12 @@ crew_class <- R6::R6Class( data <- self$exports$common data$target <- target globals <- self$exports$globals - resources <- target$settings$resources$crew - name <- target_get_name(target) target_prepare( target = target, pipeline = self$pipeline, scheduler = self$scheduler, meta = self$meta, - pending = self$controller$saturated(controller = resources$controller) + pending = FALSE ) self$sync_meta_time() self$controller$push( @@ -159,7 +167,7 @@ crew_class <- R6::R6Class( globals = globals, substitute = FALSE, name = name, - controller = resources$controller, + controller = name_controller, scale = TRUE, throttle = TRUE, seconds_timeout = resources$seconds_timeout @@ -202,9 +210,13 @@ crew_class <- R6::R6Class( iterate = function() { self$sync_meta_time() queue <- self$scheduler$queue - if_any( - queue$should_dequeue(), - self$process_target(queue$dequeue()), + # Covered in tests/hpc/test-crew_local.R + # nocov start + if (queue$should_dequeue()) { + self$process_target(queue$dequeue()) + } else if (length(backlog <- self$controller$pop_backlog())) { + map(backlog, ~self$process_target(.x)) + } else { self$controller$wait( mode = "one", seconds_interval = 0.5, @@ -212,7 +224,8 @@ crew_class <- R6::R6Class( scale = TRUE, throttle = TRUE ) - ) + } + # nocov end self$conclude_worker_task() }, conclude_worker_task = function() { diff --git a/R/tar_make.R b/R/tar_make.R index de18adb9..146bac40 100644 --- a/R/tar_make.R +++ b/R/tar_make.R @@ -224,7 +224,7 @@ tar_make_inner <- function( envir = tar_option_get("envir") )$run() } else { - tar_assert_package("crew (>= 0.8.0)") + tar_assert_package("crew (>= 0.8.0.9003)") crew_init( pipeline = pipeline, meta = meta_init(path_store = path_store), diff --git a/tests/testthat/test-class_crew.R b/tests/testthat/test-class_crew.R index 42d447a0..419d7b13 100644 --- a/tests/testthat/test-class_crew.R +++ b/tests/testthat/test-class_crew.R @@ -1,5 +1,5 @@ tar_test("crew$validate()", { - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") controller <- crew::crew_controller_local( host = "127.0.0.1", seconds_interval = 0.5 @@ -15,7 +15,7 @@ tar_test("crew database subkey", { tar_test("workerless deployment works", { skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -98,7 +98,7 @@ tar_test("semi-workerless deployment works", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") crew_test_sleep() tar_runtime$fun <- "tar_make" @@ -183,7 +183,7 @@ tar_test("some targets up to date, some not", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -238,7 +238,7 @@ tar_test("crew algo can skip targets", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) @@ -293,7 +293,7 @@ tar_test("nontrivial common data", { skip_cran() skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") tar_runtime$fun <- "tar_make" tar_option_set(backoff = tar_backoff(min = 0.5, max = 0.5)) diff --git a/tests/testthat/test-tar_make.R b/tests/testthat/test-tar_make.R index 75334639..c566e0be 100644 --- a/tests/testthat/test-tar_make.R +++ b/tests/testthat/test-tar_make.R @@ -17,7 +17,7 @@ tar_test("tar_make() works", { tar_test("tar_make() works with crew", { skip_on_os("windows") skip_on_os("solaris") - skip_if_not_installed("crew", minimum_version = "0.8.0") + skip_if_not_installed("crew", minimum_version = "0.8.0.9003") skip_if_not_installed("R.utils") should_skip <- identical(tolower(Sys.info()[["sysname"]]), "windows") && isTRUE(as.logical(Sys.getenv("CI")))