Skip to content

Commit

Permalink
get intermediate gene info from GenBank format instead of feature table
Browse files Browse the repository at this point in the history
  • Loading branch information
gamcil committed Oct 25, 2022
1 parent 9aa2a24 commit cbf75ac
Showing 1 changed file with 35 additions and 138 deletions.
173 changes: 35 additions & 138 deletions cblaster/intermediate_genes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import time
import requests
import re
import io

from Bio import SeqIO

from cblaster.extract_clusters import get_sorted_cluster_hierarchies
from cblaster.database import query_intermediate_genes
from cblaster.classes import Subject
from cblaster.genome_parsers import seqrecord_to_tuples


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -51,7 +55,7 @@ def intermediate_genes_request(accession, start, stop):
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi",
params={
"db": "nuccore",
"rettype": "ft",
"rettype": "gb",
"from": str(start),
"to": str(stop),
},
Expand All @@ -66,6 +70,34 @@ def intermediate_genes_request(accession, start, stop):
return response


def intermediate_genes_genbank(response):
features = []
for record in SeqIO.parse(io.StringIO(response.text), "genbank"):
subjects = []
for feature in seqrecord_to_tuples(record, None):
try:
subject = tuple_to_subject(feature)
except TypeError:
continue
subjects.append(subject)
features.extend(subjects)
return features


def tuple_to_subject(feature):
assert len(feature) == 8, "Tuple should be of length 8"
ftype, name, start, end, strand, sequence, record_id, source_id = feature
if ftype != "gene":
raise TypeError(f"Feature type {ftype}, expecting 'gene'")
return Subject(
name=name,
start=start,
end=end,
strand=strand,
sequence=sequence,
)


def set_remote_intermediate_genes(cluster_hierarchy, gene_distance):
"""Adds intermediate genes to clusters in the cluster_hierarchy from NCBI feature tables.
Expand All @@ -81,147 +113,12 @@ def set_remote_intermediate_genes(cluster_hierarchy, gene_distance):
search_start = max(0, cluster.start - gene_distance)
search_stop = cluster.end + gene_distance
start_time = time.time()
response = intermediate_genes_request(
scaffold_accession, search_start, search_stop
)
subjects = genes_from_feature_table(response.text, search_start)
response = intermediate_genes_request(scaffold_accession, search_start, search_stop)
subjects = intermediate_genes_genbank(response)
cluster.intermediate_genes = get_remote_intermediate_genes(subjects, cluster)
passed_time = time.time() - start_time


def transfer_protein_ids(genes, cdss):
"""Transfers protein_id qualifier from CDS to matching gene features."""
for gene in genes:
for cds in cdss:
if gene["start"] <= cds["start"] and cds["end"] <= gene["end"]:
gene["protein_id"] = cds["protein_id"]
break


def parse_table_features(table_text):
"""Parse features into dictionaries containing start, end and qualifiers """
features = []
feature = {}
for line in table_text.split("\n"):
tabs = line.split("\t")
if len(tabs) == 3:
if feature:
features.append(feature)
start, end, ftype = tabs
start, end, strand = get_start_end_strand(start, end)
feature = dict(
type=ftype,
start=start,
end=end,
strand=strand,
locus_tag=None,
protein_id=None,
)
elif len(tabs) == 2 and feature:
start, end = tabs
start, end, _ = get_start_end_strand(start, end)
feature["start"] = min(feature["start"], start)
feature["end"] = max(feature["end"], end)
elif len(tabs) == 5:
*_, key, value = tabs
if key in ("locus_tag", "protein_id"):
feature[key] = value

# Make sure we catch the last feature
if feature:
features.append(feature)

# If there are gene features, prefer using their coordinates instead of CDS.
# Just transfer protein_ids from matching CDS features and return.
# Otherwise, just return CDS features.
genes = [f for f in features if f["type"] == "gene"]
cdss = [f for f in features if f["type"] == "CDS"]
if genes:
transfer_protein_ids(genes, cdss)
return genes
return cdss


def set_gene_name(feature):
"""Sets the name of a gene feature.
Tries to extract protein_id from feature table protein_id qualifier,
which takes the form:
source database|protein_id|additional qualifiers
For example:
ref|WP_063781455.1|
gb|ACG70812.1|
emb|SPB51992.1||gnl|WGS:OGUI|SPB51992
dbj|GAQ43934.1|
If this fails, falls back to locus_tag.
"""
pattern = re.compile(r"^\w+\|(?P<protein_id>[A-Za-z0-9\._-]+)(?:\||$)")
protein_id = feature.pop("protein_id", None)
locus_tag = feature.pop("locus_tag", None)
try:
feature["name"] = pattern.search(protein_id).group("protein_id")
except:
feature["name"] = locus_tag


def genes_from_feature_table(table_text, search_start):
"""Extracts all CDS regions from a feature table returned by NCBI.
Additional information for the feature table can be found here:
https://www.ncbi.nlm.nih.gov/WebSub/html/help/feature-table.html
Args:
table_text (str): The feature table in text format given by NCBI
search_start (int): The base pair start of the region where genes are returned from
since the location of genes provided is relative to the requested region.
Returns:
List of cblaster Subject objects containing the intermediate genes.
"""

# Parse features into dictionaries containing start, end and qualifiers
features = parse_table_features(table_text)

# Instantiate Subject objects
# - Add scaffold search start to gene start/end
# - Extract protein IDs from protein_id qualifiers
subjects = []
for feature in features:
feature.pop("type")
feature["start"] += search_start
feature["end"] += search_start
set_gene_name(feature)
subject = Subject(**feature)
subjects.append(subject)

return subjects


def get_start_end_strand(start, end):
"""Extracts the start and end locations from the start and end string.
The location can contain < and > which should be removed. Additionally
if the end is smaller then the start the location is on the negative strand
Args:
start(str): string representing the start location of a gene
end(str): string representing the end location of a gene
Returns:
a tuple with start as integer, end as integer and strand as '+' or '-'
"""
start = int(start.replace("<", "").replace(">", ""))
end = int(end.replace("<", "").replace(">", ""))
strand = 1
if start > end:
strand = -1
new_start = end
end = start
start = new_start
return start, end, strand


def get_remote_intermediate_genes(subjects, cluster):
"""
Get all genes that are not part of the cluster from the list of subjects
Expand Down

0 comments on commit cbf75ac

Please sign in to comment.