forked from draskot/Vini
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cosmicTools.py
executable file
·290 lines (247 loc) · 11.1 KB
/
cosmicTools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import os
import pathlib
import csv_splitter
import requests
import time
import csv
from time import sleep
# Token sent as the "token" query parameter of the CosmicDB download URL
# built in getDataFromCosmic.
# NOTE(review): this looks like a credential committed to source control --
# consider loading it from the environment instead; confirm with the owner.
TOKEN_NUMBER = "38287358613189592956605876772060261"
# Default data directory: the "genes" folder under the resolved current
# working directory (evaluated once, at import time).
WORKING_DIR = os.path.join(os.path.realpath('.'), 'genes')
def mapUniprotIDToCosmicID_fromList(UNIPROT_LIST):
    """
    Function for converting a list of genes Uniprot ID names to Cosmic ID
    names (gene names) by rest api to uniprot.org.
    Saves pairs of Uniprot ID & Cosmic ID to the csv file "cosmic_ids.csv" in
    WORKING_DIR; the file is created with a header row if it does not exist.

    Parameter
    ---------
    UNIPROT_LIST : str
        File with list Uniprot ID gene names.
        NOTE(review): this argument is not read by the function; the IDs are
        always taken from "genes/Uniprot_ID_list". Confirm with callers
        before wiring the argument in.

    Returns
    -------
    list
        Cosmic IDs reported by the mapping service, in the order returned.
        Empty when the ID file contains no IDs.
    """
    cosmic_id_csv_file = pathlib.Path(WORKING_DIR, "cosmic_ids.csv")
    if not cosmic_id_csv_file.exists():
        with open(cosmic_id_csv_file, 'a+') as csvFile:
            csv.writer(csvFile).writerow(["UNIPROT_ID", "COSMIC_ID"])

    # Strip the trailing newline of every line and drop blank lines, so the
    # comma-joined "ids" value sent to the service contains only clean IDs.
    with open("genes/Uniprot_ID_list", "r") as f:
        uniprot_id_list = [line.strip() for line in f if line.strip()]
    if not uniprot_id_list:
        # Nothing to map; avoid submitting an empty job.
        return []
    id_size = len(uniprot_id_list)

    request = requests.post(
        "https://rest.uniprot.org/idmapping/run",
        data={"from": "UniProtKB_AC-ID", "to": "Gene_Name", "ids": ",".join(uniprot_id_list)},
    )
    job_id = request.json()["jobId"]

    # The mapping job runs asynchronously: wait until the service no longer
    # reports it as queued or running before asking for the results.
    status_url = f"https://rest.uniprot.org/idmapping/status/{job_id}"
    while requests.get(status_url).json().get("jobStatus") in ("NEW", "RUNNING"):
        sleep(0.5)

    results = requests.get(f"https://rest.uniprot.org/idmapping/results/{job_id}?size={id_size}")
    results_json = results.json()["results"]

    cosmic_id_list = []
    with open(cosmic_id_csv_file, "a+") as csvFile:
        writer = csv.writer(csvFile)
        for entry in results_json:
            UNIPROT_ID = entry["from"]
            COSMIC_ID = entry["to"]
            cosmic_id_list.append(COSMIC_ID)
            writer.writerow([UNIPROT_ID, COSMIC_ID])
    return cosmic_id_list
