Skip to content

Commit

Permalink
Removing regirstration stages
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-harper committed Jan 31, 2025
1 parent 9f38290 commit f6e5025
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 145 deletions.
141 changes: 0 additions & 141 deletions cpg_workflows/stages/realign_genotype_with_dragen.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,144 +340,3 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S
data=self.expected_outputs(sequencing_group=sequencing_group),
jobs=ica_download_job,
)


@stage(
analysis_type='cram',
analysis_keys=['cram', 'crai'],
required_stages=[PrepareIcaForDragenAnalysis, ManageDragenPipeline, GvcfMlrWithDragen, DownloadDataFromIca],
forced=True, # Forcing stage as expected_outputs should always exist before running this stage
)
class RegisterCramIcaOutputsInMetamist(SequencingGroupStage):
def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, cpg_utils.Path]:
bucket_name: cpg_utils.Path = sequencing_group.dataset.prefix()
pipeline_id_path = bucket_name / GCP_FOLDER_FOR_RUNNING_PIPELINE / f'{sequencing_group.name}_pipeline_id.json'

if not pipeline_id_path.exists():
raise FileNotFoundError(f'Pipeline ID not found in {pipeline_id_path}')

with open(cpg_utils.to_path(pipeline_id_path), 'rt') as pipeline_fid_handle:
pipeline_id = pipeline_fid_handle.read().strip()

download_path = (
bucket_name
/ GCP_FOLDER_FOR_ICA_DOWNLOAD
/ sequencing_group.name
/ f'{sequencing_group.name}-{pipeline_id}'
/ sequencing_group.name
)

return {
'cram': download_path / f'{sequencing_group.name}.cram',
'crai': download_path / f'{sequencing_group.name}.cram.crai',
}

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
outputs = self.expected_outputs(sequencing_group=sequencing_group)
ica_outputs = {
'pipeline_id': inputs.as_path(target=sequencing_group, stage=ManageDragenPipeline, key='pipeline_id'),
'downloaded_data': inputs.as_path(target=sequencing_group, stage=DownloadDataFromIca),
}

with open(cpg_utils.to_path(ica_outputs['pipeline_id']), 'rt') as pipeline_fid_handle:
pipeline_id: str = pipeline_fid_handle.read().rstrip()

# Confirm existence of CRAM and CRAI files
if not (outputs['cram']).exists():
raise FileNotFoundError(f'CRAM not found in {outputs["cram"]}')
if not (outputs['crai']).exists():
raise FileNotFoundError(f'CRAI not found in {outputs["crai"]}')

batch_instance: Batch = get_batch()
register_cram_job: BashJob = batch_instance.new_bash_job(
name='RegisterCramIcaOutputsInMetamist',
attributes=(self.get_job_attrs() or {}) | {'tool': 'ICA'},
)

register_cram_job.memory('8Gi')
register_cram_job.image(image=image_path('ica'))

register_cram_job.command(
f"""
echo 'Pipeline run {pipeline_id} CRAM and CRAI files successfully registered in Metamist'
""",
)

return self.make_outputs(
target=sequencing_group,
data=outputs,
jobs=register_cram_job,
)


@stage(
analysis_type='gvcf',
analysis_keys=['gvcf', 'gvcf_tbi'],
required_stages=[PrepareIcaForDragenAnalysis, ManageDragenPipeline, GvcfMlrWithDragen, DownloadDataFromIca],
forced=True, # Forcing stage as expected_outputs should always exist before running this stage
)
class RegisterGvcfIcaOutputsInMetamist(SequencingGroupStage):
def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, cpg_utils.Path]:
bucket_name: cpg_utils.Path = sequencing_group.dataset.prefix()
pipeline_id_path = bucket_name / GCP_FOLDER_FOR_RUNNING_PIPELINE / f'{sequencing_group.name}_pipeline_id.json'

if not pipeline_id_path.exists():
raise FileNotFoundError(f'Pipeline ID not found in {pipeline_id_path}')

with open(cpg_utils.to_path(pipeline_id_path), 'rt') as pipeline_fid_handle:
pipeline_id = pipeline_fid_handle.read().strip()

download_path = (
bucket_name
/ GCP_FOLDER_FOR_ICA_DOWNLOAD
/ sequencing_group.name
/ f'{sequencing_group.name}-{pipeline_id}'
/ sequencing_group.name
)

return {
'gvcf': download_path / f'{sequencing_group.name}.hard-filtered.gvcf.gz',
'gvcf_tbi': download_path / f'{sequencing_group.name}.hard-filtered.gvcf.gz.tbi',
}

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
outputs = self.expected_outputs(sequencing_group=sequencing_group)
ica_outputs = {
'pipeline_id': inputs.as_path(target=sequencing_group, stage=ManageDragenPipeline, key='pipeline_id'),
'downloaded_data': inputs.as_path(target=sequencing_group, stage=DownloadDataFromIca),
}

with open(cpg_utils.to_path(ica_outputs['pipeline_id']), 'rt') as pipeline_fid_handle:
pipeline_id: str = pipeline_fid_handle.read().rstrip()

# Confirm existence of CRAM and CRAI files
if not (outputs['gvcf']).exists():
raise FileNotFoundError(f'GVCF not found in {outputs["gvcf"]}')
if not (outputs['gvcf_tbi']).exists():
raise FileNotFoundError(f'GVCF_tbi not found in {outputs["gvcf_tbi"]}')

batch_instance: Batch = get_batch()
register_cram_job: BashJob = batch_instance.new_bash_job(
name='RegisterGvcfIcaOutputsInMetamist',
attributes=(self.get_job_attrs() or {}) | {'tool': 'ICA'},
)

register_cram_job.memory('8Gi')
register_cram_job.image(image=image_path('ica'))

register_cram_job.command(
f"""
echo 'Pipeline run {pipeline_id} GVCF and GVCF_TBI files successfully registered in Metamist'
""",
)

return self.make_outputs(
target=sequencing_group,
data=outputs,
jobs=register_cram_job,
)


@stage(required_stages=[RegisterCramIcaOutputsInMetamist, RegisterGvcfIcaOutputsInMetamist])
class RegisterAllIcaOutputsInMetamist(SequencingGroupStage):
pass
4 changes: 0 additions & 4 deletions dragen_378_realignment_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
from cpg_workflows import defaults_config_path
from cpg_workflows.stages.realign_genotype_with_dragen import (
DownloadDataFromIca,
RegisterCramIcaOutputsInMetamist,
RegisterGvcfIcaOutputsInMetamist,
)
from cpg_workflows.workflow import StageDecorator, run_workflow

WORKFLOWS: dict[str, list[StageDecorator]] = {
'realign_genotype_with_dragen': [
RegisterCramIcaOutputsInMetamist,
RegisterGvcfIcaOutputsInMetamist,
DownloadDataFromIca,
],
}
Expand Down

0 comments on commit f6e5025

Please sign in to comment.