Skip to content

Commit

Permalink
Merge pull request #47 from Psy-Fer/7.4.12_dorado
Browse files Browse the repository at this point in the history
7.4.12 dorado for v0.5.0 buttery-eel
  • Loading branch information
Psy-Fer authored Sep 5, 2024
2 parents 31ca0ad + 1657c15 commit c06452a
Show file tree
Hide file tree
Showing 18 changed files with 1,923 additions and 276 deletions.
157 changes: 122 additions & 35 deletions README.md

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions docs/set_params_changes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
The following options were removed:

```
| * post_out (bool): Flag indicating whether to return full posterior state data. Default is False.
| * lamp_kit (str): String naming LAMP barcode kit to use. Default is to not do LAMP barcoding.
| * detect_mid_strand_barcodes (bool): Flag indicating that read will be marked as unclassified if barcodes appear
| within the strand itself. Default is False.
| * detect_mid_strand_adapter (bool): Flag indicating that read will be marked as unclassified if the adapter
| sequence appears within the strand itself. Default is False.
| * min_score_barcode_front (float): Minimum score for a front barcode to be classified. Default is 60.
| * min_score_barcode_rear (float): Minimum score for a rear barcode to be classified. Default is to use the front
| minimum.
| * min_score_barcode_mid (float): Minimum score for mid barcodes to be detected. Default is 50.
| * detect_adapter (bool): Enable detection of adapters at the front and rear of the sequence.
| * detect_primer (bool): Enable detection of primers at the front and rear of the sequence.
| * min_score_adapter (float): Minimum score for a front or rear adapter to be classified. Default is 60.
| * min_score_primer (float): Minimum score for a front or rear primer to be classified. Default is 60.
| * align_type (str): Type of alignment requested. Valid values are "auto", "coarse", and "full".
| Default is "auto".
| * min_score_lamp (float): Minimum score for a LAMP barcode to be detected. Default is 80.
| * min_score_lamp_mask (float): Minimum score for a LAMP barcode mask to be detected. Default is 50.
| * min_score_lamp_target (float): Minimum score for a LAMP target to be detected. Default is 50.
| * additional_lamp_context_bases (int): Number of additional bases from context to include in lamp barcode comparison.
| Default is 2.
```

The following were changed:

old
```
| * move_and_trace_enabled (bool): Flag indicating whether to return trace and move data. Default is True.
```
new
```
| * move_enabled (bool): Flag indicating whether to return move data. Default is False.
```
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
numpy
pyslow5>=1.1.0
# ont-pyguppy-client-lib==7.2.13
ont-pybasecall-client-lib==7.3.10
ont-pybasecall-client-lib==7.4.12
2 changes: 1 addition & 1 deletion src/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__="0.4.3"
__version__="0.5.0"
284 changes: 194 additions & 90 deletions src/basecaller.py

Large diffs are not rendered by default.

95 changes: 67 additions & 28 deletions src/buttery_eel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import os
import multiprocessing as mp
import platform

try:
import pybasecall_client_lib
Expand All @@ -24,14 +25,14 @@
from .basecaller import start_guppy_server_and_client, basecaller_proc

# region constants
total_reads = 0
div = 50
skipped = 0
# total_reads = 0
# div = 50
# skipped = 0

# How we get data out of the model files if they are not provided by the metadata output