def mapUniprotIDToCosmicID(UNIPROT_ID):
    """
    Function for converting genes Uniprot ID name to Cosmic ID name by
    rest api to uniprot.org.
    Saves pair of Uniprot ID & Cosmic ID to the csv file "cosmic_ids.csv" in
    WORKING_DIR; the file is created with a header row if it does not exist.

    Network and JSON errors are not caught here and propagate to the caller.

    Parameter
    ---------
    UNIPROT_ID : str
        Uniprot ID gene name

    Returns
    -------
    str or None
        The mapped Cosmic ID, or None when the service did not answer with
        status 200 or returned no mapping for the ID.
    """
    dict_path = os.path.join(WORKING_DIR, 'cosmic_ids.csv')
    if not os.path.exists(dict_path):
        with open(dict_path, 'a+') as dict_csv:
            writer = csv.writer(dict_csv)
            writer.writerow(["UNIPROT_ID", "COSMIC_ID"])

    def check_status(job_id):
        url_fetch_status = "https://rest.uniprot.org/idmapping/status/%s" % (job_id)
        return requests.request("GET", url_fetch_status, headers={}, data={})

    url_submit_job = "https://rest.uniprot.org/idmapping/run?from=UniProtKB_AC-ID&to=Gene_Name&ids=%s" % (UNIPROT_ID)
    response = requests.request("POST", url_submit_job, headers={}, data={})
    job_id = response.json()["jobId"]

    response = check_status(job_id)
    response_payload = response.json()
    # Poll only while the job is still queued or running; any other status
    # is final, so looping on the mere presence of "jobStatus" would never end.
    while response_payload.get("jobStatus") in ("NEW", "RUNNING"):
        sleep (0.5)
        print ("Checking ID mapping job status for ", UNIPROT_ID)
        response = check_status(job_id)
        response_payload = response.json()

    if response.status_code != 200:
        return None
    results = response_payload.get("results") or []
    if not results:
        # The service finished but produced no mapping for this ID.
        return None

    COSMIC_ID = results[0]["to"]
    print ("Mapped UniprotID %s to CosmicID: % s" % (UNIPROT_ID, COSMIC_ID))
    with open(dict_path, "a+") as f:
        csv.writer(f).writerow([UNIPROT_ID, COSMIC_ID])
    return COSMIC_ID
def mapCosmicIDToUniprotID(COSMIC_ID):
    """
    Submit COSMIC_ID to the uniprot.org ID-mapping rest api and append the
    pair (COSMIC_ID, mapped ID) to "cosmic_ids.csv" in WORKING_DIR.

    NOTE(review): the request uses from=UniProtKB_AC-ID / to=Gene_Name, the
    same direction as mapUniprotIDToCosmicID, and rows are written as
    (COSMIC_ID, UNIPROT_ID) into the file whose header is written elsewhere
    as (UNIPROT_ID, COSMIC_ID). Both are kept as they were -- confirm the
    intended direction and column order.

    Parameter
    ---------
    COSMIC_ID : str
        ID sent to the mapping service

    Returns
    -------
    str, None or bool
        The mapped ID on success, None when the service did not answer with
        status 200, False when any error occurred while talking to the
        service or reading its answer.
    """
    dict_path = os.path.join(WORKING_DIR, 'cosmic_ids.csv')
    if not os.path.exists(dict_path):
        with open(dict_path, 'a+') as dict_csv:
            writer = csv.writer(dict_csv)
            writer.writerow(["COSMIC_ID", "UNIPROT_ID"])

    def check_status(job_id):
        url_fetch_status = "https://rest.uniprot.org/idmapping/status/%s" % (job_id)
        return requests.request("GET", url_fetch_status, headers={}, data={})

    with open(dict_path, "a+") as f:
        try:
            url_submit_job = "https://rest.uniprot.org/idmapping/run?from=UniProtKB_AC-ID&to=Gene_Name&ids=%s" % (COSMIC_ID)
            response = requests.request("POST", url_submit_job, headers={}, data={})
            job_id = response.json()["jobId"]

            response = check_status(job_id)
            response_payload = response.json()
            # Poll only while the job is queued or running; a final status
            # must end the loop instead of spinning forever.
            while response_payload.get("jobStatus") in ("NEW", "RUNNING"):
                sleep (0.5)
                print ("Checking ID mapping job status for ", COSMIC_ID)
                response = check_status(job_id)
                response_payload = response.json()

            if response.status_code != 200:
                return None
            UNIPROT_ID = response_payload["results"][0]["to"]
            print ("Mapped Uniprot ID %s to Cosmic ID: % s" % (COSMIC_ID, UNIPROT_ID))
            writer = csv.writer(f)
            writer.writerow([COSMIC_ID, UNIPROT_ID])
            return UNIPROT_ID
        except Exception:
            # Best-effort lookup: any failure is reported as False, but
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            print ('Error while contacting Uniprot mapping service')
            return False
