NCBI Genomes Breaking Pipeline #28

Open
ERBringHorvath opened this issue Dec 19, 2024 · 0 comments
I'm working with a population of ~10,000 S. pyogenes genomes downloaded using NCBI Datasets. For reasons I haven't been able to pin down, certain genomes break the emmTyper pipeline, printing this error to my console:

INFO:emmtyper.objects.blast:Running command /home/ebh/miniforge3/envs/emmtyper/bin/blastn -db "\"/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/emmtyper/db/emm.fna"\" -query GCA_034940365_1_PDT001723085_1_genomic.fna -dust no -perc_identity 95 -culling_limit 5 -outfmt "6 std slen"
Traceback (most recent call last):
  File "/home/ebh/miniforge3/envs/emmtyper/bin/emmtyper", line 10, in <module>
    sys.exit(main())
             ~~~~^^
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/emmtyper/bin/run_emmtyper.py", line 193, in main
    clusterer()
    ~~~~~~~~~^^
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/emmtyper/objects/clusterer.py", line 430, in __call__
    self.classify_expected_answer()
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
  File "/home/ebh/miniforge3/envs/emmtyper/lib/python3.13/site-packages/emmtyper/objects/clusterer.py", line 384, in classify_expected_answer
    votes = np.array([[item[0], item[1]] for item in self.best_in_clusters])
ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 2 dimensions. The detected shape was (2, 2) + inhomogeneous part.
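
For what it's worth, the NumPy error itself just means the nested list handed to np.array() is ragged: recent NumPy versions refuse to build an array when the inner elements don't all share the same shape (older versions silently produced an object array with a deprecation warning). A minimal sketch that reproduces the same message, using made-up values rather than emmtyper's actual data:

import numpy as np

# Outer structure is 2 x 2, but at the next level one element is a scalar-like
# string and the others are tuples of unequal length, so NumPy reports an
# "inhomogeneous shape after 2 dimensions" and raises ValueError.
rows = [["emm1", (98.5,)], ["emm12", (97.0, 95.5)]]
np.array(rows)

So presumably, for the offending genomes, the items built from best_in_clusters end up as sequences of unequal length, though I haven't worked out which BLAST hits trigger that.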

When this happens, the offending genome has to be removed from the analysis population and the pipeline restarted. With this many genomes, that became very time-consuming. I don't know what the underlying issue is, but my main goal was to find a way to keep emmTyper running whenever this error occurs. To that end, I built a simple wrapper for emmTyper that moves any pipeline-breaking FASTA files to a separate directory without interrupting the larger process:

import os
import shutil
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
import argparse

VALID_FASTA_EXTENSIONS = {".fna", ".fa", ".fas", ".fasta", ".ffn"}

def run_emmtyper(genome, input_dir, failed_dir, combined_output_path):
    """
    Run emmTyper on a single genome.
    If it fails, move the genome file to the failed directory and log the error.
    Append successful results to the combined output file.
    """
    input_path = os.path.join(input_dir, genome)
    try:
        # Run emmTyper and capture its output
        cmd = [
            "emmtyper",
            input_path,
            "-f",
            "verbose",
        ]
        env = os.environ.copy()
        env['PYTHONWARNINGS'] = "ignore"
        result = subprocess.run(
            cmd,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,  # Capture stderr to suppress emmtyper logs for a cleaner console
            text=True,
            env=env,
        )

        # Append output to the combined file
        with open(combined_output_path, "a") as combined_output:
            combined_output.write(result.stdout)

        return genome, None  # Successful result, no error
    except subprocess.CalledProcessError as e:
        # Move pipeline-breaking genomes to a new directory
        shutil.move(input_path, os.path.join(failed_dir, genome))
        return genome, str(e)  # Error result

def main():
    parser = argparse.ArgumentParser(description="Wrapper for emmTyper with error handling")
    parser.add_argument("-i", "--input", required=True, help="Directory containing FASTA files to analyze")
    parser.add_argument("-o", "--output", required=True, help="Path to results file")
    parser.add_argument("-f", "--failed-dir", required=True, help="Directory to save failed genome files")
    parser.add_argument("-t", "--threads", type=int, default=4, help="Number of cores to dedicate for parallel processing")
    args = parser.parse_args()

    # Ensure the log file is in the same directory as the combined output
    log_file = os.path.join(os.path.dirname(args.output), "emmWrapper.log")

    # Ensure the failed directory exists
    os.makedirs(args.failed_dir, exist_ok=True)

    # Clear the output and log files if they exist
    open(args.output, "w").close()
    open(log_file, "w").close()

    # Filter valid genome files
    genomes = [
        g for g in os.listdir(args.input)
        if os.path.isfile(os.path.join(args.input, g)) and os.path.splitext(g)[1].lower() in VALID_FASTA_EXTENSIONS
    ]

    # Log skipped files
    skipped_files = [
        g for g in os.listdir(args.input)
        if os.path.isfile(os.path.join(args.input, g)) and os.path.splitext(g)[1].lower() not in VALID_FASTA_EXTENSIONS
    ]
    with open(log_file, "a") as log:
        if skipped_files:
            log.write(f"Skipped files with invalid extensions: {', '.join(skipped_files)}\n")
            print(f"\033[95mSkipped files with invalid extensions: {', '.join(skipped_files)}\033[0m")

    # Process genomes in parallel
    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        future_to_genome = {
            executor.submit(run_emmtyper, genome, args.input, args.failed_dir, args.output): genome
            for genome in genomes
        }

        with open(log_file, "a") as log:
            for future in as_completed(future_to_genome):
                genome = future_to_genome[future]
                try:
                    genome, error = future.result()

                    if error:
                        log.write(f"Error processing {genome}: {error}\n")  # Log failures
                        print(f"\033[91mError processing {genome}: {error}\033[0m")
                    else:
                        print(f"\033[92mSuccessfully processed: {genome}\033[0m")
                except Exception as exc:
                    log.write(f"Unhandled exception for {genome}: {exc}\n")
                    print(f"\033[91mUnhandled exception for {genome}: {exc}\033[0m")

    print("\n \033[97mAnalysis complete.\033[0m\n")

if __name__ == "__main__":
    main()
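
For reference, I run the wrapper like this (the script name is whatever you save it as; emm_wrapper.py here is just a placeholder):

python emm_wrapper.py -i genomes/ -o emm_results.tsv -f failed_genomes/ -t 8

Successful emmtyper output is appended to emm_results.tsv, any genome that breaks the pipeline is moved to failed_genomes/, and an emmWrapper.log is written alongside the output file.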

I just wanted to bring this error to your attention and share my solution.

Regards,
Eli BH, PhD
