Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IcarusQ alpha driver for 0.2 #1056

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions src/qibolab/_core/instruments/awg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from abc import abstractmethod
from collections.abc import Iterable

import numpy as np

from qibolab._core.components import AcquisitionChannel, Config, DcChannel, IqChannel
from qibolab._core.execution_parameters import (
AcquisitionType,
AveragingMode,
ExecutionParameters,
)
from qibolab._core.identifier import ChannelId, Result
from qibolab._core.instruments.abstract import Controller
from qibolab._core.pulses import PulseLike, Waveform
from qibolab._core.sequence import PulseSequence
from qibolab._core.sweeper import ParallelSweepers, Parameter


class AWG(Controller):
"""Arbitrary waveform generators (AWGs) are instruments that play an array
of samples and do not possess hardware sweepers.

We implement common methods such as waveform generation and
recursive Python-based software sweepers for ease of driver
implementation into Qibolab.
"""

def generate_waveforms(
self, sequence: PulseSequence, configs: dict[str, Config]
) -> dict[ChannelId, Waveform]:
"""Generates waveform arrays for a given pulse sequence."""

channel_waveforms = {}
new_sequence = sequence.align_to_delays()
for channel in new_sequence.channels:
sequence_duration = new_sequence.channel_duration(channel)
# Optional if channel has its own individual sampling rate
sampling_rate = getattr(
configs[channel], "sampling_rate", self.sampling_rate
)
time_interval = 1 / sampling_rate
num_samples = int(sequence_duration / time_interval)
pulses = new_sequence.channel(channel)

instrument_channel = self.channels[channel]
if isinstance(instrument_channel, DcChannel):
channel_waveforms[channel] = self.generate_flux_waveform(
pulses, sampling_rate, num_samples
)
else:
if isinstance(instrument_channel, AcquisitionChannel):
probe_channel = instrument_channel.probe
lo = self.channels[probe_channel].lo
frequency = configs[probe_channel].frequency
elif isinstance(instrument_channel, IqChannel):
lo = instrument_channel.lo
frequency = configs[channel].frequency
else:
raise ValueError("Channel not supported")
# Downconvert/upconvert as necessary
if lo is not None:
frequency = abs(frequency - configs[lo].frequency)
channel_waveforms[channel] = self.generate_iq_waveform(
pulses, sampling_rate, num_samples, frequency
)

return channel_waveforms

def generate_flux_waveform(
self, pulses: Iterable[PulseLike], sampling_rate: float, num_samples: int
):
"""Generates a waveform for DC flux pulses."""
buffer = np.zeros(num_samples)
stopwatch = 0

for pulse in pulses:

if pulse.kind == "delay":
stopwatch += pulse.duration

else:
i_envelope = pulse.i(sampling_rate)
num_pulse_samples = len(i_envelope)
start = int(stopwatch * sampling_rate)
end = start + num_pulse_samples
buffer[start:end] = i_envelope
stopwatch += pulse.duration

return buffer

def generate_iq_waveform(
self,
pulses: Iterable[PulseLike],
sampling_rate: float,
num_samples: int,
frequency: float,
):
"""Generates an IQ waveform for the given pulses."""
buffer = np.zeros((num_samples, 2))
time_interval = 1 / sampling_rate
stopwatch = 0
vz_phase = 0

for pulse in pulses:
if pulse.kind == "virtualz":
vz_phase += pulse.phase

elif pulse.kind == "delay":
stopwatch += pulse.duration

elif pulse.kind == "acquisition":
pass

else:
if pulse.kind == "readout":
pulse = pulse.probe

i_envelope, q_envelope = pulse.envelopes(sampling_rate)
num_pulse_samples = len(i_envelope)
start = int(stopwatch * sampling_rate)
end = start + num_pulse_samples

