Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
be-marc committed Jun 12, 2024
1 parent f2b3278 commit d41360c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 26 deletions.
1 change: 0 additions & 1 deletion R/Optimizer.R
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ Optimizer = R6Class("Optimizer",
assign_result_default = function(inst) {
assert_r6(inst, "OptimInstance")
res = inst$archive$best()

xdt = res[, inst$search_space$ids(), with = FALSE]

if (inherits(inst, "OptimInstanceBatchMultiCrit") || inherits(inst, "OptimInstanceAsyncMultiCrit")) {
Expand Down
56 changes: 34 additions & 22 deletions R/OptimizerAsync.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,46 @@ optimize_async_default = function(instance, optimizer, design = NULL, n_workers
get_private(optimizer)$.optimize(instance)
} else {
# run .optimize() on workers

# check if there are already running workers or a rush plan is available
if (!instance$rush$n_running_workers && !rush_available()) {
stop("No running worker found and no rush plan available to start local workers.\n See `?rush::rush_plan()`")
}
rush = instance$rush

# FIXME: How to pass globals and packages?
if (!instance$rush$n_running_workers) {
lg$debug("Start %i local worker(s)", n_workers %??% rush_config()$n_workers)

packages = c(optimizer$packages, "bbotk") # add packages from objective

instance$rush$start_workers(
n_workers = n_workers,
wait_for_workers = TRUE,
if (rush$n_pre_workers) {
# start remote workers
lg$info("Starting to optimize %i parameter(s) with '%s' and '%s' on %i remote worker(s)",
instance$search_space$length,
optimizer$format(),
instance$terminator$format(with_params = TRUE),
rush$n_pre_workers
)

rush$start_remote_workers(
worker_loop = bbotk_worker_loop,
packages = packages,
packages = c(optimizer$packages, "bbotk"), # add packages from objective
optimizer = optimizer,
instance = instance)
} else if (rush_available()) {
# local workers
lg$info("Starting to optimize %i parameter(s) with '%s' and '%s' on %i remote worker(s)",
instance$search_space$length,
optimizer$format(),
instance$terminator$format(with_params = TRUE),
rush_config()$n_workers
)

rush$start_local_workers(
worker_loop = bbotk_worker_loop,
packages = c(optimizer$packages, "bbotk"), # add packages from objective
optimizer = optimizer,
instance = instance,
wait_for_workers = TRUE)
} else {
stop("No rush plan available to start local workers and no pre-started remote workers found. See `?rush::rush_plan()`.")
}

lg$info("Starting to optimize %i parameter(s) with '%s' and '%s' on %i worker(s)",
instance$search_space$length,
optimizer$format(),
instance$terminator$format(with_params = TRUE),
instance$rush$n_running_workers
)
}

# wait until optimization is finished
# check terminated workers when the terminator is "none"
while(!instance$is_terminated && !instance$rush$all_workers_terminated) {
while(TRUE) {
Sys.sleep(1)
instance$rush$print_log()

Expand All @@ -113,9 +121,13 @@ optimize_async_default = function(instance, optimizer, design = NULL, n_workers
if (instance$rush$all_workers_lost) {
stop("All workers have crashed.")
}

if (instance$is_terminated) break
if (instance$rush$all_workers_terminated) break
}

# assign result
print(instance$archive$n_evals)
get_private(optimizer)$.assign_result(instance)
lg$info("Finished optimizing after %i evaluation(s)", instance$archive$n_evals)
lg$info("Result:")
Expand Down
39 changes: 36 additions & 3 deletions tests/testthat/test_OptimizerAsync.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
test_that("OptimizerAsync starts local workers", {
skip_on_cran()
# skip_on_cran()
skip_if_not_installed("rush")
flush_redis()

Expand All @@ -11,6 +10,7 @@ test_that("OptimizerAsync starts local workers", {
search_space = PS_2D,
terminator = trm("evals", n_evals = 5L),
)

optimizer = opt("async_random_search")
optimizer$optimize(instance)

Expand All @@ -19,9 +19,43 @@ test_that("OptimizerAsync starts local workers", {
expect_rush_reset(instance$rush)
})

test_that("OptimizerAsync starts remote workers", {
skip_on_cran()
skip_if_not_installed("rush")
flush_redis()

rush = rsh(network_id = "test_rush")
expect_snapshot(rush$create_worker_script())

px = processx::process$new("Rscript",
args = c("-e", 'rush::start_worker(network_id = "test_rush", remote = TRUE, url = "redis://127.0.0.1:6379", scheme = "redis", host = "127.0.0.1", port = "6379")'),
supervise = TRUE,
stderr = "|", stdout = "|")

on.exit({
px$kill()
}, add = TRUE)

Sys.sleep(5)

instance = oi_async(
objective = OBJ_2D,
search_space = PS_2D,
terminator = trm("evals", n_evals = 5L),
rush = rush
)

optimizer = opt("async_random_search")
optimizer$optimize(instance)

expect_data_table(instance$rush$worker_info, nrows = 1)
expect_true(instance$rush$worker_info$remote)

expect_rush_reset(instance$rush)
})

test_that("OptimizerAsync assigns result", {
skip_on_cran()
# skip_on_cran()
skip_if_not_installed("rush")
flush_redis()

Expand All @@ -42,7 +76,6 @@ test_that("OptimizerAsync assigns result", {

test_that("OptimizerAsync throws an error when all workers are lost", {
skip_on_cran()
# skip_on_cran()
skip_if_not_installed("rush")
flush_redis()

Expand Down

0 comments on commit d41360c

Please sign in to comment.