forked from pyIPP/pyddww
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdd.py
229 lines (209 loc) · 9.05 KB
/
dd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import numpy
import ctypes
import os
__libddww__ = ctypes.cdll.LoadLibrary('/afs/ipp-garching.mpg.de/aug/ads/lib64/' + os.environ['SYS'] + '/libddww8.so')
__fields__ = {'version': lambda: numpy.int32(0), 'level': lambda: numpy.int32(0), 'status': lambda: numpy.int32(0), 'error': lambda: numpy.int32(0), 'relations': lambda: numpy.zeros(8, dtype=numpy.int32), 'address': lambda: numpy.int32(0), 'length': lambda: numpy.int32(0), 'objnr': lambda: numpy.int32(0), 'format': lambda: numpy.zeros(3, dtype=numpy.int32), 'dataformat': lambda: numpy.int32(0), 'objtype': lambda: numpy.int32(0), 'text': lambda: 64*b' ', 'size': lambda: numpy.int32(0), 'indices': lambda: numpy.zeros(3, dtype=numpy.int32), 'items': lambda: numpy.int32(0)}
def getError(error):
""" Check if an error/warning occured. """
try:
err = ctypes.c_int32(error)
except TypeError:
err = ctypes.c_int32(error.value)
isError = __libddww__.xxsev_(ctypes.byref(err))==1
isWarning = __libddww__.xxwarn_(ctypes.byref(err))==1
if isError or isWarning:
id = ctypes.c_char_p(b'')
lid = ctypes.c_uint64(0)
text = ctypes.c_char_p(b' '*255)
ltext = ctypes.c_uint64(255)
unit = ctypes.byref(ctypes.c_int32(-1))
ctrl = ctypes.byref(ctypes.c_uint32(3))
__libddww__.xxerrprt_(unit, text, ctypes.byref(err), ctrl, id, ltext, lid);
if isError:
raise Exception(text.value.strip())
else:
raise Warning(text.value.strip())
def getLastAUGShotNumber():
""" Returns the current shotnumber of ASDEX Upgrade """
error = ctypes.c_int32(0)
pulseNumber = ctypes.c_uint32(0)
__libddww__.ddlastshotnr_(ctypes.byref(error), ctypes.byref(pulseNumber))
getError(error)
return numpy.uint32(pulseNumber.value)
def getLastShotNumber(diagnostic, pulseNumber=None, experiment='AUGD'):
""" Returns the highest available shotnumber of the specified diagnostic.
If pulseNumber is specified, the search starts from there. """
if pulseNumber==None:
pulseNumber = getLastAUGShotNumber()
try:
exper = ctypes.c_char_p(experiment)
except TypeError:
exper = ctypes.c_char_p(experiment.encode())
lexper = ctypes.c_uint64(len(experiment))
try:
diag = ctypes.c_char_p(diagnostic)
except TypeError:
diag = ctypes.c_char_p(diagnostic.encode())
ldiag = ctypes.c_uint64(len(diagnostic))
shot = ctypes.c_uint32(pulseNumber)
cshot = ctypes.c_uint32(0)
__libddww__.ddcshotnr_(exper, diag, ctypes.byref(shot), ctypes.byref(cshot), lexper, ldiag)
return numpy.uint32(cshot.value)
def getPhysicalDimension(Unit):
""" Returns human readable physical dimension of Unit. """
try:
physunit = ctypes.c_int32(Unit)
except TypeError:
physunit = ctypes.c_int32(Unit.value)
error = ctypes.c_int32(0)
output = b' '*256
physdim = ctypes.c_char_p(output)
lphysdim = ctypes.c_uint64(256)
__libddww__.dddim_(ctypes.byref(error), ctypes.byref(physunit), physdim, lphysdim)
getError(error)
return output.replace('\x00','').strip()
class signalInfo(object):
""" Class storing the general information of a Signal. """
def __init__(self, name, type, index, timeBase):
""" Constructor initializing the signalInfo object. """
object.__init__(self)
self.name = name
self.type = type
self.index = index
self.timeBase = timeBase
def nDim():
doc = """ Number of dimension of the signal. """
def fget(self):
return numpy.size(filter(lambda i: self.index[i]>1, xrange(self.index.size)))
return locals()
nDim = property(**nDim())
def size():
doc = """ Number of total datapoints in the signal. """
def fget(self):
return self.index[:self.nDim].prod()
return locals()
size = property(**size())
class shotfile(object):
""" Class to load the data from the shotfile. """
def __init__(self, diagnostic=None, pulseNumber=None, experiment='AUGD', edition=0):
""" Basic constructor. If a diagnostic is specified the shotfile is opened. """
self.diaref = ctypes.c_int32(0)
if diagnostic!=None and pulseNumber!=None:
self.open(diagnostic, pulseNumber, experiment, edition)
def __del__(self):
""" Basic destructor closing the shotfile. """
self.close()
def status():
doc = """ Status showing if shotfile is open. """
def fget(self):
return self.diaref.value!=0
return locals()
status = property(**status())
def open(self, diagnostic, pulseNumber, experiment='AUGD', edition=0):
""" Function opening the specified shotfile. """
self.close()
error = ctypes.c_int32(0)
edit = ctypes.byref(ctypes.c_int32(edition))
cshot = ctypes.byref(ctypes.c_uint32(0))
shot = ctypes.byref(ctypes.c_uint32(pulseNumber))
try:
diag = ctypes.c_char_p(diagnostic)
except TypeError:
diag = ctypes.c_char_p(diagnostic.encode())
try:
exper = ctypes.c_char_p(experiment)
except TypeError:
exper = ctypes.c_char_p(experiment.encode())
lexp = ctypes.c_uint64(len(experiment))
ldiag = ctypes.c_uint64(len(diagnostic))
date = ctypes.c_char_p(b'123456789012345678')
ldate = ctypes.c_uint64(18)
result = __libddww__.ddcshotnr_(exper,diag,shot,cshot,lexp,ldiag)
if result==0 and cshot._obj.value==pulseNumber:
result = __libddww__.ddopen_(ctypes.byref(error),exper,diag,shot,edit,ctypes.byref(self.diaref),date,lexp,ldiag,ldate)
getError(error)
def close(self):
""" Close the shotfile. """
if self.status:
error = ctypes.c_int32(0)
result = __libddww__.ddclose_(ctypes.byref(error), ctypes.byref(self.diaref))
getError(error)
self.diaref.value = 0
def getObjectName(self, objectNumber):
""" Return name of object """
if not self.status:
raise Exception('ddww: Shotfile not open!')
error = ctypes.c_int32(0)
lname = ctypes.c_uint64(8)
try:
obj = ctypes.c_int32(objectNumber)
except TypeError:
obj = ctypes.c_int32(objectNumber.value)
name = b' '*8
__libddww__.ddobjname_(ctypes.byref(error), ctypes.byref(self.diaref), ctypes.byref(obj), ctypes.c_char_p(name), lname)
getError(error)
return name.replace('\x00','').strip()
def getObjectNames(self):
""" Return list of all object names in the shotfile. """
if not self.status:
raise Exception('ddww: Shotfile not open!')
output = []
count = 0
while True:
try:
output.append(self.getObjectName(count))
count += 1
except Exception:
return output
def getSignalInfo(self, name):
""" Returns a signalInfo object containing the information of the signal name """
if not self.status:
raise Exception('ddww::shotfile: Shotfile not open!')
error = ctypes.c_int32(0)
lsig = ctypes.c_uint64(len(name))
typ = ctypes.c_int32(0)
tname = b' '*8
ltname = ctypes.c_uint64(len(tname))
ind = numpy.zeros(4, dtype=numpy.uint32)
try:
sigName = ctypes.c_char_p(name)
except TypeError:
sigName = ctypes.c_char_p(name.encode())
try:
tName = ctypes.c_char_p(tname)
except TypeError:
tName = ctypes.c_char_p(tname.encode())
__libddww__.ddsinfo_(ctypes.byref(error), ctypes.byref(self.diaref) , sigName, ctypes.byref(typ), tName, ind.ctypes.data_as(ctypes.c_void_p) , lsig , ltname)
getError(error.value)
return signalInfo(name.replace('\x00', '').strip(), numpy.uint32(typ.value), ind, tname.replace('\x00','').strip())
def getObjectValue(self, name, field):
""" Returns the value for the specified field for the given object name. """
if not self.status:
raise Exception('Shotfile not open')
error = ctypes.c_int32(0)
data = __fields__[field]()
try:
value = ctypes.c_char_p(data)
except TypeError:
try:
value = data.ctypes.data_as(ctypes.c_void_p)
except AttributeError:
del data
val = ctypes.c_int32(0)
value = ctypes.byref(val)
try:
_name = ctypes.c_char_p(name)
except TypeError:
_name = ctypes.c_char_p(name.encode())
try:
_field = ctypes.c_char_p(field)
except TypeError:
_field = ctypes.c_char_p(field.encode())
lname = ctypes.c_uint64(len(name))
lfield = ctypes.c_uint64(len(field))
result = __libddww__.ddobjval_(ctypes.byref(error),ctypes.byref(self.diaref),_name,_field, value,lname,lfield)
getError(error)
try:
return data
except Exception, Error:
return numpy.int32(val.value)