Merge pull request #1130 from ropensci/1109
Cloud metadata
wlandau-lilly authored Aug 28, 2023
2 parents 74baabc + c16f945 commit b7d8183
Showing 57 changed files with 2,310 additions and 65 deletions.
NAMESPACE: 6 changes (6 additions & 0 deletions)
@@ -417,12 +417,17 @@ export(tar_mermaid)
export(tar_message)
export(tar_message_run)
export(tar_meta)
export(tar_meta_delete)
export(tar_meta_download)
export(tar_meta_sync)
export(tar_meta_upload)
export(tar_name)
export(tar_network)
export(tar_newer)
export(tar_noninteractive)
export(tar_objects)
export(tar_older)
export(tar_option_export)
export(tar_option_get)
export(tar_option_reset)
export(tar_option_set)
@@ -439,6 +444,7 @@ export(tar_pid)
export(tar_pipeline)
export(tar_pipeline_validate_lite)
export(tar_poll)
export(tar_print)
export(tar_process)
export(tar_progress)
export(tar_progress_branches)
NEWS.md: 6 changes (5 additions & 1 deletion)
@@ -1,12 +1,16 @@
# targets 1.2.2.9001 (development)


## Invalidating changes

Because of these changes, upgrading to this version of `targets` will unavoidably invalidate previously built targets in existing pipelines. Your pipeline code should still work, but any targets you ran before will most likely need to rerun after the upgrade.

* In the `hash_deps()` method of the metadata class, exclude symbols which are not actually dependencies, rather than just giving them empty strings. This change decouples the dependency hash from the hash of the target's command (#1108).

## Cloud metadata

* Continuously upload metadata files to the cloud during `tar_make()`, `tar_make_clustermq()`, and `tar_make_future()` (#1109). Upload them to the repository specified in the `repository_meta` `tar_option_set()` option, and use the bucket and prefix set in the `resources` `tar_option_set()` option. `repository_meta` defaults to the existing `repository` `tar_option_set()` option.
* Add new functions `tar_meta_download()`, `tar_meta_upload()`, `tar_meta_sync()`, and `tar_meta_delete()` to directly manage cloud metadata outside the pipeline (#1109).

## Other changes

* Fix solution of #1103 so the copy fallback actually runs (@jds485, #1102, #1103).
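
A minimal usage sketch (not part of the commit diff) of the cloud metadata features described in the NEWS entries above. The bucket and prefix names are placeholders, and the per-function descriptions in the comments are inferred from the function names and NEWS text.

# _targets.R: keep both the target data and the pipeline metadata in S3.
library(targets)
tar_option_set(
  repository = "aws",
  repository_meta = "aws", # defaults to `repository`, shown here for clarity
  resources = tar_resources(
    aws = tar_resources_aws(bucket = "my-bucket", prefix = "my-prefix")
  )
)
list(
  tar_target(data, airquality),
  tar_target(model, lm(Ozone ~ Temp, data = data))
)

# In an interactive R session after tar_make():
# tar_meta_sync()     # reconcile the local and cloud copies of the metadata
# tar_meta_download() # replace the local metadata with the cloud copy
# tar_meta_upload()   # replace the cloud metadata with the local copy
# tar_meta_delete()   # delete the metadata objects from the bucket
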
R/class_active.R: 18 changes (11 additions & 7 deletions)
@@ -63,6 +63,7 @@ active_class <- R6::R6Class(
},
ensure_meta = function() {
new_store <- !file.exists(self$meta$store)
self$meta$database$sync(prefer_local = TRUE, verbose = FALSE)
self$meta$migrate_database()
self$meta$validate()
self$meta$database$preprocess(write = TRUE)
@@ -74,8 +75,8 @@ active_class <- R6::R6Class(
self$meta$restrict_records(self$pipeline)
},
dequeue_meta = function() {
self$meta$database$dequeue_rows()
self$scheduler$progress$database$dequeue_rows()
self$meta$database$dequeue_rows(upload = TRUE)
self$scheduler$progress$database$dequeue_rows(upload = TRUE)
},
dequeue_meta_time = function() {
self$seconds_dequeued <- self$seconds_dequeued %|||% -Inf
@@ -102,6 +103,7 @@
ensure_process = function() {
self$process <- process_init(path_store = self$meta$store)
self$process$record_process()
self$process$database$upload(verbose = FALSE)
},
produce_exports = function(envir, path_store, is_globalenv = NULL) {
map(names(envir), ~force(envir[[.x]])) # try to nix high-mem promises
@@ -173,15 +175,17 @@
self$scheduler$reporter$report_start()
},
end = function() {
self$dequeue_meta()
pipeline_unload_loaded(self$pipeline)
seconds_elapsed <- time_seconds() - self$seconds_start
scheduler <- self$scheduler
scheduler$reporter$report_end(scheduler$progress, seconds_elapsed)
path_scratch_del(path_store = self$meta$store)
pipeline_unload_loaded(self$pipeline)
self$meta$database$dequeue_rows(upload = FALSE)
self$meta$database$deduplicate_storage()
self$meta$database$sync(prefer_local = TRUE, verbose = FALSE)
self$scheduler$progress$database$dequeue_rows(upload = TRUE)
path_scratch_del(path_store = self$meta$store)
compare_working_directories()
tar_assert_objects_files(self$meta$store)
seconds_elapsed <- time_seconds() - self$seconds_start
scheduler$reporter$report_end(scheduler$progress, seconds_elapsed)
},
validate = function() {
super$validate()
R/class_crew.R: 1 change (1 addition & 0 deletions)
@@ -268,6 +268,7 @@ crew_class <- R6::R6Class(
record_controller_summary = function(summary) {
database <- database_crew(self$meta$store)
database$overwrite_storage(summary)
database$upload(verbose = FALSE)
},
finalize_crew = function() {
summary <- crew_summary(self$controller)
R/class_database.R: 68 changes (64 additions & 4 deletions)
@@ -4,7 +4,7 @@ database_init <- function(
header = "name",
list_columns = character(0L),
list_column_modes = character(0L),
repository = tar_options$get_repository(),
repository = tar_options$get_repository_meta(),
resources = tar_options$get_resources()
) {
memory <- memory_init()
@@ -154,10 +154,13 @@ database_class <- R6::R6Class(
line <- self$produce_line(self$select_cols(row))
self$queue[length(self$queue) + 1L] <- line
},
dequeue_rows = function() {
dequeue_rows = function(upload = TRUE) {
if (length(self$queue)) {
on.exit(self$queue <- NULL)
self$append_lines(self$queue)
if (upload) {
self$upload(verbose = FALSE)
}
}
},
write_row = function(row) {
@@ -288,11 +291,68 @@
out
},
deduplicate_storage = function() {
if (file.exists(self$path)) {
data <- self$condense_data(self$read_data())
exists <- file.exists(self$path)
overwrite <- !exists
if (exists) {
old <- self$read_data()
data <- self$condense_data(old)
overwrite <- (nrow(data) != nrow(old))
}
if (overwrite) {
data <- data[order(data$name),, drop = FALSE] # nolint
self$overwrite_storage(data)
}
invisible()
},
upload = function(verbose = TRUE) {
"upload"
},
download = function(verbose = TRUE) {
"download"
},
head = function() {
file <- file_init(path = "path_cloud")
file_ensure_hash(file)
list(
exists = file.exists("path_cloud"),
hash = file$hash,
size = file$size,
time = file$time
)
},
sync = function(prefer_local = TRUE, verbose = TRUE) {
head <- self$head()
file <- file_init(path = self$path)
file_ensure_hash(file)
exists_file <- all(file.exists(self$path))
exists_object <- head$exists %|||% FALSE
changed <- !all(file$hash == head$hash)
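# Decide the sync direction: upload if only the local file exists,
# download if only the cloud object exists, and when both exist but
# their hashes differ, compare timestamps below.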
if (exists_file && (!exists_object)) {
self$upload(verbose = verbose)
} else if ((!exists_file) && exists_object) {
self$download(verbose = verbose)
} else if (exists_file && exists_object && changed) {
time_file <- file_time_posixct(file$time)
time_head <- file_time_posixct(head$time)
file_newer <- time_file > time_head
file_same <- file$time == head$time
do_upload <- file_newer || (prefer_local && file_same)
if (do_upload) {
self$upload(verbose = verbose)
} else {
self$download(verbose = verbose)
}
} else {
if (verbose) {
tar_print(
"Skipped syncing ",
self$path,
" with cloud object ",
self$key
)
}
invisible()
}
},
validate_columns = function(header, list_columns) {
if (!all(list_columns %in% header)) {
R/class_database_aws.R: 69 changes (61 additions & 8 deletions)
@@ -41,34 +41,87 @@ database_aws_class <- R6::R6Class(
)
resources_validate(self$resources$aws)
},
download = function() {
download = function(verbose = TRUE) {
if (verbose) {
tar_print(
"Downloading AWS cloud object ",
self$key,
" to local file ",
self$path
)
}
aws <- self$resources$aws
file <- file_init(path = self$path)
file_ensure_hash(file)
dir_create(dirname(self$path))
aws_s3_download(
file = self$path,
key = self$key,
bucket = aws$bucket,
region = aws$region,
endpoint = aws$endpoint,
max_tries = aws$max_tries,
args = aws$args
args = aws$args,
max_tries = aws$max_tries %|||% 5L
)
invisible()
},
upload = function() {
upload = function(verbose = TRUE) {
if (verbose) {
tar_print(
"Uploading local file ",
self$path,
" to AWS cloud object ",
self$key
)
}
aws <- self$resources$aws
file <- file_init(path = self$path)
file_ensure_hash(file)
aws_s3_upload(
file = self$path,
key = self$key,
bucket = aws$bucket,
region = aws$region,
endpoint = aws$endpoint,
metadata = list(
"targets-database-hash" = file$hash,
"targets-database-size" = file$size,
"targets-database-time" = file$time
),
part_size = aws$part_size,
max_tries = aws$max_tries,
args = aws$args
args = aws$args,
max_tries = aws$max_tries %|||% 5L
)
invisible()
},
head = function() {
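# Fetch the custom object metadata written by upload() (hash, size, time)
# so the base class sync() method can compare the cloud object with the
# local file.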
aws <- self$resources$aws
head <- aws_s3_head(
key = self$key,
bucket = aws$bucket,
region = aws$region,
endpoint = aws$endpoint,
args = aws$args,
max_tries = aws$max_tries %|||% 5L
)
list(
exists = !is.null(head),
hash = head$Metadata$`targets-database-hash`,
size = head$Metadata$`targets-database-size`,
time = head$Metadata$`targets-database-time`
)
},
delete_cloud = function(verbose = TRUE) {
if (verbose) {
tar_print("Deleting AWS cloud object ", self$key)
}
aws <- self$resources$aws
aws_s3_delete(
key = self$key,
bucket = aws$bucket,
region = aws$region,
endpoint = aws$endpoint,
args = aws$args,
max_tries = aws$max_tries %|||% 5L
)
}
)
)
R/class_database_gcp.R: 67 changes (57 additions & 10 deletions)
@@ -41,32 +41,79 @@ database_gcp_class <- R6::R6Class(
)
resources_validate(self$resources$gcp)
},
download = function() {
download = function(verbose = TRUE) {
if (verbose) {
tar_print(
"Downloading GCP cloud object ",
self$key,
" to local file ",
self$path
)
}
gcp <- self$resources$gcp
network <- self$resources$network
file <- file_init(path = self$path)
file_ensure_hash(file)
dir_create(dirname(self$path))
gcp_gcs_download(
file = self$path,
key = self$key,
bucket = gcp$bucket,
max_tries = network$max_tries %|||% 5L,
verbose = network$verbose %|||% TRUE
max_tries = gcp$max_tries %|||% 5L,
verbose = gcp$verbose %|||% TRUE
)
invisible()
},
upload = function() {
upload = function(verbose = TRUE) {
if (verbose) {
tar_print(
"Uploading local file ",
self$path,
" to GCP cloud object ",
self$key
)
}
gcp <- self$resources$gcp
network <- self$resources$network
file <- file_init(path = self$path)
file_ensure_hash(file)
gcp_gcs_upload(
file = self$path,
key = self$key,
bucket = gcp$bucket,
metadata = list(
"targets-database-hash" = file$hash,
"targets-database-size" = file$size,
"targets-database-time" = file$time
),
predefined_acl = gcp$predefined_acl %|||% "private",
max_tries = network$max_tries %|||% 5L,
verbose = network$verbose %|||% TRUE
max_tries = gcp$max_tries %|||% 5L,
verbose = gcp$verbose %|||% TRUE
)
invisible()
},
head = function() {
gcp <- self$resources$gcp
head <- gcp_gcs_head(
key = self$key,
bucket = gcp$bucket,
max_tries = gcp$max_tries %|||% 5L,
verbose = gcp$verbose %|||% TRUE
)
list(
exists = !is.null(head),
hash = head$metadata$`targets-database-hash`,
size = head$metadata$`targets-database-size`,
time = head$metadata$`targets-database-time`
)
},
delete_cloud = function(verbose = TRUE) {
if (verbose) {
tar_print("Deleting GCP cloud object ", self$key)
}
gcp <- self$resources$gcp
head <- gcp_gcs_delete(
key = self$key,
bucket = gcp$bucket,
max_tries = gcp$max_tries %|||% 5L,
verbose = gcp$verbose %|||% TRUE
)
}
)
)
(diffs for the remaining changed files not shown)