time_array = np.arange(start, end) * time_interval
angles = (
2 * np.pi * frequency * time_array + pulse.relative_phase + vz_phase
)
buffer[start:end, 0] = i_envelope * np.cos(angles)
buffer[start:end, 1] = q_envelope * np.sin(angles)
stopwatch += pulse.duration
return buffer

def recursive_sweep(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
"""Sample implementation of recursive sweeping.

Modify if device/channel implements a certain sweeper type
"""
if options.acquisition_type is AcquisitionType.RAW:
raise RuntimeError("Sweeper with raw acquisition type unsupported")

result_shape = options.results_shape(sweepers)
parallel_sweeper = sweepers.pop(0)

# Ensure that parallel sweepers have equal length
sweeper_length = len(parallel_sweeper[0].values)
for sweeper in parallel_sweeper:
if len(sweeper.values) != sweeper_length:
raise ValueError("Parallel sweepers have unequal length")

outer_results = {}
# Iterate across the sweeper and play on hardware
for idx in range(sweeper_length):
# First adjust the pulse sequence or channels according to the sweeper
for sweeper in parallel_sweeper:
value = sweeper.values[idx]

if sweeper.parameter == Parameter.frequency:
for channel in sweeper.channels:
# This is OK to mutate, as the actual config is cached by the platform object
setattr(configs[channel], sweeper.parameter.name, value)

elif (
sweeper.parameter == Parameter.amplitude
or sweeper.parameter == Parameter.duration
):
for pulse in sweeper.pulses:
setattr(pulse, sweeper.parameter.name, value)

else:
raise ValueError(
"Sweeper parameter not supported", sweeper.parameter.name
)

# Next, we play the modified pulse sequence or move to an inner sweeper
inner_results = self.play(configs, sequences, options, sweepers)

# Finally, we accumulate the results
# We can have either (nshots x sweeper_length x (inner_results.shape))
# or (sweeper_length x inner_results.shape) depending on the averaging mode
# Hence, we set the array per key accordingly
if len(outer_results) == 0:
outer_results = {
key: np.zeros(result_shape) for key in inner_results.keys()
}

if options.averaging_mode is AveragingMode.SINGLESHOT:
outer_results[:, idx] = inner_results
else:
outer_results[idx] = inner_results

return outer_results

@abstractmethod
def play(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
"""Play a pulse sequence and retrieve feedback.

If :class:`qibolab.sweeper.Sweeper` objects are passed as arguments, they are
executed in real-time. If not possible, an error is raised.

Returns a mapping with the id of the probe pulses used to acquired data.
"""

# As the AWG does not support hardware sweepers, we recursively sweep the sweepers and modify the sequence in-place
if len(sweepers) != 0:
return self.recursive_sweep(configs, sequences, options, sweepers)

# When we finally reach the innermost regime, we can play the pulse sequences
for sequence in sequences:
channel_waveform_map = self.generate_waveforms(sequence, configs)
# Upload waveforms to instrument and play pulse sequence
5 changes: 5 additions & 0 deletions src/qibolab/_core/instruments/icarusq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from . import rfsoc
from .rfsoc import *

__all__ = []
__all__ += rfsoc.__all__
151 changes: 151 additions & 0 deletions src/qibolab/_core/instruments/icarusq/rfsoc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import numpy as np
from icarusq_rfsoc_driver import IcarusQRFSoC

from qibolab._core.components import AcquisitionChannel, Config, IqChannel
from qibolab._core.execution_parameters import (
AcquisitionType,
AveragingMode,
ExecutionParameters,
)
from qibolab._core.identifier import Result
from qibolab._core.instruments.awg import AWG
from qibolab._core.pulses.pulse import PulseId
from qibolab._core.sequence import PulseSequence
from qibolab._core.sweeper import ParallelSweepers
from qibolab._core.unrolling import Bounds

BOUNDS = Bounds(waveforms=Bounds(waveforms=1, readout=1, instructions=1))
SAMPLING_RATE = 5.89824
ADC_SAMPLING_RATE = 1.96608


class RFSOC(AWG):

device: IcarusQRFSoC = None
internal_channel_mapping: dict[str, int]
bounds: str = "icarusq/bounds"

def connect(self):
host, port = self.address.split(":")
self.device = IcarusQRFSoC(host, int(port))

def sampling_rate(self) -> int:
return SAMPLING_RATE

def play(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:

if len(sweepers) != 0:
return self.recursive_sweep(configs, sequences, options, sweepers)

results = {}
for sequence in sequences:
new_sequence = sequence.align_to_delays()
qibolab_channel_waveform_map = self.generate_waveforms(
new_sequence, configs
)
rfsoc_channel_waveform_map = {}
acquisitions: dict[int, PulseId] = {}

for channel, waveform in qibolab_channel_waveform_map.items():

if isinstance(self.channels[channel], AcquisitionChannel):
continue
elif isinstance(self.channels[channel], IqChannel):
waveform = waveform[:, 0] + waveform[:, 1]

port = self.channels[channel].port
waveform.resize(self.device.dac[port].max_samples)
if port in rfsoc_channel_waveform_map:
rfsoc_channel_waveform_map[port] += waveform
else:
rfsoc_channel_waveform_map[port] = waveform

for channel, inputops in new_sequence.acquisitions:
if inputops.kind == "acquisition":
raise RuntimeError(
"IcarusQ currently does not support acquisition events"
)

channel_pulses = new_sequence.channel(channel)
if channel_pulses[0].kind == "delay":
delay_adc = (
int(
np.ceil(channel_pulses[0].duration * ADC_SAMPLING_RATE / 8)
/ 2
)
* 2
)
delay_dac = delay_adc * 1.5
else:
delay_adc = 0
delay_dac = 0

dac, adc = self.channels[channel].path.split(":")
dac_port = int(dac)
adc_port = int(adc)
frequency = configs[channel].frequency
waveform = self.generate_iq_waveform(
[inputops.probe],
SAMPLING_RATE,
self.device.dac[dac_port].max_samples,
frequency,
)
if dac_port in rfsoc_channel_waveform_map:
rfsoc_channel_waveform_map[dac_port] += waveform
else:
rfsoc_channel_waveform_map[dac_port] = waveform
rfsoc_acq_id = len(acquisitions)
acquisitions[rfsoc_acq_id] = inputops.id
self.device.program_qunit(
frequency, inputops.probe.duration * 1e-9, rfsoc_acq_id
)

self.device.dac[dac_port].delay = delay_dac
self.device.adc[adc_port].delay = delay_adc

payload = [
(port, waveform, None)
for port, waveform in rfsoc_channel_waveform_map.items()
]
self.device.upload_waveform(payload)

if options.acquisition_type is AcquisitionType.RAW:
self.device.arm_adc([adc_port], options.nshots)
temp = self.device.result()
result = {acq_id: temp[adc_port] for acq_id in acquisitions.values()}

else:
if options.acquisition_type is AcquisitionType.INTEGRATION:
self.device.set_qunit_mode(0)
elif options.acquisition_type is AcquisitionType.DISCRIMINATION:
self.device.set_qunit_mode(1)

temp = self.device.start_qunit_acquisition(
options.nshots, list(acquisitions.values())
)
if options.acquisition_type is AcquisitionType.INTEGRATION:
result = {
acquisitions[rfsoc_acq_id]: np.dstack((i_array, q_array))[0]
for rfsoc_acq_id, (i_array, q_array) in temp.items()
}
elif options.acquisition_type is AcquisitionType.DISCRIMINATION:
result = {
acq_id: temp[rfsoc_acq_id]
for rfsoc_acq_id, acq_id in acquisitions.items()
}

if options.averaging_mode is not AveragingMode.SINGLESHOT:
result = {
acq_id: np.average(array, axis=0)
for acq_id, array in result.items()
}

results.update(result)

return results
Loading