
Commit

update dev steps
stemangiola committed Dec 18, 2024
1 parent 0fa42be commit 0ccef13
Showing 3 changed files with 80 additions and 83 deletions.
@@ -182,7 +182,6 @@ annotation_label_transfer(sce_transformed_tier_4, empty_droplets_tbl = empty_tbl
#' @function `lighten_annotation`: Processes each annotation table target, unnesting and selecting specific columns to reduce data size.
#'
#' @example Usage:
#' The pipeline script is saved as `/vast/scratch/users/mangiola.s/lighten_annotation_tbl_target.R` and can be run using `tar_make()`.
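#' A minimal sketch of the shape described above (an illustrative assumption, not the
#' actual implementation; the `annotation` column and the selected columns are hypothetical):
#' lighten_annotation = function(annotation_tbl) {
#'   annotation_tbl |>
#'     unnest(annotation) |>                                  # unnest the per-cell annotation table
#'     select(.cell, any_of(c("cell_type", "confidence")))    # keep only the columns needed downstream
#' }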
tar_script({
library(dplyr)
library(magrittr)
160 changes: 79 additions & 81 deletions dev/3_prepare_local_cache_splitting_du_dataset_and_cell_type.R
@@ -206,8 +206,8 @@ tar_script({
storage = "worker",
retrieval = "worker",
error = "continue",
# debug = "saved_dataset_cpm_b1de4e660a0a9cf5",
cue = tar_cue(mode = "never"),
debug = "dataset_id_sce",

workspace_on_error = TRUE,
controller = crew_controller_group(
list(
@@ -294,36 +294,36 @@ tar_script({
# cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", unset = 1))
# bp <- MulticoreParam(workers = cores , progressbar = TRUE) # Adjust the number of workers as needed
#
dataset_id_sce |>
purrr::transpose() |>
lapply(
FUN = function(x) {
.x = x[[2]]
.y = x[[1]]
# Check if the 'sce' has only one cell (column)
if(ncol(assay(.x)) == 1) {

# Duplicate the assay to prevent saving errors due to single-column matrices
my_assay = cbind(assay(.x), assay(.x))
# Rename the second column to distinguish it
colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2])

cd = colData(.x)
cd = cd |> rbind(cd)
rownames(cd)[2] = paste0("DUMMY", "___", rownames(cd)[2])



.x = SingleCellExperiment(assay = list( counts = my_assay ), colData = cd)
}


# TEMPORARY FOR SOME REASON THE MIN COUNTS IS NOT 0 FOR SOME SAMPLES
.x = HPCell:::check_if_assay_minimum_count_is_zero_and_correct_TEMPORARY(.x, assays(.x) |> names() |> _[1], subset_up_to_number_of_cells = 100)

.x = SingleCellExperiment(assay = list( counts = .x |> assay()), colData = colData(.x))



.x = dataset_id_sce |> pull(sce) |> _[[1]]

myushen (Contributor) commented on Dec 19, 2024:

@stemangiola any reason why the changes were made to save_anndata only and not adapted for save_anndata_cpm and save_rank_per_cell?

stemangiola (Author, Owner) replied on Dec 26, 2024:

No reason; it's just that I needed counts as a priority. Feel free to adapt.
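A minimal sketch of what that adaptation might look like, following the same single-row pull() pattern used for the counts target in this file. The "cpm" assay name, the use of scuttle::calculateCPM(), and the cpm/ cache subdirectory are illustrative assumptions, not part of this commit:

# Hypothetical adaptation for the CPM target; assumptions as noted above, not committed code
.x = dataset_id_sce |> pull(sce) |> _[[1]]
.y = dataset_id_sce |> pull(file_id_cellNexus_single_cell) |> _[[1]] |> str_remove("\\.h5ad")
assay(.x, "cpm") = scuttle::calculateCPM(.x, assay.type = "counts")  # assumes the assay is named "counts"
.x |> save_experiment_data(glue("{cache_directory}/cpm/{.y}"))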

.y = dataset_id_sce |> pull(file_id_cellNexus_single_cell) |> _[[1]] |> str_remove("\\.h5ad")

assayNames(.x) = "counts" # name the single assay "counts"

# # Check if the 'sce' has only one cell (column)
# if(ncol(assay(.x)) == 1) {
#
# # Duplicate the assay to prevent saving errors due to single-column matrices
# my_assay = cbind(assay(.x), assay(.x))
# # Rename the second column to distinguish it
# colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2])
#
# cd = colData(.x)
# cd = cd |> rbind(cd)
# rownames(cd)[2] = paste0("DUMMY", "___", rownames(cd)[2])
#
#
#
# .x = SingleCellExperiment(assay = list( counts = my_assay ), colData = cd)
# }
#
#
# # TEMPORARY FOR SOME REASON THE MIN COUNTS IS NOT 0 FOR SOME SAMPLES
# .x = HPCell:::check_if_assay_minimum_count_is_zero_and_correct_TEMPORARY(.x, assays(.x) |> names() |> _[1], subset_up_to_number_of_cells = 100)
#
# .x = SingleCellExperiment(assay = list( counts = .x |> assay()), colData = colData(.x))


# My attempt to save an integer, sparse, delayed matrix (with zellkonverter it is not possible to save integers)
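# A possible workaround (an assumption for illustration, not the committed code): realise the
# delayed integer assay as an in-memory double sparse matrix before writing, e.g.
#   assay(.x, "counts") = as(assay(.x, "counts"), "CsparseMatrix")  # DelayedMatrix -> dgCMatrix (double)
#   zellkonverter::writeH5AD(.x, glue("{cache_directory}/{.y}.h5ad"), X_name = "counts")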
@@ -334,12 +334,7 @@
.x |> save_experiment_data(glue("{cache_directory}/{.y}"))

return(TRUE) # Indicate successful saving
}
#,
#BPPARAM = bp # Use the defined parallel backend
)

return("saved")


}

@@ -489,13 +484,9 @@ tar_script({
# cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", unset = 1))
# bp <- MulticoreParam(workers = cores , progressbar = TRUE) # Adjust the number of workers as needed
#
dataset_id_sce |>
purrr::transpose() |>
purrr::map(
~ {
x = .x
.x = x[[2]]
.y = x[[1]]

.x = dataset_id_sce |> pull(sce) |> _[[1]]
.y = dataset_id_sce |> pull(file_id_cellNexus_single_cell) |> _[[1]]

# Check if the 'sce' has only one cell (column)
if(ncol(assay(.x)) == 1) {
@@ -541,13 +532,7 @@ tar_script({
.x |> save_experiment_data(glue("{cache_directory}/{.y}"))

return(TRUE) # Indicate successful saving
},
.progress = TRUE
#,
#BPPARAM = bp # Use the defined parallel backend
)

return("saved")




@@ -567,20 +552,26 @@ tar_script({
sql(glue("SELECT * FROM read_parquet('{file_id_db_file}')"))
) |>
filter(dataset_id == my_dataset_id) |>
select(cell_id, sample_id, dataset_id, file_id_cellNexus_single_cell) |>

# Drop extension because it is added later
mutate(file_id_cellNexus_single_cell = file_id_cellNexus_single_cell |> str_remove("\\.h5ad")) |>
as_tibble()
select(cell_id, sample_id, dataset_id, file_id_cellNexus_single_cell)
# |>
#
# # Drop extension because it is added later
# mutate(file_id_cellNexus_single_cell = file_id_cellNexus_single_cell |> str_remove("\\.h5ad")) |>
# as_tibble()

file_id_db =
target_name_grouped_by_dataset_id |>
left_join(file_id_db, copy = TRUE)


# Parallelise
cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", unset = 1))
bp <- MulticoreParam(workers = cores , progressbar = TRUE) # Adjust the number of workers as needed

# Begin processing the data pipeline with the initial dataset 'target_name_grouped_by_dataset_id'
sce_df =
target_name_grouped_by_dataset_id |>

file_id_db |>
nest(cells = cell_id) |>
# Step 1: Read raw data for each 'target_name' and store it in a new column 'sce'
mutate(
sce = bplapply(
@@ -591,7 +582,9 @@ tar_script({
) |>

# This should not be needed, but there are some data sets with zero cells
filter(!map_lgl(sce, is.null))
filter(!map_lgl(sce, is.null)) |>

mutate(sce = map2(sce, cells, ~ .x |> filter(.cell %in% .y$cell_id), .progress = TRUE))



@@ -601,9 +594,7 @@ tar_script({
}

# plan(multisession, workers = 20)

sce_df =
sce_df |>
sce_df |>

# # Step 4: Group the data by 'dataset_id' and 'tar_group' for further summarization
# group_by(dataset_id, tar_group, chunk) |>
@@ -616,18 +607,19 @@ tar_script({
mutate(sce = map(sce, ~ SingleCellExperiment(assay = assays(.x), colData = colData(.x)) )) |>

# Step 5: Combine all 'sce' objects within each group into a single 'sce' object
summarise( sce = list(do.call(cbind, args = sce) ) ) |>
group_by(file_id_cellNexus_single_cell) |>
summarise( sce = list(do.call(cbind, args = sce) ) )

mutate(sce = map(sce,
~ { .x =
.x |>
left_join(file_id_db, by = join_by(.cell==cell_id, dataset_id==dataset_id, sample_id==sample_id))
.x |>
HPCell:::splitColData(colData(.x)$file_id_cellNexus_single_cell) |> # Split 'sce' by 'cell_type'
enframe(name = "file_id_cellNexus_single_cell", value = "sce") # Convert to tibble with 'cell_type' and 'sce' columns
})) |>
# mutate(sce = map(sce,
# ~ { .x =
# .x |>
# left_join(file_id_db, by = join_by(.cell==cell_id, dataset_id==dataset_id, sample_id==sample_id))
# .x |>
# HPCell:::splitColData(colData(.x)$file_id_cellNexus_single_cell) |> # Split 'sce' by 'cell_type'
# enframe(name = "file_id_cellNexus_single_cell", value = "sce") # Convert to tibble with 'cell_type' and 'sce' columns
# })) |>
# Step 8: Unnest the list of 'sce' objects to have one row per 'cell_type'
unnest(sce)
# unnest_single_cell_experiment(sce)


}
@@ -647,7 +639,7 @@ tar_script({
dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
sql(glue("SELECT * FROM read_parquet('{cell_metadata}')"))
) |>
distinct(dataset_id, sample_id, sample_chunk, cell_chunk) |>
distinct(dataset_id, sample_id, sample_chunk, cell_chunk, file_id_cellNexus_single_cell) |>
as_tibble(),
copy=T
)
@@ -692,7 +684,7 @@ tar_script({

# The input DO NOT DELETE
tar_target(my_store, "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store", deployment = "main"),
tar_target(cache_directory, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/26_11_2024", deployment = "main"),
tar_target(cache_directory, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/18_12_2024", deployment = "main"),
tar_target(
cell_metadata,
"/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_4.parquet",
@@ -723,7 +715,11 @@ tar_script({
tar_target(
target_name_grouped_by_dataset_id,
create_chunks_for_reading_and_saving(dataset_id_sample_id, cell_metadata) |>
group_by(dataset_id, sample_chunk, cell_chunk) |>

# !!! JUST FOR TESTING !!!
filter(file_id_cellNexus_single_cell == "004e0dd96de6f3091dac2cf8cc64ddc4___1.h5ad") |>

group_by(dataset_id, sample_chunk, cell_chunk, file_id_cellNexus_single_cell) |>
tar_group(),
iteration = "group",
resources = tar_resources(
@@ -794,24 +790,26 @@ job::job({
tar_make(
script = paste0(store_file_cellNexus, "_target_script.R"),
store = store_file_cellNexus,
reporter = "summary" #, callr_function = NULL
reporter = "verbose_positives" #, callr_function = NULL
)

})

tar_make(script = paste0(store_file_cellNexus, "_target_script.R"), store = store_file_cellNexus, callr_function = NULL)

x = tar_read(saved_dataset, store = store_file_cellNexus)
x = tar_read(dataset_id_sample_id, store = store_file_cellNexus)
y = tar_read(target_name_grouped_by_dataset_id, store = store_file_cellNexus)


tar_meta(store = store_file_cellNexus) |>
arrange(desc(time)) |>
filter(!error |> is.na()) |>
select(name, error, warnings, time)

tar_workspace(saved_dataset_rank_912838f659391dd0, store = store_file_cellNexus, script = paste0(store_file_cellNexus, "_target_script.R"))
tar_workspace(save_anndata_86c62bdb3d08d7b6, store = store_file_cellNexus, script = paste0(store_file_cellNexus, "_target_script.R"))

tar_invalidate(saved_dataset, store = store_file_cellNexus)
tar_delete(saved_dataset, store = store_file_cellNexus)
tar_invalidate(starts_with("save_anndata"), store = store_file_cellNexus)
tar_delete(starts_with("save_anndata"), store = store_file_cellNexus)

target_name_grouped_by_dataset_id = tar_read(target_name_grouped_by_dataset_id, store = store_file_cellNexus)

2 changes: 1 addition & 1 deletion dev/4_make_pseudobulk.R
@@ -774,7 +774,7 @@ job::job({
se,
verbose = TRUE,
file = "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_4.h5ad",
X_name = "counts_scaled"
X_name = "counts"
)
})

