Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the appearance of staged parallelism for single-job runthroughs. #371

Merged
merged 3 commits into from
May 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Version 5.1.4

- Remove staged parallelism from the `make(parallelism = "mclapply")` backend. Now uses persistent workers and a master process.
- Remove staged parallelism from the `"mclapply"` backend. Now uses persistent workers and a master process.
- Remove the appearance of staged parallelism from single-job `make()` calls.
- Calls to `make()` no longer leave targets in the user's environment.
- Attempt to fix a Solaris CRAN check error. The test at https://github.com/ropensci/drake/blob/b4dbddb840d2549621b76bcaa46c344b0fd2eccc/tests/testthat/test-edge-cases.R#L3 was previously failing on CRAN's Solaris machine (R 3.5.0). In the test, one of the threads deliberately quits in error, and the R/Solaris installation did not handle this properly. The test should work now because it no longer uses any parallelism.
- Add an `upstream_only` argument to `failed()` so users can list failed targets that do not have any failed dependencies. Naturally accompanies `make(keep_going = TRUE)`.
- Add an RStudio R Markdown template compatible with https://krlmlr.github.io/drake-pitch/.
Expand Down
24 changes: 24 additions & 0 deletions R/build.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ drake_build <- function(
build_and_store(target = target, config = config)
}

# Check a single target and, if it needs building, build it, store it,
# and assign it into the environment.
#
# Arguments:
#   target       Name of the target to process.
#   config       Configuration list; `config$plan$target` is consulted
#                when `flag_attempt` is TRUE, and the whole list is handed
#                to every helper called below.
#   downstream   Passed straight through to `prune_envir()`.
#   announce     Passed straight through to `build_and_store()`.
#   flag_attempt If TRUE and `target` is listed in `config$plan$target`,
#                call `set_attempt_flag(config)` after the build.
#
# Returns NULL right away when `should_build_target()` reports that the
# target does not need to be built.
build_check_store <- function(
  target, config, downstream = NULL, announce = TRUE, flag_attempt = FALSE
){
  meta <- drake_meta(target = target, config = config)
  needs_build <- should_build_target(
    target = target,
    meta = meta,
    config = config
  )
  if (!needs_build){
    return()
  }
  # Record the start time only for targets that are actually built.
  meta$start <- proc.time()
  prune_envir(
    targets = target,
    config = config,
    downstream = downstream
  )
  # Forward `announce` so the caller's choice is honored; previously the
  # argument was accepted here but never reached build_and_store().
  value <- build_and_store(
    target = target,
    meta = meta,
    config = config,
    announce = announce
  )
  assign_to_envir(target = target, value = value, config = config)
  if (flag_attempt && target %in% config$plan$target){
    set_attempt_flag(config)
  }
}

