Skip to content

Commit

Permalink
Support continuous send mode for network sdr (#369)
Browse files Browse the repository at this point in the history
* support continuous send mode for network sdr

* support continuous send mode for network sdr

* add unittest

* wait an extra second for CI

* cleanup after yourself

* make test more ram friendly

* clear

* increase timeout for ci

* remove obsolete test line
  • Loading branch information
jopohl authored Nov 29, 2017
1 parent 1f94ed6 commit 89a7b5c
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 81 deletions.
10 changes: 8 additions & 2 deletions src/urh/controller/ContinuousSendDialogController.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, project_manager, messages, modulators, total_samples: int, pa
self.total_samples = total_samples
self.ui.progressBarMessage.setMaximum(len(messages))

self.continuous_modulator = ContinuousModulator(messages, modulators)
self.continuous_modulator = ContinuousModulator(messages, modulators, num_repeats=self.ui.spinBoxNRepeat.value())
self.scene_manager = ContinuousSceneManager(ring_buffer=self.continuous_modulator.ring_buffer, parent=self)
self.scene_manager.init_scene()
self.graphics_view.setScene(self.scene_manager.scene)
Expand Down Expand Up @@ -70,6 +70,7 @@ def on_stop_clicked(self):

@pyqtSlot()
def on_start_clicked(self):
self.ui.spinBoxNRepeat.editingFinished.emit() # inform continuous modulator
if not self.continuous_modulator.is_running:
self.continuous_modulator.start()
super().on_start_clicked()
Expand All @@ -81,6 +82,11 @@ def on_clear_clicked(self):
self.scene_manager.clear_path()
self.reset()

@pyqtSlot()
def on_num_repeats_changed(self):
super().on_num_repeats_changed()
self.continuous_modulator.num_repeats = self.ui.spinBoxNRepeat.value()

def on_selected_device_changed(self):
self.ui.txtEditErrors.clear()
super().on_selected_device_changed()
Expand All @@ -96,7 +102,7 @@ def init_device(self):
try:
self.device.is_send_continuous = True
self.device.continuous_send_ring_buffer = self.continuous_modulator.ring_buffer
self.device.total_samples_to_send = self.total_samples
self.device.num_samples_to_send = self.total_samples

self._create_device_connects()
except ValueError as e:
Expand Down
45 changes: 22 additions & 23 deletions src/urh/dev/VirtualDevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class VirtualDevice(QObject):
index_changed = pyqtSignal(int, int)
sender_needs_restart = pyqtSignal()

native_only_msg = "Continuous send mode is only supported for native backend. " \
"You can change the configured device backend in options."
continuous_send_msg = "Continuous send mode is not supported for GNU Radio backend. " \
"You can change the configured device backend in options."

def __init__(self, backend_handler, name: str, mode: Mode, freq=None, sample_rate=None, bandwidth=None,
gain=None, if_gain=None, baseband_gain=None, samples_to_send=None,
Expand Down Expand Up @@ -126,7 +126,7 @@ def __init__(self, backend_handler, name: str, mode: Mode, freq=None, sample_rat
elif self.backend == Backends.network:
self.__dev = NetworkSDRInterfacePlugin(raw_mode=raw_mode,
resume_on_full_receive_buffer=resume_on_full_receive_buffer,
spectrum=self.mode == Mode.spectrum)
spectrum=self.mode == Mode.spectrum, sending=self.mode == Mode.send)
self.__dev.rcv_index_changed.connect(self.emit_index_changed)
self.__dev.samples_to_send = samples_to_send
elif self.backend == Backends.none:
Expand Down Expand Up @@ -177,46 +177,46 @@ def frequency(self, value):
raise ValueError("Unsupported Backend")

@property
def total_samples_to_send(self):
if self.backend == Backends.native:
return self.__dev.total_samples_to_send
def num_samples_to_send(self) -> int:
if self.backend in (Backends.native, Backends.network):
return self.__dev.num_samples_to_send
else:
raise ValueError()
raise ValueError(self.continuous_send_msg)

@total_samples_to_send.setter
def total_samples_to_send(self, value):
if self.backend == Backends.native:
self.__dev.total_samples_to_send = value
@num_samples_to_send.setter
def num_samples_to_send(self, value: int):
if self.backend in (Backends.native, Backends.network):
self.__dev.num_samples_to_send = value
else:
raise ValueError(self.native_only_msg)
raise ValueError(self.continuous_send_msg)

@property
def is_send_continuous(self) -> bool:
if self.backend == Backends.native:
if self.backend in (Backends.native, Backends.network):
return self.__dev.sending_is_continuous
else:
raise ValueError(self.native_only_msg)
raise ValueError(self.continuous_send_msg)

@is_send_continuous.setter
def is_send_continuous(self, value: bool):
if self.backend == Backends.native:
if self.backend in (Backends.native, Backends.network):
self.__dev.sending_is_continuous = value
else:
raise ValueError(self.native_only_msg)
raise ValueError(self.continuous_send_msg)

@property
def continuous_send_ring_buffer(self):
if self.backend == Backends.native:
if self.backend in (Backends.native, Backends.network):
return self.__dev.continuous_send_ring_buffer
else:
raise ValueError(self.native_only_msg)
raise ValueError(self.continuous_send_msg)

@continuous_send_ring_buffer.setter
def continuous_send_ring_buffer(self, value):
if self.backend == Backends.native:
if self.backend in (Backends.native, Backends.network):
self.__dev.continuous_send_ring_buffer = value
else:
raise ValueError(self.native_only_msg)
raise ValueError(self.continuous_send_msg)

@property
def is_in_spectrum_mode(self):
Expand Down Expand Up @@ -490,10 +490,8 @@ def current_iteration(self, value):
def sending_finished(self):
if self.backend == Backends.grc:
return self.__dev.current_iteration is None
elif self.backend == Backends.native:
elif self.backend in (Backends.native, Backends.network):
return self.__dev.sending_finished
elif self.backend == Backends.network:
return self.__dev.current_sent_sample == len(self.samples_to_send)
else:
raise ValueError("Unsupported Backend")

Expand Down Expand Up @@ -544,6 +542,7 @@ def stop(self, msg: str):
self.emit_stopped_signal()
elif self.backend == Backends.network:
self.__dev.stop_tcp_server()
self.__dev.stop_sending_thread()
self.emit_stopped_signal()
elif self.backend == Backends.none:
pass
Expand Down
7 changes: 5 additions & 2 deletions src/urh/dev/native/Device.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def __init__(self, center_freq, sample_rate, bandwidth, gain, if_gain=1, baseban
self.is_in_spectrum_mode = False
self.sending_is_continuous = False
self.continuous_send_ring_buffer = None
self.total_samples_to_send = None # None = get automatically. This value needs to be known in continuous send mode
self.num_samples_to_send = None # None = get automatically. This value needs to be known in continuous send mode
self._current_sent_sample = Value("L", 0)
self._current_sending_repeat = Value("L", 0)

Expand Down Expand Up @@ -282,7 +282,10 @@ def device_parameters(self) -> OrderedDict:

@property
def send_config(self) -> SendConfig:
total_samples = len(self.send_buffer) if self.total_samples_to_send is None else 2 * self.total_samples_to_send
if self.num_samples_to_send is None:
total_samples = len(self.send_buffer)
else:
total_samples = 2 * self.num_samples_to_send
return SendConfig(self.send_buffer, self._current_sent_sample, self._current_sending_repeat,
total_samples, self.sending_repeats, continuous=self.sending_is_continuous,
pack_complex_method=self.pack_complex,
Expand Down
97 changes: 63 additions & 34 deletions src/urh/plugins/NetworkSDRInterface/NetworkSDRInterfacePlugin.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import socket
import socketserver
import threading

import time

import numpy as np
import psutil
from PyQt5.QtCore import pyqtSlot
from PyQt5.QtCore import QTimer
from PyQt5.QtCore import pyqtSignal
import socket
from PyQt5.QtCore import pyqtSlot, QTimer, pyqtSignal

from urh import constants
from urh.plugins.Plugin import SDRPlugin
from urh.signalprocessing.Message import Message
from urh.util.Errors import Errors
from urh.util.Logger import logger
from urh.util.RingBuffer import RingBuffer
from urh.util.SettingsProxy import SettingsProxy


Expand Down Expand Up @@ -45,7 +41,7 @@ def handle(self):
self.server.current_receive_index:self.server.current_receive_index + len(received)] = received
self.server.current_receive_index += len(received)

def __init__(self, raw_mode=False, resume_on_full_receive_buffer=False, spectrum=False):
def __init__(self, raw_mode=False, resume_on_full_receive_buffer=False, spectrum=False, sending=False):
"""
:param raw_mode: If true, sending and receiving raw samples if false bits are received/sent
Expand All @@ -54,6 +50,8 @@ def __init__(self, raw_mode=False, resume_on_full_receive_buffer=False, spectrum
self.client_ip = self.qsettings.value("client_ip", defaultValue="127.0.0.1", type=str)
self.server_ip = ""

self.samples_to_send = None # set in virtual device constructor

self.client_port = self.qsettings.value("client_port", defaultValue=2222, type=int)
self.server_port = self.qsettings.value("server_port", defaultValue=4444, type=int)

Expand All @@ -71,25 +69,30 @@ def __init__(self, raw_mode=False, resume_on_full_receive_buffer=False, spectrum
self.current_sent_sample = 0
self.current_sending_repeat = 0

self.sending_is_continuous = False
self.continuous_send_ring_buffer = None
self.num_samples_to_send = None # Only used for continuous send mode

self.raw_mode = raw_mode
if self.raw_mode:
num_samples = SettingsProxy.get_receive_buffer_size(self.resume_on_full_receive_buffer,
self.is_in_spectrum_mode)
try:
self.receive_buffer = np.zeros(num_samples, dtype=np.complex64, order='C')
except MemoryError:
logger.warning("Could not allocate buffer with {0:d} samples, trying less...")
i = 0
while True:
try:
i += 2
self.receive_buffer = np.zeros(num_samples // i, dtype=np.complex64, order='C')
logger.debug("Using buffer with {0:d} samples instead.".format(num_samples // i))
break
except MemoryError:
continue
else:
self.received_bits = []
if not sending:
if self.raw_mode:
num_samples = SettingsProxy.get_receive_buffer_size(self.resume_on_full_receive_buffer,
self.is_in_spectrum_mode)
try:
self.receive_buffer = np.zeros(num_samples, dtype=np.complex64, order='C')
except MemoryError:
logger.warning("Could not allocate buffer with {0:d} samples, trying less...")
i = 0
while True:
try:
i += 2
self.receive_buffer = np.zeros(num_samples // i, dtype=np.complex64, order='C')
logger.debug("Using buffer with {0:d} samples instead.".format(num_samples // i))
break
except MemoryError:
continue
else:
self.received_bits = []

@property
def is_sending(self) -> bool:
Expand All @@ -101,6 +104,10 @@ def is_sending(self, value: bool):
self.__is_sending = value
self.sending_status_changed.emit(self.__is_sending)

@property
def sending_finished(self) -> bool:
return self.current_sending_repeat >= self.sending_repeats if self.sending_repeats > 0 else False

@property
def received_data(self):
if self.raw_mode:
Expand Down Expand Up @@ -176,18 +183,34 @@ def send_data(self, data) -> str:

def send_raw_data(self, data: np.ndarray, num_repeats: int):
byte_data = data.tostring()

if num_repeats == -1:
# forever
rng = iter(int, 1)
else:
rng = range(0, num_repeats)
rng = iter(int, 1) if num_repeats <= 0 else range(0, num_repeats) # <= 0 = forever

for _ in rng:
if self.__sending_interrupt_requested:
break
self.send_data(byte_data)
self.current_sent_sample = len(data)
self.current_sending_repeat += 1

def send_raw_data_continuously(self, ring_buffer: RingBuffer, num_samples_to_send: int, num_repeats: int):
rng = iter(int, 1) if num_repeats <= 0 else range(0, num_repeats) # <= 0 = forever
samples_per_iteration = 65536 // 2

for _ in rng:
if self.__sending_interrupt_requested:
break
while self.current_sent_sample < num_samples_to_send:
if self.__sending_interrupt_requested:
break
n = max(0, min(samples_per_iteration, num_samples_to_send - self.current_sent_sample))
data = ring_buffer.pop(n, ensure_even_length=True)
self.send_data(data)
self.current_sent_sample += len(data)
self.current_sending_repeat += 1
self.current_sent_sample = 0

self.current_sent_sample = num_samples_to_send

def __send_messages(self, messages, sample_rates):
"""
Expand Down Expand Up @@ -235,8 +258,14 @@ def start_message_sending_thread(self, messages, sample_rates):

def start_raw_sending_thread(self):
self.__sending_interrupt_requested = False
self.sending_thread = threading.Thread(target=self.send_raw_data,
args=(self.samples_to_send, self.sending_repeats))
if self.sending_is_continuous:
self.sending_thread = threading.Thread(target=self.send_raw_data_continuously,
args=(self.continuous_send_ring_buffer,
self.num_samples_to_send, self.sending_repeats))
else:
self.sending_thread = threading.Thread(target=self.send_raw_data,
args=(self.samples_to_send, self.sending_repeats))

self.sending_thread.daemon = True
self.sending_thread.start()

Expand Down
13 changes: 8 additions & 5 deletions src/urh/signalprocessing/ContinuousModulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ class ContinuousModulator(object):
BUFFER_SIZE_MB = 100
WAIT_TIMEOUT = 0.1

def __init__(self, messages, modulators):
def __init__(self, messages, modulators, num_repeats=-1):
"""
:type messages: list of Message
:type modulators: list of Modulator
"""
self.messages = messages
self.modulators = modulators
self.num_repeats = num_repeats # -1 or 0 = infinite

self.ring_buffer = RingBuffer(int(self.BUFFER_SIZE_MB*10**6)//8)

self.current_message_index = Value("L", 0)

self.abort = Value("i", 0)
self.process = Process(target=self.modulate_continuously)
self.process = Process(target=self.modulate_continuously, args=(self.num_repeats, ))
self.process.daemon = True

@property
Expand All @@ -41,7 +42,7 @@ def is_running(self):
def start(self):
self.abort.value = 0
try:
self.process = Process(target=self.modulate_continuously)
self.process = Process(target=self.modulate_continuously, args=(self.num_repeats, ))
self.process.daemon = True
self.process.start()
except RuntimeError as e:
Expand All @@ -65,8 +66,9 @@ def stop(self, clear_buffer=True):

logger.debug("Stopped continuous modulation")

def modulate_continuously(self):
while True:
def modulate_continuously(self, num_repeats):
rng = iter(int, 1) if num_repeats <= 0 else range(0, num_repeats) # <= 0 = forever
for _ in rng:
start = self.current_message_index.value
for i in range(start, len(self.messages)):
if self.abort.value:
Expand All @@ -83,3 +85,4 @@ def modulate_continuously(self):
# Wait till there is space in buffer
time.sleep(self.WAIT_TIMEOUT)
self.ring_buffer.push(modulator.modulated_samples)
self.current_message_index.value = 0
5 changes: 4 additions & 1 deletion src/urh/util/RingBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ def push(self, values: np.ndarray):

self.__increase_current_index_by(n)

def pop(self, number: int) -> np.ndarray:
def pop(self, number: int, ensure_even_length=False) -> np.ndarray:
"""
Pop number of elements. If there are not enough elements, all remaining elements are returned and the
buffer is cleared afterwards. If buffer is empty, an empty numpy array is returned.
"""
if number > self.current_index:
number = self.current_index

if ensure_even_length:
number -= number % 2

with self.__data.get_lock():
self.current_index -= number
result = np.copy(self.data[0:number])
Expand Down
Loading

0 comments on commit 89a7b5c

Please sign in to comment.