"""
Contains a singleton object `db` of type `Database` which allows easy access to surface files, anatomical images, and transforms that are stored in the pycortex filestore.
"""
import copy
import functools
import glob
import json
import os
import re
import shutil
import tempfile
import warnings
from builtins import input
from hashlib import sha1
import numpy as np
from . import options
default_filestore = options.config.get('basic', 'filestore')
def _memo(fn):
@functools.wraps(fn)
def memofn(self, *args, **kwargs):
if not hasattr(self, "_memocache"):
setattr(self, "_memocache", dict())
        # Key the per-instance cache on the function identity and its arguments
        h = str((id(fn), args, kwargs))
if h not in self._memocache:
self._memocache[h] = fn(self, *args, **kwargs)
return copy.deepcopy(self._memocache[h])
return memofn
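# A minimal sketch of what @_memo buys (hypothetical subject name 'S1'):
# repeated calls hit the per-instance cache, and the deepcopy on return
# lets callers mutate results without corrupting the cached copies, e.g.:
#
#     pts, polys = db.get_surf('S1', 'fiducial', merge=True)
#     pts += 10.0   # safe: the cached arrays are untouched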
class SubjectDB(object):
def __init__(self, subj, filestore=default_filestore):
self.subject = subj
self._warning = None
self._transforms = None
self._surfaces = None
self.filestore = filestore
try:
with open(os.path.join(filestore, subj, "warning.txt")) as fp:
self._warning = fp.read()
except IOError:
pass
@property
def transforms(self):
if self._transforms is not None:
return self._transforms
self._transforms = XfmDB(self.subject, filestore=self.filestore)
return self._transforms
@property
def surfaces(self):
if self._surfaces is not None:
return self._surfaces
self._surfaces = SurfaceDB(self.subject, filestore=self.filestore)
return self._surfaces
class SurfaceDB(object):
def __init__(self, subj, filestore=default_filestore):
self.subject = subj
self.types = {}
db = Database(filestore)
for name in db.get_paths(subj)['surfs'].keys():
self.types[name] = Surf(subj, name, filestore=filestore)
def __repr__(self):
return "Surfaces: [{surfs}]".format(surfs=', '.join(list(self.types.keys())))
def __dir__(self):
return list(self.types.keys())
def __getattr__(self, attr):
if attr in self.types:
return self.types[attr]
raise AttributeError(attr)
class Surf(object):
def __init__(self, subject, surftype, filestore=default_filestore):
self.subject, self.surftype = subject, surftype
self.db = Database(filestore)
def get(self, hemisphere="both"):
return self.db.get_surf(self.subject, self.surftype, hemisphere)
def show(self, hemisphere="both"):
from mayavi import mlab
pts, polys = self.db.get_surf(self.subject, self.surftype, hemisphere, merge=True, nudge=True)
return mlab.triangular_mesh(pts[:,0], pts[:,1], pts[:,2], polys)
class XfmDB(object):
def __init__(self, subj, filestore=default_filestore):
self.subject = subj
self.filestore = filestore
self.xfms = Database(self.filestore).get_paths(subj)['xfms']
def __getitem__(self, name):
if name in self.xfms:
return XfmSet(self.subject, name, filestore=self.filestore)
        raise KeyError(name)
def __repr__(self):
xfms = "\n".join(sorted(self.xfms))
return f"Available transforms for {self.subject}:\n{xfms}"
class XfmSet(object):
def __init__(self, subj, name, filestore=default_filestore):
self.subject = subj
self.name = name
jspath = os.path.join(filestore, subj, 'transforms', name, 'matrices.xfm')
with open(jspath) as fp:
self._jsdat = json.load(fp)
self.masks = MaskSet(subj, name, filestore=filestore)
self.db = Database(filestore)
def __getattr__(self, attr):
if attr in self._jsdat:
return self.db.get_xfm(self.subject, self.name, attr)
        raise AttributeError(attr)
def __repr__(self):
return "Types: {types}".format(types=", ".join(self._jsdat.keys()))
class MaskSet(object):
def __init__(self, subj, name, filestore=default_filestore):
self.subject = subj
self.xfmname = name
maskform = Database(filestore).get_paths(subj)['masks']
maskpath = maskform.format(xfmname=name, type='*')
        # Strip the 'mask_' prefix and '.nii.gz' suffix to get each mask type
        self._masks = dict((os.path.split(path)[1][5:-7], path) for path in glob.glob(maskpath))
def __getitem__(self, item):
import nibabel
return nibabel.load(self._masks[item]).get_fdata().T
def __repr__(self):
return "Masks: [{types}]".format(types=', '.join(self._masks.keys()))
class Database(object):
"""
Database()
Surface database
Attributes
----------
This database object dynamically generates handles to all subjects within the filestore.
"""
def __init__(self, filestore=default_filestore):
self.filestore = filestore
self._subjects = None
self.auxfile = None
def __repr__(self):
subjs = "\n ".join(sorted(self.subjects.keys()))
return """Pycortex database\n Subjects:\n {subjs}""".format(subjs=subjs)
def __getattr__(self, attr):
if attr in self.subjects:
if self.subjects[attr]._warning is not None:
warnings.warn(self.subjects[attr]._warning)
return self.subjects[attr]
else:
            raise AttributeError(attr)
def __dir__(self):
return ["save_xfm","get_xfm", "get_surf", "get_anat", "get_surfinfo", "subjects", # "get_paths", # Add?
"get_mask", "get_overlay","get_cache", "get_view", "save_view", "get_mnixfm",
'get_mri_surf2surf_matrix'] + list(self.subjects.keys())
@property
def subjects(self):
if self._subjects is not None:
return self._subjects
        subjs = [s for s in os.listdir(self.filestore)
                 if os.path.isdir(os.path.join(self.filestore, s))]
        self._subjects = dict((sname, SubjectDB(sname, filestore=self.filestore))
                              for sname in sorted(subjs))
return self._subjects
def reload_subjects(self):
"""Force the reload of the subject dictionary."""
self._subjects = None
self.subjects
def get_anat(self, subject, type='raw', xfmname=None, recache=False, order=1, **kwargs):
"""Return anatomical information from the filestore. Anatomical information is defined as
any volume-space anatomical information pertaining to the subject, such as T1 image,
white matter masks, etc. Volumes not found in the database will be automatically generated.
Parameters
----------
subject : str
Name of the subject
        type : str
            Type of anatomical volume to return. This should be the name of one of
            the volume-generating functions in the `cortex.anat` module (e.g. 'raw').
        xfmname : str, optional
            If given, the anatomical volume is resampled into the functional space
            defined by this transform, and an array is returned instead.
        recache : bool
            Regenerate the information
        order : int, optional
            Interpolation order used when resampling into the functional space
        Returns
        -------
        volume : nibabel object or array
            Volume containing the requested anatomical information
"""
opts = ""
if len(kwargs) > 0:
opts = "[%s]"%','.join(["%s=%s"%i for i in kwargs.items()])
anatform = self.get_paths(subject)['anats']
anatfile = anatform.format(type=type, opts=opts, ext="nii.gz")
if not os.path.exists(anatfile) or recache:
print("Generating %s anatomical..."%type)
from . import anat
getattr(anat, type)(anatfile, subject, **kwargs)
import nibabel
anatnib = nibabel.load(anatfile)
if xfmname is None:
return anatnib
from . import volume
return volume.anat2epispace(anatnib.get_fdata().T.astype(float), subject, xfmname, order=order)
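    # Usage sketch for get_anat (subject and transform names are hypothetical,
    # shown as comments so nothing runs at import time):
    #
    #     t1 = db.get_anat('S1')                              # nibabel image, anatomical space
    #     epi = db.get_anat('S1', 'raw', xfmname='fullhead')  # array resampled into functional space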
def get_surfinfo(self, subject, type="curvature", recache=False, **kwargs):
"""Return auxiliary surface information from the filestore. Surface info is defined as
anatomical information specific to a subject in surface space. A Vertex class will be returned
as necessary. Info not found in the filestore will be automatically generated.
See documentation in cortex.surfinfo for auto-generation code
Parameters
----------
subject: str
Subject name for which to return info
type: str
            Type of surface info returned, e.g. curvature, distortion, or sulcaldepth
recache: bool
Regenerate the information
Returns
-------
verts : Vertex class
If the surface information has "left" and "right" entries, a Vertex class is returned
- OR -
npz : npzfile
Otherwise, an npz object is returned. Remember to close it!
"""
opts = ""
if len(kwargs) > 0:
opts = "[%s]"%','.join(["%s=%s"%i for i in kwargs.items()])
        try:
            # Probe for a packed subject; if one is loaded, cache surface info
            # in the temporary cache directory instead of the filestore
            self.auxfile.get_surf(subject, "fiducial")
            surfifile = os.path.join(self.get_cache(subject), "%s%s.npz" % (type, opts))
except (AttributeError, IOError):
surfiform = self.get_paths(subject)['surfinfo']
surfifile = surfiform.format(type=type, opts=opts)
if not os.path.exists(os.path.join(self.filestore, subject, "surface-info")):
os.makedirs(os.path.join(self.filestore, subject, "surface-info"))
if not os.path.exists(surfifile) or recache:
print ("Generating %s surface info..."%type)
from . import surfinfo
getattr(surfinfo, type)(surfifile, subject, **kwargs)
npz = np.load(surfifile)
if "left" in npz and "right" in npz:
from .dataset import Vertex
verts = np.hstack([npz['left'], npz['right']])
npz.close()
return Vertex(verts, subject)
return npz
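    # Usage sketch for get_surfinfo (hypothetical subject name):
    #
    #     curv = db.get_surfinfo('S1')                    # Vertex object with curvature
    #     depth = db.get_surfinfo('S1', type='sulcaldepth')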
def get_mri_surf2surf_matrix(self, subject, surface_type, hemi='both',
fs_subj=None, target_subj='fsaverage',
**kwargs):
"""Get matrix generated by surf2surf to map one subject's surface to another's
Parameters
----------
        subject : str
            pycortex subject ID
        surface_type : str
            type of surface; one of ['fiducial', 'inflated', 'white',
            'pial'] (other installed surface types may work as well)
hemi : str or list
one of: 'both', 'lh', 'rh'
fs_subj : str
string ID for freesurfer subject (if different from
pycortex subject ID; None assumes they are the same)
target_subj : str
string ID for freesurfer subject to which to map surface
from `subject` (or `fs_subj`)
Other Parameters
----------------
subjects_dir : str
Custom path to freesurfer subjects dir can be specified in the
keyword args
n_neighbors : int
...
random_state : scalar
...
n_test_images : int
...
coef_threshold : scalar or None
...
renormalize : bool
...
"""
from .freesurfer import get_mri_surf2surf_matrix as mri_s2s
from .utils import load_sparse_array, save_sparse_array
if fs_subj is None:
fs_subj = subject
fpath = self.get_paths(subject)['surf2surf'].format(
source=fs_subj,
target=target_subj,
surface_type=surface_type
)
# Backward compatibility
fdir, _ = os.path.split(fpath)
if not os.path.exists(fdir):
print("Creating surf2surf directory for subject %s"%(subject))
os.makedirs(fdir)
if hemi == 'both':
hemis = ['lh', 'rh']
else:
hemis = [hemi]
if os.path.exists(fpath):
mats = [load_sparse_array(fpath, h) for h in hemis]
else:
mats = []
for h in hemis:
tmp = mri_s2s(fs_subj, h, surface_type,
target_subj=target_subj, **kwargs)
mats.append(tmp)
save_sparse_array(fpath, tmp, h, mode='a')
return mats
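    # Usage sketch for get_mri_surf2surf_matrix (hypothetical names; assumes a
    # working freesurfer setup): map a subject's white surfaces onto fsaverage:
    #
    #     lh_mat, rh_mat = db.get_mri_surf2surf_matrix('S1', 'white')
    #     # apply to per-vertex data:  data_on_fsaverage = lh_mat.dot(data_lh)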
def get_overlay(self, subject, overlay_file=None, **kwargs):
from . import svgoverlay
pts, polys = self.get_surf(subject, "flat", merge=True, nudge=True)
paths = self.get_paths(subject)
        # NOTE: This try block is broken, in that it does nothing for the intended
        # use case (loading an overlay from a packed subject) - needs fixing.
        # This hasn't come up yet due to very rare use of packed subjects.
if self.auxfile is not None:
try:
# Prob should be something like:
#self.auxfile.get_overlay(subject, **kwargs)
tf = self.auxfile.get_overlay(subject) # kwargs??
svgfile = tf.name
except (AttributeError, IOError):
# I think the rest of the code should be here (as in other functions...)
pass
        if 'pts' in kwargs:
            pts = kwargs.pop('pts')
# ... and `svgfile` variable should be used here...
if os.path.exists(paths['rois']) and not os.path.exists(paths['overlays']):
svgoverlay.import_roi(paths['rois'], paths['overlays'])
if overlay_file is None:
overlay_file = paths['overlays']
return svgoverlay.get_overlay(subject, overlay_file, pts, polys, **kwargs)
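    # Usage sketch for get_overlay (hypothetical subject name):
    #
    #     overlay = db.get_overlay('S1')   # overlay object built from overlays.svg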
def save_xfm(self, subject, name, xfm, xfmtype="magnet", reference=None):
"""
        Save a transform into the surface database. If the transform already exists, update it.
        If it does not exist, copy the reference epi into the filestore and insert it.
Parameters
----------
subject : str
Name of the subject
name : str
Name to identify the transform
xfm : (4,4) array
The affine transformation matrix
xfmtype : str, optional
Type of the provided transform, either magnet space or coord space.
Defaults to 'magnet'.
reference : str, optional
The nibabel-compatible reference image associated with this transform.
Required if name not in database
"""
if xfmtype not in ["magnet", "coord"]:
raise TypeError("Unknown transform type")
import nibabel
path = os.path.join(self.filestore, subject, "transforms", name)
fname = os.path.join(path, "matrices.xfm")
if os.path.exists(fname):
with open(fname) as fp:
jsdict = json.load(fp)
else:
os.mkdir(path)
if reference is None:
raise ValueError("Please specify a reference")
fpath = os.path.join(path, "reference.nii.gz")
nib = nibabel.load(reference)
data = nib.get_fdata()
if len(data.shape) > 3:
import warnings
warnings.warn('You are importing a 4D dataset, automatically selecting the first volume as reference')
data = data[...,0]
out = nibabel.Nifti1Image(data, nib.affine, header=nib.header)
nibabel.save(out, fpath)
jsdict = dict()
nib = nibabel.load(os.path.join(path, "reference.nii.gz"))
if xfmtype == "magnet":
jsdict['magnet'] = np.array(xfm).tolist()
jsdict['coord'] = np.dot(np.linalg.inv(nib.affine), xfm).tolist()
elif xfmtype == "coord":
jsdict['coord'] = np.array(xfm).tolist()
jsdict['magnet'] = np.dot(nib.affine, xfm).tolist()
files = self.get_paths(subject)
if len(glob.glob(files['masks'].format(xfmname=name, type="*"))) > 0:
raise ValueError('Refusing to change a transform with masks')
with open(fname, "w") as fp:
json.dump(jsdict, fp, sort_keys=True, indent=4)
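    # Usage sketch for save_xfm (hypothetical names and path): store an affine
    # aligning a functional run to the anatomy, keeping its reference EPI:
    #
    #     aff = np.eye(4)                  # stand-in magnet-space affine
    #     db.save_xfm('S1', 'fullhead', aff, xfmtype='magnet',
    #                 reference='/path/to/mean_epi.nii.gz')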
def get_xfm(self, subject, name, xfmtype="coord"):
"""Retrieves a transform from the filestore
Parameters
----------
subject : str
Name of the subject
name : str
Name of the transform
xfmtype : str, optional
Type of transform to return. Defaults to coord.
"""
from .xfm import Transform
if xfmtype == 'coord':
try:
return self.auxfile.get_xfm(subject, name)
except (AttributeError, IOError):
pass
if name == "identity":
nib = self.get_anat(subject, 'raw')
return Transform(np.linalg.inv(nib.affine), nib)
fname = os.path.join(self.filestore, subject, "transforms", name, "matrices.xfm")
reference = os.path.join(self.filestore, subject, "transforms", name, "reference.nii.gz")
with open(fname) as f:
xfmdict = json.load(f)
return Transform(xfmdict[xfmtype], reference)
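    # Usage sketch for get_xfm (hypothetical names):
    #
    #     xfm = db.get_xfm('S1', 'fullhead')                    # coord-space Transform
    #     mag = db.get_xfm('S1', 'fullhead', xfmtype='magnet')  # magnet-space Transform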
@_memo
def get_surf(self, subject, type, hemisphere="both", merge=False, nudge=False):
'''Return the surface pair for the given subject, surface type, and hemisphere.
Parameters
----------
subject : str
Name of the subject
type : str
Type of surface to return, probably in (fiducial, inflated,
veryinflated, hyperinflated, superinflated, flat)
hemisphere : "lh", "rh"
Which hemisphere to return
merge : bool
Vstack the hemispheres, if requesting both
nudge : bool
Nudge the hemispheres apart from each other, for overlapping surfaces
(inflated, etc)
Returns
-------
left, right :
If request is for both hemispheres, otherwise:
pts, polys, norms : ((p,3) array, (f,3) array, (p,3) array or None)
For single hemisphere
'''
try:
return self.auxfile.get_surf(subject, type, hemisphere, merge=merge, nudge=nudge)
except (AttributeError, IOError):
pass
files = self.get_paths(subject)['surfs']
if hemisphere.lower() == "both":
left, right = [ self.get_surf(subject, type, hemisphere=h) for h in ["lh", "rh"]]
if type != "fiducial" and nudge:
left[0][:,0] -= left[0].max(0)[0]
right[0][:,0] -= right[0].min(0)[0]
if merge:
pts = np.vstack([left[0], right[0]])
polys = np.vstack([left[1], right[1]+len(left[0])])
return pts, polys
return left, right
elif hemisphere.lower() in ("lh", "left"):
hemi = "lh"
elif hemisphere.lower() in ("rh", "right"):
hemi = "rh"
else:
raise TypeError("Not a valid hemisphere name")
if type == 'fiducial' and 'fiducial' not in files:
wpts, polys = self.get_surf(subject, 'wm', hemi)
ppts, _ = self.get_surf(subject, 'pia', hemi)
return (wpts + ppts) / 2, polys
try:
from . import formats
fnm = str(os.path.splitext(files[type][hemi])[0])
return formats.read(fnm)
        except KeyError:
            raise IOError("No '%s' surface found for subject '%s'" % (type, subject))
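    # Usage sketch for get_surf (hypothetical subject name):
    #
    #     left, right = db.get_surf('S1', 'fiducial')           # per-hemisphere tuples
    #     pts, polys = db.get_surf('S1', 'flat', merge=True, nudge=True)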
def save_mask(self, subject, xfmname, type, mask):
fname = self.get_paths(subject)['masks'].format(xfmname=xfmname, type=type)
if os.path.exists(fname):
raise IOError('Refusing to overwrite existing mask')
import nibabel
xfm = self.get_xfm(subject, xfmname)
if xfm.shape != mask.shape:
raise ValueError("Invalid mask shape: must match shape of reference image")
affine = xfm.reference.affine
nib = nibabel.Nifti1Image(mask.astype(np.uint8).T, affine)
nib.to_filename(fname)
def get_mask(self, subject, xfmname, type='thick'):
if hasattr(type, 'decode'):
type = type.decode('utf8')
        try:
            # Return the mask from a packed subject (auxfile), if one is loaded
            return self.auxfile.get_mask(subject, xfmname, type)
        except (AttributeError, IOError):
            pass
fname = self.get_paths(subject)['masks'].format(xfmname=xfmname, type=type)
try:
import nibabel
nib = nibabel.load(fname)
return nib.get_fdata().T != 0
except IOError:
print('Mask not found, generating...')
from .utils import get_cortical_mask
mask = get_cortical_mask(subject, xfmname, type)
self.save_mask(subject, xfmname, type, mask)
return mask
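    # Usage sketch for get_mask (hypothetical names): boolean mask over the
    # functional volume, generated and cached on first request:
    #
    #     mask = db.get_mask('S1', 'fullhead', type='thick')
    #     n_cortical_voxels = mask.sum()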
def get_shared_voxels(self, subject, xfmname, hemi="both", merge=True, use_astar=True, recache=False):
"""Get an array indicating which vertices are inappropriately mapped to the same voxel.
For a given transform and surface, returns an array containing a list of vertices which
are spatially distant on the cortical surface but that map to the same voxels (this occurs
at sulcal crossings)
"""
# Test for packed subjects
try:
voxels = self.auxfile.get_shared_voxels(subject, xfmname, hemi=hemi, merge=merge, use_astar=use_astar)
return voxels
except (AttributeError, IOError):
pass
# Proceed w/ potential load
shared_voxel_file = os.path.join(self.get_cache(subject), 'shared_vertices_{xfmname}_{hemi}.npy'.format(xfmname=xfmname, hemi=hemi))
if not os.path.exists(shared_voxel_file) or recache:
print('Shared voxel array not found, generating...')
from .utils import get_shared_voxels as _get_shared_voxels
voxels = _get_shared_voxels(subject, xfmname, hemi=hemi, merge=merge, use_astar=use_astar)
np.save(shared_voxel_file, voxels)
return voxels
else:
voxels = np.load(shared_voxel_file)
return voxels
def get_coords(self, subject, xfmname, hemisphere="both", magnet=None):
"""Calculate the coordinates of each vertex in the epi space by transforming the fiducial to the coordinate space
Parameters
----------
subject : str
Name of the subject
xfmname : str
Name of the transform
        hemisphere : str, optional
            Which hemisphere to return. If "both", return concatenated. Defaults to "both".
        magnet : (4,4) array, optional
            If given, the magnet-space transform composed with the inverse of this
            matrix is used instead of the stored coord-space transform.
"""
import warnings
warnings.warn('Please use a Mapper object instead', DeprecationWarning)
if magnet is None:
xfm = self.get_xfm(subject, xfmname, xfmtype="coord")
else:
xfm = self.get_xfm(subject, xfmname, xfmtype="magnet")
xfm = np.linalg.inv(magnet) * xfm
coords = []
vtkTmp = self.get_surf(subject, "fiducial", hemisphere=hemisphere, nudge=False)
if not isinstance(vtkTmp,(tuple,list)):
vtkTmp = [vtkTmp]
for pts, polys in vtkTmp:
wpts = np.vstack([pts.T, np.ones(len(pts))])
coords.append(np.dot(xfm.xfm, wpts)[:3].round().astype(int).T)
return coords
def get_cache(self, subject):
try:
self.auxfile.get_surf(subject, "fiducial")
            # Generate a hashed directory name from the auxfile's filename
            # (hashlib requires bytes, so encode the filename first)
            import hashlib
            hashname = "pycx_%s" % hashlib.md5(self.auxfile.h5.filename.encode()).hexdigest()[-8:]
cachedir = os.path.join(tempfile.gettempdir(), hashname, subject)
except (AttributeError, IOError):
try:
# Get cache dir from config file
cachedir = os.path.join(options.config.get('basic', 'cache'),
subject, 'cache')
except options.configparser.NoOptionError:
# If not defined, go with default cache
cachedir = os.path.join(self.filestore, subject, "cache")
cachedir = os.path.expanduser(cachedir)
if not os.path.exists(cachedir):
os.makedirs(cachedir)
return cachedir
def clear_cache(self, subject, clear_all_caches=True):
"""Clears config-specified and default file caches for a subject.
"""
local_cachedir = self.get_cache(subject)
shutil.rmtree(local_cachedir)
os.makedirs(local_cachedir)
# Check on default cache for subject as well, on the off chance
# that other people have cached files here for this subject
default_cachedir = os.path.join(self.filestore, subject, "cache")
        if clear_all_caches is True:
            # Just delete them.
            shutil.rmtree(default_cachedir)
            os.makedirs(default_cachedir)
        elif len(os.listdir(default_cachedir)) > 0:
            # Make sure you didn't want to delete them. You probably should.
            proceed = input("Files exist in %s too! Delete them? [y]/n: " % default_cachedir)
            if proceed.lower() in ('', 'y'):
                shutil.rmtree(default_cachedir)
                os.makedirs(default_cachedir)
def get_paths(self, subject):
"""Get a dictionary with a list of all candidate filenames for associated data, such as roi overlays, flatmap caches, and ctm caches.
"""
surfpath = os.path.join(self.filestore, subject, "surfaces")
if self.subjects[subject]._warning is not None:
warnings.warn(self.subjects[subject]._warning)
surfs = dict()
for surf in os.listdir(surfpath):
ssurf = os.path.splitext(surf)[0].split('_')
name = '_'.join(ssurf[:-1])
hemi = ssurf[-1]
if name not in surfs:
surfs[name] = dict()
surfs[name][hemi] = os.path.abspath(os.path.join(surfpath,surf))
viewsdir = os.path.join(self.filestore, subject, "views")
if not os.path.exists(viewsdir):
os.makedirs(viewsdir)
views = os.listdir(viewsdir)
filenames = dict(
surfs=surfs,
xfms=sorted(os.listdir(os.path.join(self.filestore, subject, "transforms"))),
xfmdir=os.path.join(self.filestore, subject, "transforms", "{xfmname}", "matrices.xfm"),
anats=os.path.join(self.filestore, subject, "anatomicals", '{type}{opts}.{ext}'),
surfinfo=os.path.join(self.filestore, subject, "surface-info", '{type}{opts}.npz'),
masks=os.path.join(self.filestore, subject, 'transforms', '{xfmname}', 'mask_{type}.nii.gz'),
rois=os.path.join(self.filestore, subject, "rois.svg").format(subj=subject),
overlays=os.path.join(self.filestore, subject, "overlays.svg").format(subj=subject),
views=sorted([os.path.splitext(f)[0] for f in views]),
surf2surf=os.path.join(self.filestore, subject, "surf2surf", "{source}_to_{target}", "matrices_{surface_type}.hdf"),
)
return filenames
def make_subj(self, subject):
if os.path.exists(os.path.join(self.filestore, subject)):
if input("Are you sure you want to overwrite this existing subject?\n"
"This will delete all files for this subject in the filestore, "
"including all blender cuts. Type YES\n") == "YES":
shutil.rmtree(os.path.join(self.filestore, subject))
else:
                raise ValueError("Aborting: refusing to overwrite existing subject")
for dirname in ['transforms', 'anatomicals', 'cache', 'surfaces', 'surface-info','views']:
try:
path = os.path.join(self.filestore, subject, dirname)
os.makedirs(path)
except OSError:
print("Error making directory %s"%path)
    def save_view(self, vw, subject, name, is_overwrite=False):
        """Save the current view of an open webshow instance as a named view
        Stores the view parameters of a currently-open cortex.webshow instance
        (with handle `vw`) under the name `name`
Parameters
----------
vw : handle for pycortex webgl viewer
Handle for open webgl session (returned by cortex.webshow)
subject : string
pycortex subject id
name : string
Name of stored view to re-load
Notes
-----
Equivalent to call to vw.save_view(subject,name)
For a list of the view parameters saved, see viewer._capture_view
"""
view = vw._capture_view()
sName = os.path.join(self.filestore, subject, "views", name+'.json')
        if os.path.exists(sName) and not is_overwrite:
            raise IOError('Refusing to overwrite existing view. If you want to do this, set is_overwrite=True!')
with open(sName,'w') as fp:
json.dump(view, fp)
    def get_view(self, vw, subject, name):
"""Set the view for an open webshow instance from a saved view
Sets the view in a currently-open cortex.webshow instance (with handle `vw`)
to the saved view named `name`
Parameters
----------
vw : handle for cortex.webshow
Handle for open webshow session (returned by cortex.webshow)
subject : string, subject name
name : string
Name of stored view to re-load
Notes
-----
Equivalent to call to vw.get_view(subject,name)
For a list of the view parameters saved, see viewer._capture_view
"""
sName = os.path.join(self.filestore, subject, "views", name+'.json')
with open(sName) as fp:
view = json.load(fp)
vw._set_view(**view)
def get_mnixfm(self, subject, xfm, template=None):
"""Get transform from the space specified by `xfm` to MNI space.
Parameters
----------
subject : str
Subject identifier
xfm : str
Name of functional space transform. Can be 'identity' for anat space.
template : str or None, optional
Path to MNI template volume. If None, uses default specified in cortex.mni
Returns
-------
mnixfm : numpy.ndarray
Transformation matrix from the space specified by `xfm` to MNI space.
Notes
-----
Equivalent to cortex.mni.compute_mni_transform, but this function also caches
the result (which is nice because computing it can be slow).
See Also
--------
cortex.mni.compute_mni_transform
cortex.mni.transform_to_mni
cortex.mni.transform_mni_to_subject
"""
from . import mni
if template is None:
templatehash = "default"
else:
            templatehash = sha1(template.encode()).hexdigest()
# Check cache first
mnixfmfile = os.path.join(self.get_cache(subject), "mni_xfm-%s-%s.txt"%(xfm, templatehash))
if os.path.exists(mnixfmfile):
mnixfm = np.loadtxt(mnixfmfile)
else:
# Run the transform
if template is None:
mnixfm = mni.compute_mni_transform(subject, xfm)
else:
mnixfm = mni.compute_mni_transform(subject, xfm, template)
# Cache the result
mni._save_fsl_xfm(mnixfmfile, mnixfm)
return mnixfm
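    # Usage sketch for get_mnixfm (hypothetical names); the first call computes
    # the transform (slow), later calls read it back from the subject's cache:
    #
    #     mni_mat = db.get_mnixfm('S1', 'fullhead')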
db = Database()
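# Typical access pattern through the module-level singleton `db` (subject name
# 'S1' is hypothetical; the import path assumes this module is cortex.database):
#
#     from cortex.database import db
#     print(db.subjects)                    # available subjects in the filestore
#     pts, polys = db.get_surf('S1', 'flat', merge=True, nudge=True)
#     surf = db.S1.surfaces.fiducial.get()  # attribute-style access per subject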