def getMutationFileName(GENE_NAME, WORKING_DIR):
    """Return the path of the mutations csv file of GENE_NAME inside WORKING_DIR."""
    csv_name = GENE_NAME + '_mutations.csv'
    return os.path.join(WORKING_DIR, csv_name)
def getSequenceFileName(GENE_NAME, WORKING_DIR):
    """Return the path of the sequence csv file of GENE_NAME inside WORKING_DIR."""
    csv_name = GENE_NAME + '_sequence.csv'
    return os.path.join(WORKING_DIR, csv_name)
def getGeneExpressionFileName(GENE_NAME, WORKING_DIR):
    """Return the path of the expressions csv file of GENE_NAME inside WORKING_DIR."""
    csv_name = GENE_NAME + '_expressions.csv'
    return os.path.join(WORKING_DIR, csv_name)
def getTissueExpressionFileName(TISSUE_NAME, WORKING_DIR):
    """Return the path of the samples csv file of TISSUE_NAME inside WORKING_DIR."""
    csv_name = TISSUE_NAME + '_samples.csv'
    return os.path.join(WORKING_DIR, csv_name)
def countLinesCSV(filename):
    """Return the number of lines in the text file at ``filename``."""
    total = 0
    with open(filename) as handle:
        # Stream the file line by line; only the running count is kept.
        for _ in handle:
            total += 1
    return total
def splitGeneExpressionCSV(GENE_NAME, nprocs, WORKING_DIR):
    """
    Split the gene expression csv file of GENE_NAME into parts using
    csv_splitter.split.

    Parameters
    ----------
    GENE_NAME : str
        Gene whose expression file (see getGeneExpressionFileName) is split;
        also used as prefix of the part files ("<GENE_NAME>_part_%s.csv").
    nprocs : int or str
        Divisor for the line count; converted with int(). The row limit per
        part is the integer quotient line_count // nprocs.
    WORKING_DIR : str
        Directory holding the input file and receiving the parts.
    """
    filename = getGeneExpressionFileName(GENE_NAME, WORKING_DIR)
    rows_per_part = countLinesCSV(filename) // int(nprocs)
    print ('Splitting file %s ' % filename)
    # Open the input inside a context manager so the handle is closed once
    # the split call returns (it used to be left open).
    with open(filename) as expression_file:
        csv_splitter.split(filehandler=expression_file, output_name_template=GENE_NAME + '_part_%s.csv',
                           output_path=WORKING_DIR, row_limit=rows_per_part)
def checkCosmicEnvironment():
    """
    Read the CosmicDB credentials from the environment.

    Prints a warning when COSMICDB_USER and/or COSMICDB_PASS is not set.

    Returns
    -------
    tuple
        (COSMICDB_USER, COSMICDB_PASS); an entry is None when the
        corresponding variable is not set.
    """
    cosmicdb_user = os.environ.get('COSMICDB_USER')
    cosmicdb_pass = os.environ.get('COSMICDB_PASS')
    # os.environ.get() returns None instead of raising for a missing key, so
    # a missing variable has to be detected explicitly.
    if cosmicdb_user is None or cosmicdb_pass is None:
        print ("No environment variables COSMICDB_USER and/or COSMICDB_PASS")
    return cosmicdb_user, cosmicdb_pass
