-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathdiskman.py
executable file
·136 lines (129 loc) · 5.18 KB
/
diskman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/python
## ref: https://gist.github.com/pudquick/5f1baad30024a898e9f2115ac9b0c631
import objc
from Foundation import NSBundle
# Predefine some opaque types
DASessionRef = objc.createOpaquePointerType('DASessionRef', '^{__DASession=}', None)
DADiskRef = objc.createOpaquePointerType('DADiskRef', '^{__DADisk=}', None)
# Load DiskManagement framework classes
DiskManagment = objc.loadBundle('DiskManagment', globals(), bundle_path='/System/Library/PrivateFrameworks/DiskManagement.framework')
# Load DiskArbitration framework functions
DiskArbitration_bundle = NSBundle.bundleWithIdentifier_('com.apple.DiskArbitration')
functions = [
('DASessionCreate', '@o@'),
('DADiskGetBSDName', '*^{__DADisk=}'),
]
objc.loadBundleFunctions(DiskArbitration_bundle, globals(), functions)
class diskRef(object):
def __init__(self, dObj, controller, rawRef=False):
if rawRef:
self.cf_type = objc.objc_object(c_void_p=dObj.__pointer__)
self.ref_type = dObj
else:
self.cf_type = dObj
self.ref_type = DADiskRef(c_void_p=(dObj.__c_void_p__().value))
self.controller = controller
def __repr__(self):
return self.cf_type.__repr__()
@property
def devname(self):
return self.controller.shared.deviceNodeForDisk_error_(self.ref_type, None)
@property
def volname(self):
return self.controller.shared.volumeNameForDisk_error_(self.ref_type, None)
@property
def type(self):
return self.controller.shared.ioContentOfDisk_error_(self.ref_type, None)
@property
def facts(self):
possible = [x for x in dir(self.controller.shared) if (x.startswith('is') and x.endswith('_error_'))]
return [y for y in sorted([x.split('is',1)[-1].rsplit('_error_',1)[0] for x in possible]) if not '_' in y]
@property
def physical_store(self):
# This APFS and other disk types as well supposedly, looking at the code - like CoreStorage, SoftRAID ....
try:
results = self.controller.shared.physicalDisksForDisk_storageSystemName_error_(self.ref_type, None, None)
if results[0] is not None:
return diskRef(results[0], self.controller)
except:
pass
return None
def Is(self, factname):
if factname not in self.facts:
raise Exception('no such fact, check disk.facts')
selector = getattr(self.controller.shared, 'is' + factname + '_error_', None)
if selector is None:
raise Exception('no such fact, check disk.facts')
return (selector)(self.ref_type,None)
class diskList(object):
# This is dict-list hybrid that allows for slicing as well as lookups by dev name
def __init__(self, disks):
self._disk_list = disks[:]
self._disk_dict = dict()
for i,d in enumerate(disks):
self._disk_dict[d.devname] = d
def __iter__(self):
return iter(self._disk_list)
def __getitem__(self, index):
if isinstance(index, slice):
return self._disk_list.__getitem__(index)
elif isinstance(index, int):
return self._disk_list.__getitem__(index)
elif isinstance(index, diskRef):
# someone passed in a disk, so .. give them back the disk?
return index
return self._disk_dict[index]
def __repr__(self):
return self._disk_list.__repr__()
class diskManager(object):
def setup_managers(self):
# Create a DiskArb session
session = DASessionCreate(None)
# Get the shared disk manager
self.shared = DMManager.sharedManager()
session_p = DASessionRef(c_void_p=(session.__c_void_p__().value))
# connect the DA session
self.shared.setDefaultDASession_(session_p)
self.shared.setLanguage_('English')
# init the CS manager
self.shared_cs = DMCoreStorage.alloc().initWithManager_(self.shared)
def __init__(self):
self.shared = None
self.shared_cs = None
self.setup_managers()
def _formatted_disks(self, disk_list):
all_disks = sorted([diskRef(d, self) for d in disk_list], key=lambda x: x.devname)
return diskList(all_disks)
@property
def topLevelDisks(self):
return self._formatted_disks(self.shared.topLevelDisks())
@property
def disks(self):
return self._formatted_disks(self.shared.disks())
def diskForPath(self, path):
return diskRef(self.shared.diskForPath_error_(path, None), self, rawRef=True)
# Example usage
#
# from diskman import diskManager
# dm = diskManager()
#
# >>> dm.disks
# [<DADisk 0x7fd748434520 [0x7fff7c0b1440]>{id = /dev/disk0}, <DADisk 0x7fd748434550 [0x7fff7c0b1440]>{id = /dev/disk0s1}, ...
#
# >>> dm.diskForPath('/')
# <DADisk 0x7fa041c1a180 [0x7fff7c0b1440]>{id = /dev/disk1}
#
# >>> dm.diskForPath('/').volname
# u'Macintosh HD'
#
# >>> dm.diskForPath('/').devname
# u'/dev/disk1'
#
# >>> dm.disks['/dev/disk1'].facts
# ['AppleDiskImage', 'AppleRAIDDisk', 'AppleRAIDMemberDisk', 'AppleRAIDSetDisk', 'AppleRAIDSpareDisk', 'AppleRAIDUUID', ...
#
# >>> dm.disks['/dev/disk1'].Is('EjectableDisk')
# 0
#
# >>> dm.disks['/dev/disk1'].Is('InternalDisk')
# 1