Skip to content

Commit

Permalink
Remove Dataproc from Frequencies stage of Large Cohort (#632)
Browse files Browse the repository at this point in the history
* First attempt not using dataproc, changing stage output to tmp bucket for testing purposes as well

* Generating dataproc output but in tmp directory

* Back outside of dataproc. Changed writing out stage output back to workflow prefix
  • Loading branch information
michael-harper authored May 1, 2024
1 parent 6919746 commit 8b1debb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
8 changes: 4 additions & 4 deletions cpg_workflows/large_cohort/frequencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@


 def run(
-    vds_path: Path,
-    sample_qc_ht_path: Path,
-    relateds_to_drop_ht_path: Path,
-    out_ht_path: Path,
+    vds_path: str,
+    sample_qc_ht_path: str,
+    relateds_to_drop_ht_path: str,
+    out_ht_path: str,
 ):
     if can_reuse(out_ht_path):
         return
Expand Down
32 changes: 17 additions & 15 deletions cpg_workflows/stages/large_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,22 +324,24 @@ def expected_outputs(self, cohort: Cohort) -> dict[str, Path]:
         return get_workflow().prefix / 'frequencies.ht'

     def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
-        from cpg_workflows.large_cohort.dataproc_utils import dataproc_job
-        from cpg_workflows.large_cohort.frequencies import run
+        from cpg_workflows.large_cohort import frequencies

-        j = dataproc_job(
-            job_name=self.__class__.__name__,
-            function=run,
-            function_path_args=dict(
-                vds_path=inputs.as_path(cohort, stage=Combiner),
-                sample_qc_ht_path=inputs.as_path(cohort, stage=SampleQC),
-                relateds_to_drop_ht_path=inputs.as_path(
-                    cohort,
-                    stage=Relatedness,
-                    key='relateds_to_drop',
-                ),
-                out_ht_path=self.expected_outputs(cohort),
+        j = get_batch().new_job(
+            'Frequencies',
+            (self.get_job_attrs() or {}) | {'tool': 'hail query'},
+        )
+        j.image(image_path('cpg_workflows'))
+
+        j.command(
+            query_command(
+                frequencies,
+                frequencies.run.__name__,
+                str(inputs.as_path(cohort, Combiner)),
+                str(inputs.as_path(cohort, SampleQC)),
+                str(inputs.as_path(cohort, Relatedness, key='relateds_to_drop')),
+                str(self.expected_outputs(cohort)),
+                setup_gcp=True,
             ),
-            depends_on=inputs.get_jobs(cohort),
         )
+
         return self.make_outputs(cohort, data=self.expected_outputs(cohort), jobs=[j])

0 comments on commit 8b1debb

Please sign in to comment.