# def get_model_info(config, guppy_bin):
# config = os.path.join(guppy_bin,"../data/", config)
# def get_model_info(config, basecaller_bin):
# config = os.path.join(basecaller_bin,"../data/", config)
# model = ""
# with open(config, 'r') as f:
# for line in f:
Expand All @@ -47,7 +48,7 @@
# logger.warning("could not deduce model for fastq writing, falling back to default, model_version_id=conf")
# model_version_id = config
# else:
# model_json = os.path.join(guppy_bin,"../data/", model)
# model_json = os.path.join(basecaller_bin,"../data/", model)
# with open(model_json, 'r') as f:
# jdata = json.load(f)
# model_version_id = jdata["version"]["id"]
Expand All @@ -62,7 +63,7 @@ def main():
"""
Example:
buttery-eel --guppy_bin /install/ont-guppy-6.1.3/bin --use_tcp --chunk_size 200 \
buttery-eel --basecaller_bin /install/ont-guppy-6.1.3/bin --use_tcp --chunk_size 200 \
--max_queued_reads 1000 -x "cuda:all" --config dna_r9.4.1_450bps_fast.cfg --port 5558 \
-i /Data/test.blow5 -o /Data/test.fastq
Expand All @@ -71,27 +72,20 @@ def main():

VERSION = __version__

# get args from cli
args, other_server_args, arg_error = get_args()

# get version to set secret flags
above_7310_flag = False
above_7412_flag = False
try:
major, minor, patch = [int(i) for i in pybasecall_client_lib.__version__.split(".")]
except:
major, minor, patch = [int(i) for i in pyguppy_client_lib.__version__.split(".")]
if major >= 7 and minor >= 3:
above_7310_flag = True
else:
above_7310_flag = False

if major >= 7 and minor >= 4:
above_7412_flag = True

# add super sneaky hidden flags the user can't interact with but makes global sharing easier
extra_args = argparse.Namespace(
above_7310=above_7310_flag, # is the version >= 7.3.* where the name and inputs change?
)

# now merge them. This will all get printed into the arg print below which also helps with troubleshooting
args = argparse.Namespace(**vars(args), **vars(extra_args))
# get args from cli
args, other_server_args, arg_error = get_args(above_7310_flag, above_7412_flag)

if len(sys.argv) == 1:
arg_error(sys.stderr)
Expand Down Expand Up @@ -161,6 +155,10 @@ def main():
print("throttle: {}".format(client.throttle))
# print("Client Basecalling config:")
# print(client.get_basecalling_config())
bc_config = client.get_basecalling_config()[0]
# print(bc_config)
# print("model: {}".format(bc_config["model_version_id"]))
model_version_id = bc_config["model_version_id"]
# print("Server Basecalling config:")
# print(client.get_server_information("127.0.0.1:5000", 10))
# print(client.get_barcode_kits("127.0.0.1:{}".format(args.port), 10))
Expand Down Expand Up @@ -232,12 +230,25 @@ def main():
print()

mp.set_start_method('spawn')
input_queue = mp.JoinableQueue()
result_queue = mp.JoinableQueue()

if platform.system() == "Darwin":
im = mp.Manager()
rm = mp.Manager()
sm = mp.Manager()
input_queue = im.JoinableQueue()
result_queue = rm.JoinableQueue()
skip_queue = sm.JoinableQueue()
else:
input_queue = mp.JoinableQueue()
result_queue = mp.JoinableQueue()
skip_queue = mp.JoinableQueue()

processes = []

if args.duplex:
if platform.system() == "Darwin":
print("MacOS not currently supported for duplex calling")
sys.exit(1)
if args.single:
print("Duplex mode active - a duplex model must be used to output duplex reads")
print("Buttery-eel does not have checks for this, as the model names are in flux")
Expand All @@ -250,10 +261,10 @@ def main():
duplex_queue = mp.JoinableQueue()
reader = mp.Process(target=duplex_read_worker_single, args=(args, duplex_queue, duplex_pre_queue), name='duplex_read_worker_single')
reader.start()
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT, model_version_id), name='write_worker')
out_writer.start()
# set up each worker to have a unique queue, so it only processes 1 channel at a time
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, skip_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
basecall_worker.start()
processes.append(basecall_worker)

Expand All @@ -267,20 +278,20 @@ def main():
duplex_queues = {name: mp.JoinableQueue() for name in queue_names}
reader = mp.Process(target=duplex_read_worker, args=(args, duplex_queues, duplex_pre_queue), name='duplex_read_worker')
reader.start()
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT, model_version_id), name='write_worker')
out_writer.start()
# set up each worker to have a unique queue, so it only processes 1 channel at a time
for name in queue_names:
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, skip_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
basecall_worker.start()
processes.append(basecall_worker)
else:
reader = mp.Process(target=read_worker, args=(args, input_queue), name='read_worker')
reader.start()
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT, model_version_id), name='write_worker')
out_writer.start()
for i in range(args.procs):
basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, skip_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
basecall_worker.start()
processes.append(basecall_worker)

Expand All @@ -289,9 +300,37 @@ def main():
p.join()
result_queue.put(None)
out_writer.join()

if skip_queue.qsize() > 0:
print("1")
skipped = 0
skip_queue.put(None)
if "/" in args.output:
SKIPPED = open("{}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])), "w")
print("Skipped reads detected, writing details to file: {}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])))
else:
SKIPPED = open("./skipped_reads.txt", "w")
print("Skipped reads detected, writing details to file: ./skipped_reads.txt")

SKIPPED.write("read_id\tstage\terror\n")
print("2")

while True:
read = skip_queue.get()
if read is None:
break
read_id, stage, error = read
skipped += 1
SKIPPED.write("{}\t{}\t{}\n".format(read_id, stage, error))

print("3")
SKIPPED.close()
print("Skipped reads total: {}".format(skipped))

print("\n")
print("Basecalling complete!\n")


# ==========================================================================
# Finish up, close files, disconnect client and terminate server
# ==========================================================================
Expand Down
Loading

0 comments on commit c06452a

Please sign in to comment.