build_and_store <- function(target, config, meta = NULL, announce = TRUE){
# The environment should have been pruned by now.
# For staged parallelism, this was already done in bulk
Expand Down
15 changes: 14 additions & 1 deletion R/envir.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
assign_to_envir <- function(targets, values, config){
# Assign a freshly built value to `target` in `config$envir`, but only
# when `config$lazy_load` is exactly "eager", the target is not a file
# (per `is_file()`), and the target is listed in `config$plan$target`.
# Always returns an invisible NULL.
assign_to_envir <- function(target, value, config){
  eager <- identical(config$lazy_load, "eager")
  # Guard clause: the conditions are checked in the same order as before,
  # so is_file() is only consulted for eager loading.
  if (!eager || is_file(target) || !(target %in% config$plan$target)){
    return(invisible())
  }
  assign(x = target, value = value, envir = config$envir)
  invisible()
}

# Should go away when staged parallelism goes away.
assign_to_envir_batch <- function(targets, values, config){
if (config$lazy_load != "eager"){
return()
}
Expand All @@ -13,6 +25,7 @@ assign_to_envir <- function(targets, values, config){
invisible()
}

# Should also go away when staged parallelism goes away.
assign_to_envir_single <- function(index, targets, values, config){
target <- targets[index]
value <- values[[index]]
Expand Down
5 changes: 4 additions & 1 deletion R/graph.R
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,11 @@ filter_upstream <- function(targets, graph){
}

# This function will go away when we get rid of staged parallelism.
# No point in testing it.
# nocov start
exclude_imports_if <- function(config){
if (!length(config$skip_imports)){
config$skip_imports <- FALSE # nocov
config$skip_imports <- FALSE
}
if (!config$skip_imports){
return(config)
Expand All @@ -240,6 +242,7 @@ exclude_imports_if <- function(config){
)
config
}
# nocov end

subset_graph <- function(graph, subset){
if (!length(subset)){
Expand Down
17 changes: 6 additions & 11 deletions R/lapply.R
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
run_lapply <- function(config){
run_staged_parallelism(config = config, worker = worker_lapply)
}

worker_lapply <- function(targets, meta_list, config){
prune_envir(targets = targets, config = config)
values <- lapply(
X = targets,
FUN = drake_build_worker,
meta_list = meta_list,
config = config
lapply(
X = igraph::topo_sort(config$schedule)$name,
FUN = build_check_store,
config = config,
flag_attempt = TRUE
)
assign_to_envir(targets = targets, values = values, config = config)
invisible()
}
4 changes: 4 additions & 0 deletions R/make.R
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ make_session <- function(config){
cache = config$cache,
jobs = config$jobs
)
remove(
list = intersect(config$plan$target, ls(envir = config$envir)),
envir = config$envir
)
return(invisible(config))
}

Expand Down
25 changes: 8 additions & 17 deletions R/mclapply.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
run_mclapply <- function(config){
do_prework(config = config, verbose_packages = FALSE)
config$jobs <- safe_jobs(config$jobs)
if (config$jobs < 2) {
return(run_lapply(config = config))
Expand Down Expand Up @@ -32,9 +31,12 @@ mc_master <- function(config){
for (worker in config$workers){
if (mc_is_idle(worker = worker, config = config)){
mc_conclude_worker(worker = worker, config = config)
if (!config$queue$size()){
mc_set_done(worker = worker, config = config)
next
}
target <- config$queue$pop0(what = "names")
if (!length(target)){
mc_set_done(worker = worker, config = config)
next
}
mc_set_target(worker = worker, target = target, config = config)
Expand All @@ -54,23 +56,11 @@ mc_worker <- function(worker, config){
break
}
target <- mc_get_target(worker = worker, config = config)
meta <- drake_meta(target = target, config = config)
if (!should_build_target(
build_check_store(
target = target,
meta = meta,
config = config
)){
mc_set_idle(worker = worker, config = config)
next
}
meta$start <- proc.time()
prune_envir(
targets = target,
config = config,
downstream = config$cache$list(namespace = "protect")
)
value <- build_and_store(target = target, meta = meta, config = config)
assign(x = target, value = value, envir = config$envir)
mc_set_idle(worker = worker, config = config)
}
}
Expand Down Expand Up @@ -105,13 +95,14 @@ mc_conclude_worker <- function(worker, config){
reverse = TRUE
) %>%
intersect(y = config$queue$list(what = "names"))
config$queue$decrease_key(names = revdeps)
flag_attempt <- !get_attempt_flag(config) &&
get_progress_single(target = target, cache = config$cache) == "finished" &&
target %in% config$plan$target
if (flag_attempt){
set_attempt_flag(config)
}
config$queue$decrease_key(names = revdeps)
mc_set_target(worker = worker, target = NA, config = config)
}

mc_work_remains <- function(config){
Expand Down Expand Up @@ -223,5 +214,5 @@ worker_mclapply <- function(targets, meta_list, config){
config = config,
mc.cores = jobs
)
assign_to_envir(targets = targets, values = values, config = config)
assign_to_envir_batch(targets = targets, values = values, config = config)
}
2 changes: 1 addition & 1 deletion R/parLapply.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ prune_envir_parLapply <- function(targets = targets, config = config) { # nolint

assign_to_envir_parLapply <- # nolint
function(targets, values, config) {
assign_to_envir(targets = targets, values = values, config = config)
assign_to_envir_batch(targets = targets, values = values, config = config)
if (identical(config$envir, globalenv()))
clusterCall(cl = config$cluster, fun = assign_to_envir,
targets = targets, values = values, config = config)
Expand Down
16 changes: 3 additions & 13 deletions tests/testthat/test-lazy-load.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,19 @@ test_with_dir("lazy loading is actually lazy", {
unload_these <- c(lazily_loaded, eagerly_loaded) %>%
intersect(y = ls(envir = config$envir))
remove(list = unload_these, envir = config$envir)
config <- make(
config <- drake_config(
lazy_load = TRUE,
plan = config$plan,
targets = "combined",
envir = config$envir,
verbose = FALSE,
session_info = FALSE
)
config$schedule <- config$graph
run_lapply(config)
loaded <- ls(envir = config$envir)
expect_true(all(lazily_loaded %in% loaded))
expect_false(any(eagerly_loaded %in% loaded))
clean()
config <- make(
lazy_load = FALSE,
plan = config$plan,
targets = "combined",
envir = config$envir,
verbose = FALSE,
session_info = FALSE
)
loaded <- ls(envir = config$envir)
expect_true(all(lazily_loaded %in% loaded))
expect_true(all(eagerly_loaded %in% loaded))
})

test_with_dir("active bindings", {
Expand Down
6 changes: 1 addition & 5 deletions tests/testthat/test-other-features.R
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,11 @@ test_with_dir("drake_build works as expected", {
x <- cached()
expect_equal(x, "a")
o <- make(pl, envir = e)
expect_true("a" %in% ls(envir = e))
expect_equal(justbuilt(o), "b")
remove(list = "a", envir = e)
expect_false("a" %in% ls(envir = e))

# Can run without config
o <- drake_build(b, envir = e)
expect_equal(o, e$b)
expect_equal(o, readd(b))
expect_true("a" %in% ls(envir = e))

# Replacing deps in environment
expect_equal(e$a, 1)
Expand All @@ -73,6 +68,7 @@ test_with_dir("drake_build works as expected", {
expect_equal(e$a, 1)

# `replace` in loadd()
e$b <- 1
expect_equal(e$b, 1)
e$b <- 5
loadd(b, envir = e, replace = FALSE)
Expand Down