Skip to content

Commit

Permalink
dynamically adjust various run parameters based on bin sizes (#136)
Browse files Browse the repository at this point in the history
* proposed changes for a variable gres value

* Updated to dynamically adjust additional params.
  • Loading branch information
charles-cowart authored Mar 18, 2024
1 parent 7021b1e commit baf690a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 19 deletions.
18 changes: 16 additions & 2 deletions sequence_processing_pipeline/Commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
:param data_location_path: Path to the ConvertJob directory.
:param max_file_list_size_in_gb: Upper threshold for file-size.
:param batch_prefix: Path + file-name prefix for output-files.
:return: The number of output-files created.
:return: The number of output-files created, size of largest bin.
'''

# SPP root paths are of the form:
Expand All @@ -33,17 +33,31 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
current_size = max_size * 10
fp = None

bucket_size = 0
max_bucket_size = 0

for a, b in iter_paired_files(fastq_paths):
r1_size = os.stat(a).st_size
r2_size = os.stat(b).st_size

output_base = os.path.dirname(a).split('/')[-1]
if current_size + r1_size > max_size:
# bucket is full.
if bucket_size > max_bucket_size:
max_bucket_size = bucket_size

# reset bucket_size.
bucket_size = r1_size + r2_size

if fp is not None:
fp.close()

split_offset += 1
current_size = r1_size
fp = open(batch_prefix + '-%d' % split_offset, 'w')
else:
# add to bucket_size
bucket_size += r1_size + r2_size
current_size += r1_size

fp.write("%s\t%s\t%s\n" % (a, b, output_base))
Expand All @@ -54,7 +68,7 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
if split_offset == 0:
raise ValueError("No splits made")

return split_offset
return split_offset, max_bucket_size


def demux_cmd(id_map_fp, fp_fp, out_d, task, maxtask):
Expand Down
42 changes: 32 additions & 10 deletions sequence_processing_pipeline/NuQCJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,

self.counts = {}
self.known_adapters_path = known_adapters_path
self.max_file_list_size_in_gb = bucket_size
self.bucket_size = bucket_size
self.length_limit = length_limit

# NuQCJob() impl uses -c (--cores-per-task) switch instead of
Expand Down Expand Up @@ -227,14 +227,18 @@ def run(self, callback=None):
# now a single job-script will be created to process all projects at
# the same time, and intelligently handle adapter-trimming as needed
# as well as human-filtering.
job_script_path = self._generate_job_script()

batch_location = join(self.temp_dir, self.batch_prefix)

batch_count = 0 if self.force_job_fail else \
split_similar_size_bins(self.root_dir,
self.max_file_list_size_in_gb,
batch_location)
if self.force_job_fail:
batch_count = 0
max_size = 0
else:
batch_count, max_size = split_similar_size_bins(self.root_dir,
self.bucket_size,
batch_location)

job_script_path = self._generate_job_script(max_size)

self.counts[self.batch_prefix] = batch_count

Expand Down Expand Up @@ -391,12 +395,29 @@ def _process_sample_sheet(self):
'projects': lst,
'sample_ids': sample_ids}

def _generate_job_script(self):
def _generate_job_script(self, max_bucket_size):
# bypass generating job script for a force-fail job, since it is
# not needed.
if self.force_job_fail:
return None

gigabyte = 1073741824

if max_bucket_size > (3 * gigabyte):
gres_value = 4
else:
gres_value = 2

if max_bucket_size < gigabyte:
mod_wall_time_limit = self.wall_time_limit
mod_jmem = self.jmem
elif max_bucket_size < (2 * gigabyte):
mod_wall_time_limit = self.wall_time_limit * 1.5
mod_jmem = self.jmem * 4.5
else:
mod_wall_time_limit = self.wall_time_limit * 2
mod_jmem = self.jmem * 7.5

job_script_path = join(self.output_path, 'process_all_fastq_files.sh')
template = self.jinja_env.get_template("nuqc_job.sh")

Expand Down Expand Up @@ -428,8 +449,8 @@ def _generate_job_script(self):
f.write(template.render(job_name=job_name,
queue_name=self.queue_name,
# should be 4 * 24 * 60 = 4 days
wall_time_limit=self.wall_time_limit,
mem_in_gb=self.jmem,
wall_time_limit=mod_wall_time_limit,
mem_in_gb=mod_jmem,
# Note NuQCJob now maps node_count to
# SLURM -N parameter to act like other
# Job classes.
Expand All @@ -445,7 +466,8 @@ def _generate_job_script(self):
temp_dir=self.temp_dir,
splitter_binary=splitter_binary,
modules_to_load=mtl,
length_limit=self.length_limit))
length_limit=self.length_limit,
gres_value=gres_value))

return job_script_path

Expand Down
2 changes: 1 addition & 1 deletion sequence_processing_pipeline/templates/nuqc_job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
### as well as sbatch -c. demux threads remains fixed at 1.
### Note -c set to 4 and thread counts set to 7 during testing.
#SBATCH -c {{cores_per_task}}
#SBATCH --gres=node_jobs:4
#SBATCH --gres=node_jobs:{{gres_value}}


echo "---------------"
Expand Down
3 changes: 2 additions & 1 deletion sequence_processing_pipeline/tests/test_NuQCJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,8 @@ def test_generate_job_script(self):
'fastp', 'minimap2', 'samtools', [], self.qiita_job_id,
1000, '')

job_script_path = job._generate_job_script()
# 2k as a parameter will promote the default value.
job_script_path = job._generate_job_script(2048)

self.assertTrue(exists(job_script_path))

Expand Down
11 changes: 6 additions & 5 deletions sequence_processing_pipeline/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ class MockStat:
'/foo/bar/b_R1_.fastq.gz']

with TemporaryDirectory() as tmp:
exp = 2
exp_1 = ('/foo/bar/a_R1_.fastq.gz\t/foo/bar/a_R2_.fastq.gz\tbar\n'
'/foo/bar/b_R1_.fastq.gz\t/foo/bar/b_R2_.fastq.gz\tbar\n')
exp_2 = '/foo/baz/c_R1_.fastq.gz\t/foo/baz/c_R2_.fastq.gz\tbaz\n'

exp = (2, 1073741824)
stat.return_value = MockStat() # 512MB
glob.return_value = mockglob
obs = split_similar_size_bins('foo', 1, tmp + '/prefix')
self.assertEqual(obs, exp)

exp_1 = ('/foo/bar/a_R1_.fastq.gz\t/foo/bar/a_R2_.fastq.gz\tbar\n'
'/foo/bar/b_R1_.fastq.gz\t/foo/bar/b_R2_.fastq.gz\tbar\n')
exp_2 = '/foo/baz/c_R1_.fastq.gz\t/foo/baz/c_R2_.fastq.gz\tbaz\n'

obs_1 = open(tmp + '/prefix-1').read()
self.assertEqual(obs_1, exp_1)
obs_1 = open(tmp + '/prefix-2').read()
Expand Down

0 comments on commit baf690a

Please sign in to comment.