def makeGeneListFromInput(GENE_INPUT):
    """
    Build a list of gene names from GENE_INPUT.

    Parameter
    ---------
    GENE_INPUT : str
        Either the path of a text file with one gene name per line, or a
        single gene name.

    Returns
    -------
    list
        The gene names read from the file (trailing whitespace removed,
        blank lines skipped), or [GENE_INPUT] when GENE_INPUT cannot be
        read as a file.
    """
    try:
        # checking if GENE_INPUT is list of multiple genes
        with open(GENE_INPUT, 'r') as genes_file:
            stripped_names = (gene_name.rstrip() for gene_name in genes_file)
            # blank lines would otherwise become empty gene names
            return [gene_name for gene_name in stripped_names if gene_name]
    except (OSError, TypeError, ValueError):
        # if GENE_INPUT is not a list it must be a single gene name
        print ("Can't open file with list of genes.")
        return [GENE_INPUT]
# TODO universal method for downloading data from Cosmic
def getDataFromCosmic(GENE_NAME, COSMIC_GENE_ID, URL_TEMPLATE, filename):
    """
    Download the CosmicDB data of one gene and save it to ``filename``.

    The download is attempted up to 10 times; an attempt is repeated only
    when CosmicDB answers with status 401.

    Parameters
    ----------
    GENE_NAME : str
        Gene name, used only in the printed messages.
    COSMIC_GENE_ID : str
        Value sent as the "genename" query parameter.
    URL_TEMPLATE
        Not used by this function; kept so existing callers keep working.
    filename : str
        Path of the file the response body is written to.

    Returns
    -------
    str or bool
        ``filename`` after a successful download, False otherwise
        (no data, request/file error, or all attempts answered 401).
    """
    number_of_attempts = 10
    try:
        # Built inside the try block on purpose: a non-string COSMIC_GENE_ID
        # (e.g. the result of a failed ID mapping) raises TypeError here and
        # must end in "return False" like every other failure.
        download_url = ("https://cancer.sanger.ac.uk/cosmic-download/download/index?" +
                        "table=V92_38_ALLGENES" + "&" + "genename=" + COSMIC_GENE_ID + "&" + "token=" + TOKEN_NUMBER)
        for current_attempt in range(1, number_of_attempts + 1):
            print ("Attempt %s/%s" % (current_attempt, number_of_attempts))
            print ('Downloading data from CosmicDB')
            r = requests.get(download_url)
            print ('Cosmic response: %s' % r.status_code)
            if r.text != "No data available." and r.status_code == 200:
                with open(filename, 'wb') as f:
                    f.write(r.content)
                return filename
            if r.status_code != 401:
                print ("No data for gene under such name in CosmicDB: %s" % GENE_NAME)
                return False
            # trying again, Cosmic sometimes randomly responds with 401 Unauthorized
            print ("Unsuccessful download of data from CosmicDB for gene % s" % GENE_NAME)
            time.sleep(3)
    except (requests.RequestException, OSError, TypeError):
        return False
    # every attempt was answered with 401
    return False
def saveSequenceToFASTA(GENE_NAME, sequence, WORKING_DIR):
    """
    Write ``sequence`` to "<GENE_NAME>_mutated.fasta" inside WORKING_DIR.

    Parameters
    ----------
    GENE_NAME : str
        Gene name, used as prefix of the output file name.
    sequence : str
        Text written to the file as is.
    WORKING_DIR : str
        Directory in which the file is created.

    Returns
    -------
    bool
        True when the file was written, False otherwise.
    """
    # Reject non-text input (e.g. the False returned by a failed mutation
    # step) before opening the file, so an existing file is not truncated
    # and no empty file is left behind.
    if not isinstance(sequence, str):
        print ("Unsuccessful saving of FASTA file for gene %s" % GENE_NAME)
        return False
    try:
        file_path = os.path.join(WORKING_DIR, GENE_NAME + '_mutated.fasta')
        print (file_path)
        with open(file_path, 'w') as fasta_file:
            fasta_file.write(sequence)
    except (OSError, TypeError, ValueError):
        print ("Unsuccessful saving of FASTA file for gene %s" % GENE_NAME)
        return False
    print ("Mutated sequence saved to %s" %file_path)
    return True
