phyloki.py
import os
import random

from Bio import Entrez, SeqIO
import pandas as pd


def get_sequences(email, file_path, output_dir):
    """
    Download sequences for accession numbers and save them to the specified directory.

    Args:
        email (str): Email address to use with the NCBI Entrez tool.
        file_path (str): Path to the file containing accession numbers.
        output_dir (str): Directory to save downloaded sequences.
    """
    # Set the NCBI Entrez email
    Entrez.email = email
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    # Read accession numbers from file
    with open(file_path, "r") as file:
        accession_numbers = file.read().split()

    # Helper to download a single sequence
    def download_sequence(accession):
        try:
            # Fetch the sequence from NCBI
            handle = Entrez.efetch(
                db="nucleotide", id=accession, rettype="fasta", retmode="text"
            )
            record = SeqIO.read(handle, "fasta")
            handle.close()
            # Save the sequence to a file
            output_path = os.path.join(output_dir, f"{accession}.fasta")
            SeqIO.write(record, output_path, "fasta")
            print(f"Downloaded: {accession}")
        except Exception as e:
            print(f"Failed to download {accession}: {e}")

    # Download sequences for each accession number
    for accession in accession_numbers:
        download_sequence(accession)
    print("All downloads completed.")
def read_accession_file(filename):
    """
    Reads a file and extracts accession numbers, each assumed to be on a separate line.

    Args:
        filename (str): The path to the file from which accession numbers are to be read.
            Each accession number should be on its own line, and the file should be
            plain text with no extra formatting.

    Returns:
        list: A list of strings, where each string is an accession number extracted from the file.
            This list will be empty if the file contains no lines.
    """
    with open(filename, "r") as file:
        accession_numbers = [line.strip() for line in file.readlines()]
    return accession_numbers


def fetch_organism_names(email, accession_numbers):
    """Fetch organism names and accession versions from NCBI's Entrez for given accession numbers."""
    Entrez.email = email
    organism_dict = {}
    for accession in accession_numbers:
        handle = None
        try:
            handle = Entrez.efetch(db="nucleotide", id=accession, retmode="xml")
            records = Entrez.read(handle)
            organism_name = records[0]["GBSeq_organism"]
            accession_version = records[0]["GBSeq_accession-version"]  # Fetch the accession version
            organism_dict[accession_version] = organism_name  # Use the accession version as the key
        except IndexError:
            print(f"No data found for {accession}")
        except KeyError:
            print(f"Organism name or accession version missing for {accession}")
        except Exception as e:
            print(f"Error retrieving data for {accession}: {e}")
        finally:
            # Close the handle only if the request succeeded in opening one
            if handle is not None:
                handle.close()
    return organism_dict


def get_organisms(email, input_filename, output_filename):
    """Main function to get organisms with version and write them to a file."""
    accession_numbers = read_accession_file(input_filename)
    organism_dict = fetch_organism_names(email, accession_numbers)
    with open(output_filename, "w") as file:
        for accession_version, organism in organism_dict.items():
            file.write(f"{accession_version} {organism}\n")
    print(f"The request has been fulfilled.\nFile saved to {output_filename}")
def load_annotations(file_path):
    """
    Load organism annotations from a given text file into a dictionary.
    """
    annotations = {}
    with open(file_path, "r") as file:
        for line in file:
            parts = line.strip().split(maxsplit=1)
            if len(parts) == 2:
                annotations[parts[0]] = parts[1]
    return annotations


def replace_names_in_tree(tree_file_path, annotations, output_file_path):
    """
    Replace each accession number in the tree file with the accession number followed by its organism name.
    """
    with open(tree_file_path, "r") as tree_file:
        tree_data = tree_file.read()
    for accession, organism in annotations.items():
        tree_data = tree_data.replace(f"{accession}:", f"{accession} {organism}:")
    with open(output_file_path, "w") as output_file:
        output_file.write(tree_data)


def update_tree(annotation_file_path, tree_file_path, output_file_path):
    """
    Update a tree file by replacing accession numbers with annotated names from a given annotation file.

    Args:
        annotation_file_path (str): Path to the text file containing accession numbers and organism names.
        tree_file_path (str): Path to the tree file.
        output_file_path (str): Path to save the updated tree file.
    """
    annotations = load_annotations(annotation_file_path)
    replace_names_in_tree(tree_file_path, annotations, output_file_path)
    print(f"The request has been fulfilled.\nFile saved to {output_file_path}")
def read_accession_file(filename):
    """
    Read accession numbers from a file.
    """
    with open(filename, "r") as file:
        accession_numbers = [line.strip() for line in file.readlines()]
    return accession_numbers


def fetch_host_info(accession_numbers, email):
    """
    Fetch host information for each accession number from NCBI.
    """
    Entrez.email = email  # Always set this to your email address
    host_info = {}
    for accession in accession_numbers:
        handle = None
        try:
            handle = Entrez.efetch(db="nucleotide", id=accession, retmode="xml")
            records = Entrez.read(handle)
            version = records[0]["GBSeq_accession-version"]  # Retrieve the versioned accession number
            host = "ND"  # Default if no host information is found
            if "GBSeq_feature-table" in records[0]:
                features = records[0]["GBSeq_feature-table"]
                for feature in features:
                    if feature["GBFeature_key"] == "source":
                        for qualifier in feature["GBFeature_quals"]:
                            if qualifier["GBQualifier_name"] == "host":
                                host = qualifier["GBQualifier_value"]
                                break
            host_info[version] = host  # Use the versioned accession number as the key
        except Exception as e:
            print(f"Error fetching data for {accession}: {e}")
            host_info[accession] = "ND"  # Fall back to the raw accession if the record could not be read
        finally:
            # Close the handle only if the request succeeded in opening one
            if handle is not None:
                handle.close()
    return host_info
def get_hosts(email, input_filename, output_filename):
    """
    Retrieve host information for accession numbers and save it to a file.

    Args:
        email (str): Email address to use with the NCBI Entrez tool.
        input_filename (str): Path to the file containing accession numbers.
        output_filename (str): Path to save the host information.
    """
    accession_numbers = read_accession_file(input_filename)
    host_info = fetch_host_info(accession_numbers, email)
    with open(output_filename, "w") as file:
        for accession, host in host_info.items():
            file.write(f"{accession} {host}\n")
    print(f"The request has been fulfilled.\nFile saved to {output_filename}")
def get_order(species_name, email):
    """
    Fetches the taxonomic order for a given species name using Entrez,
    handling potential errors and ambiguous entries.
    """
    if species_name == "ND":
        return "ND"  # Maintain "ND" for no-data entries
    # Extract the relevant part of the species name (typically the first two words)
    clean_species_name = " ".join(species_name.split()[:2])
    Entrez.email = email
    try:
        handle = Entrez.esearch(db="taxonomy", term=clean_species_name)
        record = Entrez.read(handle)
        handle.close()
        if not record["IdList"]:  # Check whether any results were found
            return f"{species_name} - Note - False record"
        tax_id = record["IdList"][0]
        handle = Entrez.efetch(db="taxonomy", id=tax_id, retmode="xml")
        records = Entrez.read(handle)
        handle.close()
        lineage = records[0]["LineageEx"]
        for taxon in lineage:
            if taxon["Rank"] == "order":
                return taxon["ScientificName"]
    except Exception as e:
        return f"{species_name} - Error - {e}"
    return "ND"  # Return "ND" if the order is not found


def get_hosts_orders(email, input_filename, output_filename):
    """
    Process an input file to associate host organisms with their taxonomic orders and save the results to an output file.

    Args:
        email (str): Email address to use with the NCBI Entrez tool.
        input_filename (str): Path to the file containing accession numbers and host organisms.
        output_filename (str): Path to save the host orders.
    """
    with open(input_filename, "r") as infile, open(output_filename, "w") as outfile:
        for line in infile:
            parts = line.strip().split(maxsplit=1)
            if len(parts) == 2:
                accession, species = parts
            else:
                continue  # Skip lines that don't have two parts
            order = get_order(species, email) if species != "ND" else "ND"
            outfile.write(f"{accession}\t{order}\n")
    print(
        f"The request has been fulfilled.\nFile saved to {output_filename}\n"
        "Please do not forget to edit the file manually.\n"
        "Queries from this function to the NCBI taxonomy database can fail:\n"
        'it may print "Error - HTTP Error 400: Bad Request" when the connection is bad, or\n'
        '"Note - False record" when there is no record for the host organism.'
    )
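# Example use of get_hosts_orders (placeholder file names; expects the
# two-column output of get_hosts):
#   get_hosts_orders("you@example.org", "hosts.txt", "orders.tsv")
#   # Writes tab-separated "<accession>\t<taxonomic order>" lines.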
def get_unique_orders(file_path):
    """
    Extracts and returns a list of unique groups (orders) from the specified file.

    Args:
        file_path (str): Path to the file containing accession numbers and group classifications.

    Returns:
        list: A list of unique group identifiers.
    """
    # Load the data into a DataFrame
    order_df = pd.read_csv(
        file_path, sep="\t", header=None, names=["Accession", "Group"]
    )
    # Get unique groups
    unique_groups = order_df["Group"].unique().tolist()
    return unique_groups


def set_color_map(file_path):
    """
    Prompts the user to set HEX color codes for each unique group found in the file.
    """
    unique_groups = get_unique_orders(file_path)
    color_map = {}
    for group in unique_groups:
        # Keep prompting until a valid HEX color code is entered
        while True:
            color_code = input(
                f"Enter HEX color code for {group} (without #): "
            ).strip()
            # Check if the entered color code is valid
            if len(color_code) == 6 and all(
                c in "0123456789ABCDEFabcdef" for c in color_code
            ):
                color_map[group] = f"#{color_code}"
                break
            else:
                print("Invalid HEX code. Please enter a 6-digit hexadecimal number.")
    return color_map
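# Example use of set_color_map (interactive; prompts on stdin for one HEX code
# per group; "orders.tsv" is a placeholder path):
#   color_map = set_color_map("orders.tsv")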
def generate_random_color():
    """Generate a random hex color."""
    return "#{:06x}".format(random.randint(0, 0xFFFFFF))


def get_itol_dataset(organism_file, order_file, output_file, color_map=None):
    """
    Generates a dataset for iTOL from given organism and order files, assigning colors to unique groups.
    Allows for optional provision of a custom color map; if not provided, colors are generated randomly.

    Args:
        organism_file (str): Path to the file containing accession numbers and organism names.
        order_file (str): Path to the file containing accession numbers and group classifications.
        output_file (str): Path to save the iTOL dataset file.
        color_map (dict, optional): Dictionary of colors for each group. If None, colors are generated randomly.
    """
    # Ensure the output directory exists (skip when the path has no directory component)
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    # Load order data
    order_df = pd.read_csv(
        order_file, sep="\t", header=None, names=["Accession", "Group"]
    )
    unique_groups = order_df["Group"].unique()
    if color_map is None:
        color_map = {group: generate_random_color() for group in unique_groups}
        print("Colors were not set, so they were generated randomly.")
    else:
        print("Colors were set by the user.")
    # Manually parse the organism data
    parsed_data = []
    with open(organism_file, "r") as file:
        for line in file:
            split_line = line.strip().split(maxsplit=1)
            if len(split_line) == 2:
                parsed_data.append(split_line)
            else:
                print(f"Skipping line due to parsing issue: {line.strip()}")
    organism_df = pd.DataFrame(parsed_data, columns=["Accession", "Organism"])
    # Merge the dataframes on accession number
    merged_df = pd.merge(order_df, organism_df, on="Accession", how="left")
    merged_df["Color"] = merged_df["Group"].map(color_map)
    # Define iTOL dataset headers
    itol_header = [
        "DATASET_COLORSTRIP",
        "SEPARATOR TAB",
        "DATASET_LABEL\tHost Group Colors",
        "DATA",
    ]
    # Format data for iTOL
    data_lines = merged_df.apply(
        lambda row: f"{row['Accession']} {row['Organism']}\t{row['Color']}\t{row['Group']}",
        axis=1,
    ).tolist()
    itol_content = itol_header + data_lines
    # Write to the iTOL dataset file
    with open(output_file, "w") as file:
        for line in itol_content:
            file.write(line + "\n")
    print("The request has been fulfilled.")