-
Notifications
You must be signed in to change notification settings - Fork 15
/
prepareFEP.py
334 lines (244 loc) · 7.15 KB
/
prepareFEP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#!/usr/bin/env python
# coding: utf-8
# Author: Julien Michel
#
# email: [email protected]
# # PrepareFEP
# Loads a pair of input files, perform mapping between the first molecule of each input. Write down input files for a SOMD FEP calculation.
# In[ ]:
import os
import zipfile
import BioSimSpace as BSS
from sire.legacy.Mol import AtomIdx
# In[ ]:
def writeLog(ligA, ligB, mapping):
"""Human readable report on atoms used for the mapping."""
atoms_in_A = list(mapping.keys())
stream = open("somd.mapping", "w")
atAdone = []
atBdone = []
for atAidx in atoms_in_A:
atA = ligA._sire_object.select(AtomIdx(atAidx))
atB = ligB._sire_object.select(AtomIdx(mapping[atAidx]))
stream.write(
"%s %s --> %s %s\n" % (atA.index(), atA.name(), atB.index(), atB.name())
)
atAdone.append(atA)
atBdone.append(atB)
for atom in ligA._sire_object.atoms():
if atom in atAdone:
continue
stream.write("%s %s --> dummy\n" % (atom.index(), atom.name()))
for atom in ligB._sire_object.atoms():
if atom in atBdone:
continue
stream.write("dummy --> %s %s\n" % (atom.index(), atom.name()))
stream.close()
# In[ ]:
def loadMapping(mapping_file):
"""Parse a text file that specifies mappings between atomic indices in input1 --> atoms in input2"""
stream = open(mapping_file, "r")
buffer = stream.readlines()
stream.close()
mapping = {}
for line in buffer:
if line.startswith("#"):
continue
elems = line.split(",")
idx1 = int(elems[0])
idx2 = int(elems[1])
mapping[idx1] = idx2
return mapping
# In[ ]:
node = BSS.Gateway.Node(
"A node to generate input files for a SOMD relative free energy calculation."
)
# In[ ]:
node.addAuthor(
name="Julien Michel",
email="[email protected]",
affiliation="University of Edinburgh",
)
node.addAuthor(
name="Lester Hedges",
email="[email protected]",
affiliation="University of Bristol",
)
node.setLicense("GPLv3")
# In[ ]:
node.addInput("input1", BSS.Gateway.FileSet(help="A topology and coordinates file"))
node.addInput("input2", BSS.Gateway.FileSet(help="A topology and coordinates file"))
node.addInput(
"prematch",
BSS.Gateway.String(
help="list of atom indices that are matched between input2 and input1. Syntax is of the format 1-3,4-8,9-11... Ignored if a mapping is provided",
default="",
),
)
node.addInput(
"mapping",
BSS.Gateway.File(
help="csv file that contains atom indices in input1 mapped ot atom indices in input2",
optional=True,
),
)
node.addInput(
"timeout",
BSS.Gateway.Time(
help="The timeout for the maximum common substructure search",
default=10 * BSS.Units.Time.second,
),
)
node.addInput(
"allow_ring_breaking",
BSS.Gateway.Boolean(
help="Whether to allow opening/closing of rings during merge", default=False
),
)
node.addInput(
"allow_ring_size_change",
BSS.Gateway.Boolean(
help="Whether to allow ring size changes during merge", default=False
),
)
node.addInput(
"output",
BSS.Gateway.String(
help="The root name for the files describing the perturbation input1->input2."
),
)
node.addInput(
"somd2",
BSS.Gateway.Boolean(
help="Whether to generate input for SOMD2.",
default=False,
),
)
# In[ ]:
node.addOutput(
"nodeoutput",
BSS.Gateway.FileSet(help="SOMD input files for a perturbation of input1->input2."),
)
# In[ ]:
node.showControls()
# In[ ]:
do_mapping = True
custom_mapping = node.getInput("mapping")
# print (custom_mapping)
if custom_mapping is not None:
do_mapping = False
mapping = loadMapping(custom_mapping)
# print (mapping)
# In[ ]:
# Optional input, dictionary of Atom indices that should be matched in the search.
prematch = {}
prematchstring = node.getInput("prematch")
if len(prematchstring) > 0:
entries = prematchstring.split(",")
for entry in entries:
idxA, idxB = entry.split("-")
prematch[int(idxA)] = int(idxB)
# print (prematch)
# In[ ]:
# Load system 1
system1 = BSS.IO.readMolecules(node.getInput("input1"))
# In[ ]:
# Load system 2
system2 = BSS.IO.readMolecules(node.getInput("input2"))
# In[ ]:
# We assume the molecules to perturb are the first molecules in each system
lig1 = system1[0]
lig2 = system2[0]
# In[ ]:
if do_mapping:
# Return a maximum of 10 matches, scored by RMSD and sorted from best to worst.
mappings, scores = BSS.Align.matchAtoms(
lig1,
lig2,
matches=10,
prematch=prematch,
return_scores=True,
scoring_function="RMSDalign",
timeout=node.getInput("timeout"),
)
# We retain the top mapping
mapping = mappings[0]
# print (len(mappings))
# print (mappings)
# In[ ]:
# print (mapping)
# for x in range(0,len(mappings)):
# print (mappings[x], scores[x])
# In[ ]:
inverted_mapping = dict([[v, k] for k, v in mapping.items()])
# print (inverted_mapping)
# In[ ]:
# Align lig2 to lig1 based on the best mapping (inverted). The molecule is aligned based
# on a root mean squared displacement fit to find the optimal translation vector
# (as opposed to merely taking the difference of centroids).
lig2 = BSS.Align.rmsdAlign(lig2, lig1, inverted_mapping)
# Merge the two ligands based on the mapping.
merged = BSS.Align.merge(
lig1,
lig2,
mapping,
allow_ring_breaking=node.getInput("allow_ring_breaking"),
allow_ring_size_change=node.getInput("allow_ring_size_change"),
)
# Create a composite system
system1.removeMolecules(lig1)
system1.addMolecules(merged)
# In[ ]:
# Log the mapping used
writeLog(lig1, lig2, mapping)
# Are we saving output for SOMD2?
is_somd2 = node.getInput("somd2")
# File root for all output.
root = node.getInput("output")
BSS.IO.saveMolecules(
"merged_at_lam0.pdb",
merged,
"PDB",
{"coordinates": "coordinates0", "bond": "bond0", "element": "element0"},
)
if is_somd2:
BSS.Stream.save(system1, root)
stream_file = "%s.bss" % root
else:
# Generate package specific input
protocol = BSS.Protocol.FreeEnergy(
runtime=2 * BSS.Units.Time.femtosecond, num_lam=3
)
process = BSS.Process.Somd(system1, protocol)
process.getOutput()
with zipfile.ZipFile("somd_output.zip", "r") as zip_hnd:
zip_hnd.extractall(".")
# In[ ]:
if not is_somd2:
mergedpdb = "%s.mergeat0.pdb" % root
pert = "%s.pert" % root
prm7 = "%s.prm7" % root
rst7 = "%s.rst7" % root
mapping_str = "%s.mapping" % root
# In[ ]:
if not is_somd2:
os.replace("merged_at_lam0.pdb", mergedpdb)
os.replace("somd.pert", pert)
os.replace("somd.prm7", prm7)
os.replace("somd.rst7", rst7)
os.replace("somd.mapping", mapping_str)
try:
os.remove("somd_output.zip")
os.remove("somd.cfg")
os.remove("somd.err")
os.remove("somd.out")
except Exception:
pass
# In[ ]:
if is_somd2:
output = [stream_file]
else:
output = [mergedpdb, pert, prm7, rst7, mapping_str]
node.setOutput("nodeoutput", output)
# In[ ]:
node.validate()