def applyMutationsToFASTA(mutations, FASTAfile):
    """
    Apply CDS mutations to the nucleotide sequence stored in FASTAfile.

    The first line of the file is treated as the header and the second line
    as the whole sequence. Substitutions ("c.123A>G") are applied first and
    only when the nucleotide found at that position matches the expected
    one (compared in lower case). Deletions ("c.123del", "c.123_125del") are
    applied afterwards. Duplications and insertions are only printed.
    Positions are 1-based. Entries that cannot be parsed or that fall
    outside the sequence are reported and skipped.

    Parameters
    ----------
    mutations : pandas dataframe (or any mapping)
        Indexed with the ' MUTATION_CDS' key; the value is iterated and each
        item is a mutation string such as "c.123A>G".
    FASTAfile : str
        Path of the file with the sequence.

    Returns
    -------
    str or bool
        The mutated sequence, or False when the sequence could not be read.
    """
    print ("FASTAfile: ", FASTAfile)
    try:
        with open(FASTAfile) as fasta:
            fasta.readline()  # header line, not needed
            # NOTE(review): only the first sequence line is used, as before;
            # a sequence wrapped over several lines would be cut short.
            sequence = list(fasta.readline().rstrip('\r\n'))
    except (OSError, TypeError, ValueError):
        print ("Can't open file with FASTA sequence")
        return False
    if not sequence:
        print ("Can't open file with FASTA sequence")
        return False

    # saving deletions for last
    deletions = []
    # iterate through mutations and apply it to FASTA sequence
    for raw_mutation in mutations[' MUTATION_CDS']:
        try:
            mutation = raw_mutation.split('.')[1]
        except (AttributeError, IndexError):
            print ("Skipping mutation with unexpected format: %s" % (raw_mutation,))
            continue
        if 'dup' in mutation:
            print (mutation)
        elif 'ins' in mutation:
            print (mutation)
        elif 'del' in mutation:
            deletions.append(mutation.replace('del', ''))
        else:
            # only covering substitution case here
            parts = mutation.split('>')
            try:
                nucleotide_index = int(parts[0][0:-1]) - 1
                nucleotide_before = parts[0][-1].lower()
                nucleotide_after = parts[1].lower()
            except (ValueError, IndexError):
                print ("Skipping mutation with unexpected format: %s" % mutation)
                continue
            # a negative index would silently wrap around to the sequence end
            if not 0 <= nucleotide_index < len(sequence):
                print ("Skipping mutation outside of sequence: %s" % mutation)
                continue
            # applying mutation to sequence
            if sequence[nucleotide_index] == nucleotide_before:
                sequence[nucleotide_index] = nucleotide_after
                print ("Mutation applied: %s" % mutation)
            else:
                print ("No nucleotide match on given index. Expected %s but received %s" \
                       % (nucleotide_before, sequence[nucleotide_index]))

    # Parse every deletion first; identical coordinates are kept once,
    # because the same nucleotides cannot be deleted twice.
    parsed_deletions = {}
    for mutation in deletions:
        bounds = mutation.split("_")
        try:
            start = int(bounds[0])
            finish = start if len(bounds) == 1 else int(bounds[1])
        except ValueError:
            # mutation format is probably not as expected
            print ("Skipping mutation with unexpected format: %s" % mutation)
            continue
        if not 1 <= start <= finish <= len(sequence):
            print ("Skipping mutation outside of sequence: %s" % mutation)
            continue
        parsed_deletions.setdefault((start, finish), mutation)

    # Delete from the highest position downwards so that one deletion does
    # not shift the coordinates of the ones still to be applied.
    for (start, finish), mutation in sorted(parsed_deletions.items(), reverse=True):
        # positions are 1-based and inclusive -> 0-based slice [start-1, finish)
        del sequence[start - 1:finish]
        print ("Mutation applied: %s" % mutation)
    return ''.join(sequence)