From f6e5025a8d1511f3b8df1748e93be050780d3fa2 Mon Sep 17 00:00:00 2001
From: Michael Harper
Date: Fri, 31 Jan 2025 14:23:03 +1100
Subject: [PATCH] Removing registration stages

---
 .../stages/realign_genotype_with_dragen.py | 141 ------------------
 dragen_378_realignment_runner.py           |   4 -
 2 files changed, 145 deletions(-)

diff --git a/cpg_workflows/stages/realign_genotype_with_dragen.py b/cpg_workflows/stages/realign_genotype_with_dragen.py
index 7765b2bfd..d8c2d566d 100644
--- a/cpg_workflows/stages/realign_genotype_with_dragen.py
+++ b/cpg_workflows/stages/realign_genotype_with_dragen.py
@@ -340,144 +340,3 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
             data=self.expected_outputs(sequencing_group=sequencing_group),
             jobs=ica_download_job,
         )
-
-
-@stage(
-    analysis_type='cram',
-    analysis_keys=['cram', 'crai'],
-    required_stages=[PrepareIcaForDragenAnalysis, ManageDragenPipeline, GvcfMlrWithDragen, DownloadDataFromIca],
-    forced=True,  # Forcing stage as expected_outputs should always exist before running this stage
-)
-class RegisterCramIcaOutputsInMetamist(SequencingGroupStage):
-    def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, cpg_utils.Path]:
-        bucket_name: cpg_utils.Path = sequencing_group.dataset.prefix()
-        pipeline_id_path = bucket_name / GCP_FOLDER_FOR_RUNNING_PIPELINE / f'{sequencing_group.name}_pipeline_id.json'
-
-        if not pipeline_id_path.exists():
-            raise FileNotFoundError(f'Pipeline ID not found in {pipeline_id_path}')
-
-        with open(cpg_utils.to_path(pipeline_id_path), 'rt') as pipeline_fid_handle:
-            pipeline_id = pipeline_fid_handle.read().strip()
-
-        download_path = (
-            bucket_name
-            / GCP_FOLDER_FOR_ICA_DOWNLOAD
-            / sequencing_group.name
-            / f'{sequencing_group.name}-{pipeline_id}'
-            / sequencing_group.name
-        )
-
-        return {
-            'cram': download_path / f'{sequencing_group.name}.cram',
-            'crai': download_path / f'{sequencing_group.name}.cram.crai',
-        }
-
-    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
-        outputs = self.expected_outputs(sequencing_group=sequencing_group)
-        ica_outputs = {
-            'pipeline_id': inputs.as_path(target=sequencing_group, stage=ManageDragenPipeline, key='pipeline_id'),
-            'downloaded_data': inputs.as_path(target=sequencing_group, stage=DownloadDataFromIca),
-        }
-
-        with open(cpg_utils.to_path(ica_outputs['pipeline_id']), 'rt') as pipeline_fid_handle:
-            pipeline_id: str = pipeline_fid_handle.read().rstrip()
-
-        # Confirm existence of CRAM and CRAI files
-        if not (outputs['cram']).exists():
-            raise FileNotFoundError(f'CRAM not found in {outputs["cram"]}')
-        if not (outputs['crai']).exists():
-            raise FileNotFoundError(f'CRAI not found in {outputs["crai"]}')
-
-        batch_instance: Batch = get_batch()
-        register_cram_job: BashJob = batch_instance.new_bash_job(
-            name='RegisterCramIcaOutputsInMetamist',
-            attributes=(self.get_job_attrs() or {}) | {'tool': 'ICA'},
-        )
-
-        register_cram_job.memory('8Gi')
-        register_cram_job.image(image=image_path('ica'))
-
-        register_cram_job.command(
-            f"""
-            echo 'Pipeline run {pipeline_id} CRAM and CRAI files successfully registered in Metamist'
-            """,
-        )
-
-        return self.make_outputs(
-            target=sequencing_group,
-            data=outputs,
-            jobs=register_cram_job,
-        )
-
-
-@stage(
-    analysis_type='gvcf',
-    analysis_keys=['gvcf', 'gvcf_tbi'],
-    required_stages=[PrepareIcaForDragenAnalysis, ManageDragenPipeline, GvcfMlrWithDragen, DownloadDataFromIca],
-    forced=True,  # Forcing stage as expected_outputs should always exist before running this stage
-)
-class RegisterGvcfIcaOutputsInMetamist(SequencingGroupStage):
-    def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, cpg_utils.Path]:
-        bucket_name: cpg_utils.Path = sequencing_group.dataset.prefix()
-        pipeline_id_path = bucket_name / GCP_FOLDER_FOR_RUNNING_PIPELINE / f'{sequencing_group.name}_pipeline_id.json'
-
-        if not pipeline_id_path.exists():
-            raise FileNotFoundError(f'Pipeline ID not found in {pipeline_id_path}')
-
-        with open(cpg_utils.to_path(pipeline_id_path), 'rt') as pipeline_fid_handle:
-            pipeline_id = pipeline_fid_handle.read().strip()
-
-        download_path = (
-            bucket_name
-            / GCP_FOLDER_FOR_ICA_DOWNLOAD
-            / sequencing_group.name
-            / f'{sequencing_group.name}-{pipeline_id}'
-            / sequencing_group.name
-        )
-
-        return {
-            'gvcf': download_path / f'{sequencing_group.name}.hard-filtered.gvcf.gz',
-            'gvcf_tbi': download_path / f'{sequencing_group.name}.hard-filtered.gvcf.gz.tbi',
-        }
-
-    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
-        outputs = self.expected_outputs(sequencing_group=sequencing_group)
-        ica_outputs = {
-            'pipeline_id': inputs.as_path(target=sequencing_group, stage=ManageDragenPipeline, key='pipeline_id'),
-            'downloaded_data': inputs.as_path(target=sequencing_group, stage=DownloadDataFromIca),
-        }
-
-        with open(cpg_utils.to_path(ica_outputs['pipeline_id']), 'rt') as pipeline_fid_handle:
-            pipeline_id: str = pipeline_fid_handle.read().rstrip()
-
-        # Confirm existence of CRAM and CRAI files
-        if not (outputs['gvcf']).exists():
-            raise FileNotFoundError(f'GVCF not found in {outputs["gvcf"]}')
-        if not (outputs['gvcf_tbi']).exists():
-            raise FileNotFoundError(f'GVCF_tbi not found in {outputs["gvcf_tbi"]}')
-
-        batch_instance: Batch = get_batch()
-        register_cram_job: BashJob = batch_instance.new_bash_job(
-            name='RegisterGvcfIcaOutputsInMetamist',
-            attributes=(self.get_job_attrs() or {}) | {'tool': 'ICA'},
-        )
-
-        register_cram_job.memory('8Gi')
-        register_cram_job.image(image=image_path('ica'))
-
-        register_cram_job.command(
-            f"""
-            echo 'Pipeline run {pipeline_id} GVCF and GVCF_TBI files successfully registered in Metamist'
-            """,
-        )
-
-        return self.make_outputs(
-            target=sequencing_group,
-            data=outputs,
-            jobs=register_cram_job,
-        )
-
-
-@stage(required_stages=[RegisterCramIcaOutputsInMetamist, RegisterGvcfIcaOutputsInMetamist])
-class RegisterAllIcaOutputsInMetamist(SequencingGroupStage):
-    pass
diff --git a/dragen_378_realignment_runner.py b/dragen_378_realignment_runner.py
index 69d87ece4..7b3bf2dd5 100644
--- a/dragen_378_realignment_runner.py
+++ b/dragen_378_realignment_runner.py
@@ -14,15 +14,11 @@
 from cpg_workflows import defaults_config_path
 from cpg_workflows.stages.realign_genotype_with_dragen import (
     DownloadDataFromIca,
-    RegisterCramIcaOutputsInMetamist,
-    RegisterGvcfIcaOutputsInMetamist,
 )
 from cpg_workflows.workflow import StageDecorator, run_workflow
 
 WORKFLOWS: dict[str, list[StageDecorator]] = {
     'realign_genotype_with_dragen': [
-        RegisterCramIcaOutputsInMetamist,
-        RegisterGvcfIcaOutputsInMetamist,
         DownloadDataFromIca,
     ],
 }