WIP: Parametric Sweeps and DataContainer improvements #494

Open · wants to merge 9 commits into base: develop
55 changes: 35 additions & 20 deletions src/auspex/data_format.py
@@ -10,6 +10,9 @@
import numpy as np
import os, os.path
import json
import datetime

AUSPEX_CONTAINER_VERSION = 1.1

class AuspexDataContainer(object):
"""A container for Auspex data. Data is stored as `datasets` which may be of any dimension. These are in turn
@@ -27,7 +30,7 @@ class AuspexDataContainer(object):
| - DemodulatedData
"""

def __init__(self, base_path, mode='a', open_all=True):
def __init__(self, base_path, mode='a', open_all=True, metadata=None):
"""Initialize the data container.

Args:
@@ -40,7 +43,8 @@ def __init__(self, base_path, mode='a', open_all=True):
self.open_mmaps = []
self.mode = mode
self._create()

self.metadata = metadata

if open_all:
self.open_all()

@@ -58,6 +62,22 @@ def _create(self):
os.makedirs(self.base_path, exist_ok=True)
self.groups = {}

@property
def metadata(self):
filename = os.path.join(self.base_path,'meta.json')
assert os.path.exists(filename), "Container metadata does not exist; this is probably an old-format .auspex file, which is no longer supported."
with open(filename, 'r') as f:
a = json.load(f)
a['date'] = datetime.datetime.strptime(a['date'], '%Y-%m-%d %H:%M:%S.%f')
return a

@metadata.setter
def metadata(self, value):
filename = os.path.join(self.base_path,'meta.json')
meta = {'version': AUSPEX_CONTAINER_VERSION, 'date': str(datetime.datetime.now()), 'metadata': value}
with open(filename, 'w') as f:
json.dump(meta, f)

def new_group(self, groupname):
"""Add a group to the data container.

@@ -94,14 +114,7 @@ def _create_meta(self, groupname, datasetname, descriptor):
filename = os.path.join(self.base_path,groupname,datasetname+'_meta.json')
assert not os.path.exists(filename), "Existing dataset metafile found. Did you want to open instead?"
meta = {'shape': tuple(descriptor.dims()), 'dtype': np.dtype(descriptor.dtype).str}
meta['axes'] = {a.name: a.points.tolist() for a in descriptor.axes}
meta['units'] = {a.name: a.unit for a in descriptor.axes}
meta['meta_data'] = {}
for a in descriptor.axes:
if a.metadata is not None:
meta['meta_data'][a.name] = a.metadata
else:
meta['meta_data'][a.name] = None
meta['axes'] = [{'name': a.name, 'unit': a.unit, 'points': a.points.tolist(), 'metadata': a.metadata} for a in descriptor.axes]
meta['filename'] = os.path.join(self.base_path,groupname,datasetname)
with open(filename, 'w') as f:
json.dump(meta, f)
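
For reference, the dataset metafile written by `_create_meta` now stores the axes as one ordered list of per-axis dicts instead of the old parallel `axes`/`units`/`meta_data` mappings. A sketch of the resulting `<dataset>_meta.json` contents, with illustrative values that are not taken from this PR:

```python
# Illustrative shape of <dataset>_meta.json after this change (plain JSON on disk).
# Field values here are made up; the structure follows _create_meta above.
example_meta = {
    "shape": [51, 2],                      # tuple(descriptor.dims()), serialized as a list
    "dtype": "<c16",                       # np.dtype(descriptor.dtype).str
    "axes": [                              # one entry per DataAxis, in descriptor order
        {"name": "delay", "unit": "s", "points": [0.0, 1.0e-6], "metadata": None},
    ],
    "filename": "SampleName.auspex/main/data",
}
```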
@@ -132,14 +145,16 @@ def open_all(self):
"""
ret = {}
for groupname in os.listdir(self.base_path):
ret[groupname] = {}
self.groups[groupname] = {}
for datasetname in os.listdir(os.path.join(self.base_path,groupname)):
if datasetname[-4:] == '.dat':
dat = self.open_dataset(groupname, datasetname[:-4])
self.groups[groupname][datasetname[:-4]] = dat
ret[groupname][datasetname[:-4]] = dat
if os.path.isdir(os.path.join(self.base_path,groupname)):
ret[groupname] = {}
self.groups[groupname] = {}
for datasetname in os.listdir(os.path.join(self.base_path,groupname)):
if datasetname[-4:] == '.dat':
dat = self.open_dataset(groupname, datasetname[:-4])
self.groups[groupname][datasetname[:-4]] = dat
ret[groupname][datasetname[:-4]] = dat
return ret

def open_dataset(self, groupname, datasetname):
"""Open a particular dataset stored in this DataContainer.

@@ -164,8 +179,8 @@ def open_dataset(self, groupname, datasetname):
del mm

desc = DataStreamDescriptor(meta['dtype'])
for name, points in meta['axes'].items():
ax = DataAxis(name, points, unit=meta['units'][name])
ax.metadata = meta['meta_data'][name]
for a in meta['axes'][::-1]:
ax = DataAxis(a['name'], a['points'], unit=a['unit'])
ax.metadata = a['metadata']
desc.add_axis(ax)
return data, desc, self.base_path.replace('.auspex', '')
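
Taken together, the new container-level metadata and the reworked axis entries can be exercised roughly as follows. This is a minimal sketch, not part of the diff; the path and metadata keys are illustrative, and it assumes `./test.auspex` does not already exist:

```python
# Minimal sketch: round-tripping container-level metadata (illustrative values).
from auspex.data_format import AuspexDataContainer

container = AuspexDataContainer('./test.auspex',
                                metadata={'sample': 'Q1', 'fridge_temp_mK': 10})
meta = container.metadata      # reads meta.json back from disk
print(meta['version'])         # AUSPEX_CONTAINER_VERSION (1.1)
print(meta['date'])            # stored as a string, parsed back to datetime.datetime
print(meta['metadata'])        # {'sample': 'Q1', 'fridge_temp_mK': 10}
```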
28 changes: 2 additions & 26 deletions src/auspex/experiment.py
@@ -350,33 +350,9 @@ def sweep(self):
else:
self.progressbars[axis].goto(axis.step)

if self.sweeper.is_adaptive():
# Add the new tuples to the stream descriptors
for oc in self.output_connectors.values():
# Obtain the lists of values for any fixed
# DataAxes and append them to them to the sweep_values
# in preperation for finding all combinations.
vals = [a for a in oc.descriptor.data_axis_values()]
if sweep_values:
vals = [[v] for v in sweep_values] + vals
# Find all coordinate tuples and update the list of
# tuples that the experiment has probed.
nested_list = list(itertools.product(*vals))
flattened_list = [tuple((val for sublist in line for val in sublist)) for line in nested_list]
oc.descriptor.visited_tuples = oc.descriptor.visited_tuples + flattened_list

# Since the filters are in separate processes, pass them the same
# information so that they may perform the same operations.
oc.push_event("new_tuples", (axis_names, sweep_values,))

# Run the procedure
self.run()

# See if the axes want to extend themselves. They will push updates
# directly to the output_connecters as messages that will be passed
# through the filter pipeline.
self.sweeper.check_for_refinement(self.output_connectors)

# Finish up, checking to see whether we've received all of our data
if self.sweeper.done():
self.declare_done()
@@ -670,8 +646,8 @@ def add_axis(self, axis, position=0):
logger.debug("Adding axis %s to connector %s.", axis, oc.name)
oc.descriptor.add_axis(axis, position=position)

def add_sweep(self, parameters, sweep_list, refine_func=None, callback_func=None, metadata=None):
ax = SweepAxis(parameters, sweep_list, refine_func=refine_func, callback_func=callback_func, metadata=metadata)
def add_sweep(self, parameters, sweep_list, callback_func=None, metadata=None):
ax = SweepAxis(parameters, sweep_list, callback_func=callback_func, metadata=metadata)
ax.experiment = self
self.sweeper.add_sweep(ax)
self.add_axis(ax)
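
With adaptive refinement gone, `add_sweep` no longer accepts a `refine_func`; a sweep is declared from just the parameter(s), the list of values, and an optional `callback_func`/`metadata`. A minimal sketch against the new signature, where `MyExperiment` and its `frequency` parameter are hypothetical stand-ins:

```python
# Sketch of the updated add_sweep call (illustrative experiment and parameter names).
import numpy as np

exp = MyExperiment()                    # hypothetical Experiment subclass
freqs = np.linspace(4.0e9, 5.0e9, 101)  # sweep values, here in Hz
exp.add_sweep(exp.frequency, freqs)     # refine_func is no longer a valid keyword
exp.run_sweeps()                        # usual auspex entry point (assumed unchanged by this PR)
```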
28 changes: 0 additions & 28 deletions src/auspex/filters/average.py
@@ -131,17 +131,6 @@ def update_descriptors(self):
self.source.descriptor = descriptor
self.excited_counts = np.zeros(self.data_dims, dtype=np.int64)

# We can update the visited_tuples upfront if none
# of the sweeps are adaptive...
desc_out_dtype = descriptor_in.axis_data_type(with_metadata=True, excluding_axis=self.axis.value)
if not descriptor_in.is_adaptive():
vals = [a.points_with_metadata() for a in descriptor_in.axes if a.name != self.axis.value]
nested_list = list(itertools.product(*vals))
flattened_list = [tuple((val for sublist in line for val in sublist)) for line in nested_list]
descriptor.visited_tuples = np.core.records.fromrecords(flattened_list, dtype=desc_out_dtype)
else:
descriptor.visited_tuples = np.empty((0), dtype=desc_out_dtype)

for stream in self.partial_average.output_streams:
stream.set_descriptor(descriptor)
stream.descriptor.buffer_mult_factor = 20
@@ -171,11 +160,6 @@ def update_descriptors(self):
descriptor_count.metadata["num_counts"] = self.num_averages
self.final_counts.descriptor = descriptor_count

if not descriptor_in.is_adaptive():
descriptor_var.visited_tuples = np.core.records.fromrecords(flattened_list, dtype=desc_out_dtype)
else:
descriptor_var.visited_tuples = np.empty((0), dtype=desc_out_dtype)

for stream in self.final_variance.output_streams:
stream.set_descriptor(descriptor_var)
stream.end_connector.update_descriptors()
@@ -233,18 +217,6 @@ def process_data(self, data):
excited_states = (np.real(reshaped) > self.threshold.value).sum(axis=self.mean_axis)
ground_states = self.num_averages - excited_states

if self.sink.descriptor.is_adaptive():
new_tuples = self.sink.descriptor.tuples()[self.idx_global:self.idx_global + new_points]
new_tuples_stripped = remove_fields(new_tuples, self.axis.value)
take_axis = -1 if self.axis_num > 0 else 0
reduced_tuples = new_tuples_stripped.reshape(self.reshape_dims).take((0,), axis=take_axis)
self.idx_global += new_points

# Add to Visited tuples
if self.sink.descriptor.is_adaptive():
for os in self.source.output_streams + self.final_variance.output_streams + self.partial_average.output_streams:
os.descriptor.visited_tuples = np.append(os.descriptor.visited_tuples, reduced_tuples)

for os in self.source.output_streams:
os.push(averaged)

2 changes: 0 additions & 2 deletions src/auspex/filters/elementwise.py
@@ -97,8 +97,6 @@ def main(self):
if message_type == 'event':
if message['event_type'] == 'done':
streams_done[stream] = True
elif message['event_type'] == 'refine':
logger.warning("ElementwiseFilter doesn't handle refinement yet!")
elif message_type == 'data':
# Add any old data...
message_data = stream.pop()
2 changes: 1 addition & 1 deletion src/auspex/filters/filter.py
@@ -234,7 +234,7 @@ def main(self):
for message in messages:
message_type = message['type']
if message['type'] == 'event':
logger.debug('%s "%s" received event with type "%s"', self.__class__.__name__, message_type)
logger.debug('%s received event with type "%s"', self.__class__.__name__, message_type)

# Check to see if we're done
if message['event_type'] == 'done':
2 changes: 1 addition & 1 deletion src/auspex/filters/integrator.py
@@ -10,7 +10,7 @@

import os
import numpy as np
from scipy.signal import chebwin, blackman, slepian, convolve
from scipy.signal import chebwin, blackman, convolve

from .filter import Filter
from auspex.parameter import Parameter, FloatParameter, IntParameter, BoolParameter
5 changes: 3 additions & 2 deletions src/auspex/filters/io.py
@@ -50,7 +50,7 @@ class WriteToFile(Filter):
groupname = Parameter(default='main')
datasetname = Parameter(default='data')

def __init__(self, filename=None, groupname=None, datasetname=None, **kwargs):
def __init__(self, filename=None, groupname=None, datasetname=None, metadata=None, **kwargs):
super(WriteToFile, self).__init__(**kwargs)
if filename:
self.filename.value = filename
@@ -59,6 +59,7 @@ def __init__(self, filename=None, groupname=None, datasetname=None, **kwargs):
if datasetname:
self.datasetname.value = datasetname

self.metadata = metadata
self.ret_queue = None # MP queue For returning data

def final_init(self):
@@ -67,7 +68,7 @@ def final_init(self):
assert self.datasetname.value, "Dataset name never supplied to writer."

self.descriptor = self.sink.input_streams[0].descriptor
self.container = AuspexDataContainer(self.filename.value)
self.container = AuspexDataContainer(self.filename.value, metadata=self.metadata)
self.group = self.container.new_group(self.groupname.value)
self.mmap = self.container.new_dataset(self.groupname.value, self.datasetname.value, self.descriptor)
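
Since `WriteToFile` now forwards a `metadata` keyword to `AuspexDataContainer`, per-run metadata can be attached when the pipeline is assembled. A short sketch with illustrative filename and metadata values:

```python
# Sketch: attaching run metadata when constructing the writer (illustrative values).
from auspex.filters.io import WriteToFile

writer = WriteToFile(filename='./run42.auspex',
                     groupname='main',
                     datasetname='data',
                     metadata={'qubit': 'q0', 'exp_type': 'T1'})
# final_init() passes this dict through to AuspexDataContainer(..., metadata=...),
# which serializes it into the container's meta.json.
```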

27 changes: 16 additions & 11 deletions src/auspex/instruments/X6.py
@@ -107,6 +107,7 @@ def __init__(self, resource_name=None, name="Unlabeled X6", gen_fake_data=False):
self.last_timestamp = Value('d', datetime.datetime.now().timestamp())

self.gen_fake_data = gen_fake_data
self.fake_data_random_mag = 0.1
self.increment_ideal_data = False
self.ideal_counter = 0
self.ideal_data = None
@@ -233,7 +234,7 @@ def add_channel(self, channel):
# todo: other checking here
self._channels.append(channel)

def spew_fake_data(self, counter, ideal_data, random_mag=0.1, random_seed=12345):
def spew_fake_data(self, counter, ideal_data, random_seed=12345):
"""
Generate fake data on the stream. For unittest usage.
ideal_data: array or list giving means of the expected signal for each segment
@@ -242,8 +243,9 @@ def spew_fake_data(self, counter, ideal_data, random_mag=0.1, random_seed=12345)
keep track of how many we expect to receive, when we're doing
the test with fake data
"""
# logger.info(f"In Spew: got {ideal_data}")
random_mag = self.fake_data_random_mag
total = 0
# import ipdb; ipdb.set_trace();
segs = self._lib.nbr_segments
for chan, wsock in self._chan_to_wsocket.items():
if chan.stream_type == "integrated":
Expand All @@ -253,24 +255,21 @@ def spew_fake_data(self, counter, ideal_data, random_mag=0.1, random_seed=12345)
else: #Raw
length = int(self._lib.record_length/4)
buff = np.zeros((segs, length), dtype=chan.dtype)
# for chan, wsock in self._chan_to_wsocket.items():
for i in range(segs):
if chan.stream_type == "integrated":
# random_mag*(np.random.random(length).astype(chan.dtype) + 1j*np.random.random(length).astype(chan.dtype)) +
buff[i,:] = ideal_data[i]
elif chan.stream_type == "demodulated":
buff[i, int(length/4):int(3*length/4)] = 1.0 if ideal_data[i] == 0 else ideal_data[i]
else: #Raw
signal = np.sin(np.linspace(0,10.0*np.pi,int(length/2)))
buff[i, int(length/4):int(length/4)+len(signal)] = signal * (1.0 if ideal_data[i] == 0 else ideal_data[i])
# import ipdb; ipdb.set_trace();
if chan.stream_type == "raw":
buff += random_mag*np.random.random((segs, length))
else:
buff = buff.astype(np.complex128) + random_mag*np.random.random((segs, length))+ 1j*random_mag*np.random.random((segs, length))

total += length*segs
# logger.info(f"In Spew: {buff.dtype} {chan.dtype} {buff.size}")
# logger.info(f"In Spew: {counter}: {buff}")
wsock.send(struct.pack('n', segs*length*buff.dtype.itemsize) + buff.flatten().tostring())
counter[chan] += length*segs

@@ -348,30 +347,36 @@ def wait_for_acquisition(self, dig_run, timeout=15, ocs=None, progressbars=None)
if hasattr(self, 'exp_step') and self.increment_ideal_data:
raise Exception("Cannot use both exp_step and increment_ideal_data")
elif hasattr(self, 'exp_step'):
if self.exp_step >= len(self.ideal_data):
# logger.info("Exp Step longer than ideal data... keeping at last element")
self.exp_step = len(self.ideal_data)-1
total_spewed += self.spew_fake_data(counter, self.ideal_data[self.exp_step])
elif self.increment_ideal_data:
if self.ideal_counter >= len(self.ideal_data):
# logger.info("Ideal data counter longer than ideal data... keeping at last element")
self.ideal_counter = len(self.ideal_data)-1
total_spewed += self.spew_fake_data(counter, self.ideal_data[self.ideal_counter])
else:
total_spewed += self.spew_fake_data(counter, self.ideal_data)
else:
total_spewed += self.spew_fake_data(counter, [0.0 for i in range(self.number_segments)])
# logger.info(f"Spewed {total_spewed}")
logger.debug("Spewed %d", total_spewed)
time.sleep(0.0001)

self.ideal_counter += 1
# logger.info("Counter: %s", str(counter))
# logger.info('TOTAL fake data generated %d', total_spewed)
logger.debug("Counter: %s", str(counter))
logger.debug('TOTAL fake data generated %d', total_spewed)
if ocs:
while True:
total_taken = 0
for oc in ocs:
total_taken += oc.points_taken.value - initial_points[oc]
if progressbars:
progress_updaters[oc](ocs[0].points_taken.value)
# logger.info('TOTAL fake data received %d', oc.points_taken.value - initial_points[oc])
logger.debug('TOTAL fake data received %d', oc.points_taken.value - initial_points[oc])
if total_taken == total_spewed:
break
# logger.info('WAITING for acquisition to finish %d < %d', total_taken, total_spewed)
logger.debug('WAITING for acquisition to finish %d < %d', total_taken, total_spewed)
time.sleep(0.025)
for oc in ocs:
if progressbars:
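
On the fake-data side, the noise magnitude that used to be a `random_mag` argument of `spew_fake_data` is now the instance attribute `fake_data_random_mag`. A sketch of how a unit test might configure it, assuming the digitizer class exported by this module is `X6` and using an illustrative resource name:

```python
# Sketch: configuring fake-data generation for a unit test (illustrative values).
from auspex.instruments.X6 import X6

dig = X6("1", name="test_x6", gen_fake_data=True)
dig.fake_data_random_mag = 0.05        # replaces the removed random_mag argument
dig.ideal_data = [0.0, 1.0, 0.0, 1.0]  # per-segment means consumed by wait_for_acquisition
```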