diff --git a/data/azure-pipelines.yml b/data/azure-pipelines.yml
index b88a45a92c..da712a77e8 100644
--- a/data/azure-pipelines.yml
+++ b/data/azure-pipelines.yml
@@ -34,13 +34,13 @@ jobs:
- script: |
touch tests/show_gui
- pytest -s --boxed --junitxml=junit/test-results.xml --cov=src/urh --cov-report=xml --cov-report=html --cov-config tests/.coveragerc tests
+ pytest --boxed --junitxml=junit/test-results.xml --cov=src/urh --cov-report=xml --cov-report=html --cov-config tests/.coveragerc tests
displayName: 'Run pytest with coverage'
condition: eq(variables['python.version'], '3.6')
- script: |
touch tests/show_gui
- pytest -s --boxed --junitxml=junit/test-results.xml tests
+ pytest --boxed --junitxml=junit/test-results.xml tests
displayName: 'Run pytest without coverage'
condition: ne(variables['python.version'], '3.6')
@@ -162,7 +162,7 @@ jobs:
assetUploadMode: 'replace'
addChangeLog: true
- - script: pytest --junitxml=junit/test-results.xml -s -v tests
+ - script: pytest --junitxml=junit/test-results.xml tests
displayName: 'Run pytest'
- task: PublishTestResults@2
@@ -202,7 +202,7 @@ jobs:
- script: python setup.py build_ext --inplace
displayName: "Build extensions"
- - script: pytest --junitxml=junit/test-results.xml -s -v tests
+ - script: pytest --junitxml=junit/test-results.xml tests
displayName: 'Run pytest'
- task: PublishTestResults@2
diff --git a/data/ui/send_recv_sniff_settings.ui b/data/ui/send_recv_sniff_settings.ui
index 806d2bf9ed..24b6a8df04 100644
--- a/data/ui/send_recv_sniff_settings.ui
+++ b/data/ui/send_recv_sniff_settings.ui
@@ -7,7 +7,7 @@
0
0
482
- 377
+ 388
@@ -147,19 +147,6 @@ QGroupBox::indicator:checked {
- -
-
-
- 4
-
-
- -3.140000000000000
-
-
- 3.140000000000000
-
-
-
-
@@ -303,6 +290,36 @@ QGroupBox::indicator:checked {
+ -
+
+
-
+
+
+
+ 0
+ 0
+
+
+
+ 4
+
+
+ -3.140000000000000
+
+
+ 3.140000000000000
+
+
+
+ -
+
+
+ Automatic
+
+
+
+
+
@@ -314,7 +331,6 @@ QGroupBox::indicator:checked {
groupBoxSniffSettings
spinbox_sniff_Noise
- spinbox_sniff_Center
spinbox_sniff_BitLen
spinbox_sniff_ErrorTolerance
combox_sniff_Modulation
diff --git a/src/urh/ainterpretation/AutoInterpretation.py b/src/urh/ainterpretation/AutoInterpretation.py
index 76449efdf9..ca9bdc8436 100644
--- a/src/urh/ainterpretation/AutoInterpretation.py
+++ b/src/urh/ainterpretation/AutoInterpretation.py
@@ -28,7 +28,6 @@ def min_without_outliers(data: np.ndarray, z=2):
return np.min(data[abs(data - np.mean(data)) <= z * np.std(data)])
-
def get_most_frequent_value(values: list):
"""
Return the most frequent value in list.
@@ -192,23 +191,40 @@ def detect_modulation_for_messages(signal: np.ndarray, message_indices: list) ->
return max(set(modulations_for_messages), key=modulations_for_messages.count)
-def detect_center(rectangular_signal: np.ndarray):
+def detect_center(rectangular_signal: np.ndarray, max_size=None):
rect = rectangular_signal[rectangular_signal > -4] # do not consider noise
+ # Ignore the first and last 5% of samples,
+ # because there tends to be an overshoot at start/end of rectangular signal
+ rect = rect[int(0.05*len(rect)):int(0.95*len(rect))]
+
+ if max_size is not None and len(rect) > max_size:
+ rect = rect[0:max_size]
+
hist_min, hist_max = util.minmax(rect)
- hist_step = 0.05
+
+ # The step size of histogram is set to variance of the rectangular signal
+ # If a signal has low variance we need to be more accurate at center detection
+ hist_step = float(np.var(rect))
y, x = np.histogram(rect, bins=np.arange(hist_min, hist_max + hist_step, hist_step))
num_values = 2
most_common_levels = []
+ window_size = max(2, int(0.05*len(y)))
+
+ def get_elem(arr, index: int, default):
+ if 0 <= index < len(arr):
+ return arr[index]
+ else:
+ return default
+
for index in np.argsort(y)[::-1]:
# check if we have a local maximum in histogram, if yes, append the value
- left = y[index - 1] if index > 0 else 0
- right = y[index + 1] if index < len(y) - 1 else 0
-
- if left < y[index] and y[index] > right:
+ if all(y[index] > get_elem(y, index+i, 0) and
+ y[index] > get_elem(y, index-i, 0)
+ for i in range(1, window_size+1)):
most_common_levels.append(x[index])
if len(most_common_levels) == num_values:
diff --git a/src/urh/controller/widgets/SniffSettingsWidget.py b/src/urh/controller/widgets/SniffSettingsWidget.py
index 65e4d7817f..3b14aecd20 100644
--- a/src/urh/controller/widgets/SniffSettingsWidget.py
+++ b/src/urh/controller/widgets/SniffSettingsWidget.py
@@ -38,6 +38,9 @@ def __init__(self, device_name: str, project_manager: ProjectManager, signal=Non
backend_handler=BackendHandler() if backend_handler is None else backend_handler,
network_raw_mode=network_raw_mode)
+ self.sniffer.adaptive_noise = self.ui.checkBoxAdaptiveNoise.isChecked()
+ self.sniffer.automatic_center = self.ui.checkBoxAutoCenter.isChecked()
+
self.create_connects()
self.ui.comboBox_sniff_encoding.currentIndexChanged.emit(self.ui.comboBox_sniff_encoding.currentIndex())
self.ui.comboBox_sniff_viewtype.setCurrentIndex(constants.SETTINGS.value('default_view', 0, int))
@@ -77,6 +80,8 @@ def set_val(widget, key: str, default):
set_val(self.ui.combox_sniff_Modulation, "modulation_index", signal.modulation_type if signal else 1)
self.ui.comboBox_sniff_encoding.setCurrentText(conf_dict.get("decoding_name", ""))
self.ui.checkBoxAdaptiveNoise.setChecked(bool(conf_dict.get("adaptive_noise", False)))
+ self.ui.checkBoxAutoCenter.setChecked(bool(conf_dict.get("automatic_center", False)))
+ self.ui.spinbox_sniff_Center.setDisabled(self.ui.checkBoxAutoCenter.isChecked())
self.emit_editing_finished_signals()
@@ -92,6 +97,7 @@ def create_connects(self):
self.ui.checkBox_sniff_Timestamp.clicked.connect(self.on_checkbox_sniff_timestamp_clicked)
self.ui.btn_sniff_use_signal.clicked.connect(self.on_btn_sniff_use_signal_clicked)
self.ui.checkBoxAdaptiveNoise.clicked.connect(self.on_check_box_adaptive_noise_clicked)
+ self.ui.checkBoxAutoCenter.clicked.connect(self.on_check_box_auto_center_clicked)
def emit_editing_finished_signals(self):
self.ui.spinbox_sniff_Noise.editingFinished.emit()
@@ -108,7 +114,8 @@ def emit_sniff_parameters_changed(self):
tolerance=self.sniffer.signal.tolerance,
modulation_index=self.sniffer.signal.modulation_type,
decoding_name=self.sniffer.decoder.name,
- adaptive_noise=self.sniffer.adaptive_noise))
+ adaptive_noise=self.sniffer.adaptive_noise,
+ automatic_center=self.sniffer.automatic_center))
@pyqtSlot()
def on_noise_edited(self):
@@ -186,3 +193,8 @@ def on_btn_sniff_use_signal_clicked(self):
@pyqtSlot()
def on_check_box_adaptive_noise_clicked(self):
self.sniffer.adaptive_noise = self.ui.checkBoxAdaptiveNoise.isChecked()
+
+ @pyqtSlot()
+ def on_check_box_auto_center_clicked(self):
+ self.sniffer.automatic_center = self.ui.checkBoxAutoCenter.isChecked()
+ self.ui.spinbox_sniff_Center.setDisabled(self.ui.checkBoxAutoCenter.isChecked())
diff --git a/src/urh/signalprocessing/ProtocolSniffer.py b/src/urh/signalprocessing/ProtocolSniffer.py
index b171bc0dc2..77620e62f8 100644
--- a/src/urh/signalprocessing/ProtocolSniffer.py
+++ b/src/urh/signalprocessing/ProtocolSniffer.py
@@ -5,8 +5,9 @@
import numpy as np
from PyQt5.QtCore import pyqtSignal, QObject
-
from urh.cythonext.signal_functions import grab_pulse_lens
+
+from urh.ainterpretation import AutoInterpretation
from urh.dev.BackendHandler import BackendHandler, Backends
from urh.dev.VirtualDevice import VirtualDevice, Mode
from urh.signalprocessing.Message import Message
@@ -24,6 +25,8 @@ class ProtocolSniffer(ProtocolAnalyzer, QObject):
stopped = pyqtSignal()
message_sniffed = pyqtSignal(int)
+ BUFFER_SIZE_MB = 100
+
def __init__(self, bit_len: int, center: float, noise: float, tolerance: int,
modulation_type: int, device: str, backend_handler: BackendHandler, network_raw_mode=False):
signal = Signal("", "LiveSignal")
@@ -45,9 +48,12 @@ def __init__(self, bit_len: int, center: float, noise: float, tolerance: int,
self.rcv_device.started.connect(self.__emit_started)
self.rcv_device.stopped.connect(self.__emit_stopped)
- self.data_cache = []
+ self.__buffer = np.zeros(int(self.BUFFER_SIZE_MB * 1000 * 1000 / 8), dtype=np.complex64)
+ self.__current_buffer_index = 0
+
self.reading_data = False
self.adaptive_noise = False
+ self.automatic_center = False
self.pause_length = 0
self.is_running = False
@@ -57,6 +63,20 @@ def __init__(self, bit_len: int, center: float, noise: float, tolerance: int,
self.__sniff_file = ""
self.__store_data = True
+ def __add_to_buffer(self, data: np.ndarray):
+ n = len(data)
+ if n + self.__current_buffer_index > len(self.__buffer):
+ n = len(self.__buffer) - self.__current_buffer_index - 1
+
+ self.__buffer[self.__current_buffer_index:self.__current_buffer_index + n] = data[:]
+ self.__current_buffer_index += n
+
+ def __clear_buffer(self):
+ self.__current_buffer_index = 0
+
+ def __buffer_is_full(self):
+ return self.__current_buffer_index >= len(self.__buffer) - 2
+
def decoded_to_string(self, view: int, start=0, include_timestamps=True):
result = []
for msg in self.messages[start:]:
@@ -118,7 +138,7 @@ def check_for_data(self):
msg = Message.from_plain_bits_str(bit_str)
msg.decoder = self.decoder
self.messages.append(msg)
- self.message_sniffed.emit(len(self.messages)-1)
+ self.message_sniffed.emit(len(self.messages) - 1)
self.rcv_device.free_data() # do not store received bits twice
@@ -140,30 +160,36 @@ def __demodulate_data(self, data):
if len(data) == 0:
return
- rssi_squared = np.mean(data.real ** 2 + data.imag ** 2)
- is_above_noise = rssi_squared > self.signal.noise_threshold ** 2
+ power_spectrum = data.real ** 2 + data.imag ** 2
+ is_above_noise = np.sqrt(np.mean(power_spectrum)) > self.signal.noise_threshold
+
if self.adaptive_noise and not is_above_noise:
- self.signal.noise_threshold = 0.9 * self.signal.noise_threshold + 0.1 * np.max(np.abs(data))
+ self.signal.noise_threshold = 0.9 * self.signal.noise_threshold + 0.1 * np.sqrt(np.max(power_spectrum))
if is_above_noise:
- self.data_cache.append(data)
+ self.__add_to_buffer(data)
self.pause_length = 0
- return
+ if not self.__buffer_is_full():
+ return
else:
self.pause_length += len(data)
if self.pause_length < 10 * self.signal.bit_len:
- self.data_cache.append(data)
- return
+ self.__add_to_buffer(data)
+ if not self.__buffer_is_full():
+ return
- if len(self.data_cache) == 0:
+ if self.__current_buffer_index == 0:
return
# clear cache and start a new message
- self.signal._fulldata = np.concatenate(self.data_cache)
- self.data_cache.clear()
+ self.signal._fulldata = self.__buffer[0:self.__current_buffer_index]
+ self.__clear_buffer()
self.signal._qad = None
bit_len = self.signal.bit_len
+ if self.automatic_center:
+ self.signal.qad_center = AutoInterpretation.detect_center(self.signal.qad, max_size=150*self.signal.bit_len)
+
ppseq = grab_pulse_lens(self.signal.qad, self.signal.qad_center,
self.signal.tolerance, self.signal.modulation_type, self.signal.bit_len)
@@ -173,7 +199,7 @@ def __demodulate_data(self, data):
message = Message(bits, pause, bit_len=bit_len, message_type=self.default_message_type,
decoder=self.decoder)
self.messages.append(message)
- self.message_sniffed.emit(len(self.messages)-1)
+ self.message_sniffed.emit(len(self.messages) - 1)
def stop(self):
self.is_running = False
@@ -184,7 +210,7 @@ def stop(self):
logger.error("Sniff thread is still alive")
def clear(self):
- self.data_cache.clear()
+ self.__clear_buffer()
self.messages.clear()
def __emit_started(self):
diff --git a/src/urh/ui/ui_send_recv_sniff_settings.py b/src/urh/ui/ui_send_recv_sniff_settings.py
index 205c7a74ab..faff9b2f04 100644
--- a/src/urh/ui/ui_send_recv_sniff_settings.py
+++ b/src/urh/ui/ui_send_recv_sniff_settings.py
@@ -9,7 +9,7 @@
class Ui_SniffSettings(object):
def setupUi(self, SniffSettings):
SniffSettings.setObjectName("SniffSettings")
- SniffSettings.resize(482, 377)
+ SniffSettings.resize(482, 388)
self.verticalLayout = QtWidgets.QVBoxLayout(SniffSettings)
self.verticalLayout.setContentsMargins(0, 0, 0, 0)
self.verticalLayout.setObjectName("verticalLayout")
@@ -84,12 +84,6 @@ def setupUi(self, SniffSettings):
self.label_sniff_Center = QtWidgets.QLabel(self.frame)
self.label_sniff_Center.setObjectName("label_sniff_Center")
self.gridLayout.addWidget(self.label_sniff_Center, 2, 0, 1, 1)
- self.spinbox_sniff_Center = QtWidgets.QDoubleSpinBox(self.frame)
- self.spinbox_sniff_Center.setDecimals(4)
- self.spinbox_sniff_Center.setMinimum(-3.14)
- self.spinbox_sniff_Center.setMaximum(3.14)
- self.spinbox_sniff_Center.setObjectName("spinbox_sniff_Center")
- self.gridLayout.addWidget(self.spinbox_sniff_Center, 2, 1, 1, 1)
self.label_sniff_BitLength = QtWidgets.QLabel(self.frame)
self.label_sniff_BitLength.setObjectName("label_sniff_BitLength")
self.gridLayout.addWidget(self.label_sniff_BitLength, 3, 0, 1, 1)
@@ -155,14 +149,30 @@ def setupUi(self, SniffSettings):
self.lineEdit_sniff_OutputFile.setClearButtonEnabled(True)
self.lineEdit_sniff_OutputFile.setObjectName("lineEdit_sniff_OutputFile")
self.gridLayout.addWidget(self.lineEdit_sniff_OutputFile, 8, 1, 1, 1)
+ self.horizontalLayout_4 = QtWidgets.QHBoxLayout()
+ self.horizontalLayout_4.setObjectName("horizontalLayout_4")
+ self.spinbox_sniff_Center = QtWidgets.QDoubleSpinBox(self.frame)
+ sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
+ sizePolicy.setHorizontalStretch(0)
+ sizePolicy.setVerticalStretch(0)
+ sizePolicy.setHeightForWidth(self.spinbox_sniff_Center.sizePolicy().hasHeightForWidth())
+ self.spinbox_sniff_Center.setSizePolicy(sizePolicy)
+ self.spinbox_sniff_Center.setDecimals(4)
+ self.spinbox_sniff_Center.setMinimum(-3.14)
+ self.spinbox_sniff_Center.setMaximum(3.14)
+ self.spinbox_sniff_Center.setObjectName("spinbox_sniff_Center")
+ self.horizontalLayout_4.addWidget(self.spinbox_sniff_Center)
+ self.checkBoxAutoCenter = QtWidgets.QCheckBox(self.frame)
+ self.checkBoxAutoCenter.setObjectName("checkBoxAutoCenter")
+ self.horizontalLayout_4.addWidget(self.checkBoxAutoCenter)
+ self.gridLayout.addLayout(self.horizontalLayout_4, 2, 1, 1, 1)
self.gridLayout_3.addWidget(self.frame, 0, 0, 1, 1)
self.verticalLayout.addWidget(self.groupBoxSniffSettings)
self.retranslateUi(SniffSettings)
self.groupBoxSniffSettings.toggled['bool'].connect(self.frame.setVisible)
SniffSettings.setTabOrder(self.groupBoxSniffSettings, self.spinbox_sniff_Noise)
- SniffSettings.setTabOrder(self.spinbox_sniff_Noise, self.spinbox_sniff_Center)
- SniffSettings.setTabOrder(self.spinbox_sniff_Center, self.spinbox_sniff_BitLen)
+ SniffSettings.setTabOrder(self.spinbox_sniff_Noise, self.spinbox_sniff_BitLen)
SniffSettings.setTabOrder(self.spinbox_sniff_BitLen, self.spinbox_sniff_ErrorTolerance)
SniffSettings.setTabOrder(self.spinbox_sniff_ErrorTolerance, self.combox_sniff_Modulation)
SniffSettings.setTabOrder(self.combox_sniff_Modulation, self.comboBox_sniff_encoding)
@@ -194,5 +204,6 @@ def retranslateUi(self, SniffSettings):
self.checkBox_sniff_Timestamp.setText(_translate("SniffSettings", "Show Timestamp"))
self.label_sniff_OutputFile.setText(_translate("SniffSettings", "Write bitstream to file:"))
self.lineEdit_sniff_OutputFile.setPlaceholderText(_translate("SniffSettings", "None"))
+ self.checkBoxAutoCenter.setText(_translate("SniffSettings", "Automatic"))
from . import urh_rc
diff --git a/tests/QtTestCase.py b/tests/QtTestCase.py
index ebcb7fc37b..cf2481bfc1 100644
--- a/tests/QtTestCase.py
+++ b/tests/QtTestCase.py
@@ -10,6 +10,7 @@
from PyQt5.QtGui import QDropEvent
from PyQt5.QtTest import QTest
from PyQt5.QtWidgets import QApplication
+from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
from tests.utils_testing import write_settings, get_path_for_data_file
from urh.controller.MainController import MainController
@@ -38,12 +39,12 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.app.quit()
- sip.delete(cls.app)
cls.app = None
QTest.qWait(10)
time.sleep(0.1)
def setUp(self):
+ ProtocolSniffer.BUFFER_SIZE_MB = 0.5
self.form = MainController()
if self.SHOW:
self.form.show()
diff --git a/tests/auto_interpretation/auto_interpretation_test_util.py b/tests/auto_interpretation/auto_interpretation_test_util.py
index b04719d31c..26ddb3b63c 100644
--- a/tests/auto_interpretation/auto_interpretation_test_util.py
+++ b/tests/auto_interpretation/auto_interpretation_test_util.py
@@ -1,3 +1,9 @@
+import random
+
+import numpy as np
+
+from urh.signalprocessing.Message import Message
+from urh.signalprocessing.Modulator import Modulator
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
from urh.signalprocessing.Signal import Signal
@@ -17,3 +23,59 @@ def demodulate(signal_data, mod_type: str, bit_length, center, noise, tolerance,
pa.decoder = decoding
pa.get_protocol_from_signal()
return pa.decoded_hex_str
+
+
+def generate_signal(messages: list, modulator: Modulator, snr_db: int, add_noise=True):
+ result = []
+
+ message_powers = []
+ if isinstance(messages, Message):
+ messages = [messages]
+
+ for msg in messages:
+ modulated = modulator.modulate(msg.encoded_bits, msg.pause)
+
+ if add_noise:
+ message_powers.append(np.mean(np.abs(modulated[:len(modulated) - msg.pause])))
+
+ result.append(modulated)
+
+ result = np.concatenate(result)
+ if not add_noise:
+ return result
+
+ noise = np.random.normal(loc=0, scale=1, size=2 * len(result)).astype(np.float32).view(np.complex64)
+
+ # https://stackoverflow.com/questions/23690766/proper-way-to-add-noise-to-signal
+ snr_ratio = np.power(10, snr_db / 10)
+
+ signal_power = np.mean(message_powers)
+ noise_power = signal_power / snr_ratio
+ noise = 1 / np.sqrt(2) * noise_power * noise
+
+ return result + noise
+
+
+def generate_message_bits(num_bits=80, preamble="", sync="", eof=""):
+ bits_to_generate = num_bits - (len(preamble) + len(sync) + len(eof))
+
+ if bits_to_generate < 0:
+ raise ValueError("Preamble and Sync and EOF are together larger than requested num bits")
+
+ bytes_to_generate = bits_to_generate // 8
+ leftover_bits = bits_to_generate % 8
+ return "".join([preamble, sync]
+ + ["{0:08b}".format(random.choice(range(0, 256))) for _ in range(bytes_to_generate)]
+ + [random.choice(["0", "1"]) for _ in range(leftover_bits)]
+ + [eof]
+ )
+
+
+def generate_random_messages(num_messages: int, num_bits: int,
+ preamble: str, sync: str, eof: str, message_pause: int):
+ return [
+ Message.from_plain_bits_str(
+ generate_message_bits(num_bits, preamble, sync, eof), pause=message_pause
+ )
+ for _ in range(num_messages)
+ ]
diff --git a/tests/auto_interpretation/test_center_detection.py b/tests/auto_interpretation/test_center_detection.py
index 9e60d7e2eb..8af5cb4331 100644
--- a/tests/auto_interpretation/test_center_detection.py
+++ b/tests/auto_interpretation/test_center_detection.py
@@ -5,6 +5,8 @@
from tests.test_util import get_path_for_data_file
from urh.ainterpretation.AutoInterpretation import detect_center
from urh.cythonext.signal_functions import afp_demod
+
+from urh.signalprocessing.Filter import Filter, FilterType
from urh.signalprocessing.Signal import Signal
@@ -46,7 +48,7 @@ def test_enocean_center_detection(self):
for i, msg in enumerate(messages):
center = detect_center(msg)
self.assertGreaterEqual(center, 0.04, msg=str(i))
- self.assertLessEqual(center, 0.066, msg=str(i))
+ self.assertLessEqual(center, 0.072, msg=str(i))
def test_ask_50_center_detection(self):
message_indices = [(0, 8000), (18000, 26000), (36000, 44000), (54000, 62000), (72000, 80000)]
@@ -74,6 +76,15 @@ def test_homematic_center_detection(self):
self.assertGreaterEqual(center2, -0.1377)
self.assertLessEqual(center2, -0.0367)
+ def test_noised_homematic_center_detection(self):
+ data = Signal(get_path_for_data_file("noised_homematic.complex"), "").data
+ rect = afp_demod(data, 0.0, 1)
+
+ center = detect_center(rect)
+
+ self.assertGreater(center, -0.0148)
+ self.assertLess(center, 0.0024)
+
def test_fsk_15db_center_detection(self):
data = Signal(get_path_for_data_file("FSK15.complex"), "").data
rect = afp_demod(data, 0, 1)
@@ -87,3 +98,20 @@ def test_fsk_10db_center_detection(self):
center = detect_center(rect)
self.assertGreaterEqual(center, -0.1413)
self.assertLessEqual(center, 0.05)
+
+ def test_fsk_live_capture(self):
+ data = Signal(get_path_for_data_file("fsk_live.coco"), "").data
+
+ n = 10
+ moving_average_filter = Filter([1/n for _ in range(n)], filter_type=FilterType.moving_average)
+ filtered_data = moving_average_filter.apply_fir_filter(data)
+
+ rect = afp_demod(filtered_data, 0.0175, 1)
+ center = detect_center(rect)
+ self.assertGreaterEqual(center, -0.0148, msg="Filtered")
+ self.assertLessEqual(center, 0.01, msg="Filtered")
+
+ rect = afp_demod(data, 0.0175, 1)
+ center = detect_center(rect)
+ self.assertGreaterEqual(center, -0.02, msg="Original")
+ self.assertLessEqual(center, 0.01, msg="Original")
diff --git a/tests/data/fsk_live.coco b/tests/data/fsk_live.coco
new file mode 100644
index 0000000000..725b62521f
Binary files /dev/null and b/tests/data/fsk_live.coco differ
diff --git a/tests/data/noised_homematic.complex b/tests/data/noised_homematic.complex
new file mode 100644
index 0000000000..c654415d8b
Binary files /dev/null and b/tests/data/noised_homematic.complex differ
diff --git a/tests/performance/simulator_perfomance.py b/tests/performance/simulator_perfomance.py
new file mode 100644
index 0000000000..d36a4773e8
--- /dev/null
+++ b/tests/performance/simulator_perfomance.py
@@ -0,0 +1,153 @@
+import socket
+import time
+from multiprocessing import Value, Process
+
+from urh.util.SettingsProxy import SettingsProxy
+
+from urh.signalprocessing.Modulator import Modulator
+
+from urh.signalprocessing.MessageType import MessageType
+
+from urh.simulator.Simulator import Simulator
+
+from urh.dev.EndlessSender import EndlessSender
+
+from urh.dev.BackendHandler import BackendHandler
+from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
+
+from urh.plugins.NetworkSDRInterface.NetworkSDRInterfacePlugin import NetworkSDRInterfacePlugin
+
+from tests.QtTestCase import QtTestCase
+from tests.utils_testing import get_path_for_data_file
+from urh.controller.MainController import MainController
+from urh.signalprocessing.Participant import Participant
+from urh.simulator.SimulatorMessage import SimulatorMessage
+
+from urh.util.Logger import logger
+
+import numpy as np
+
+def receive(port, current_index, target_index, elapsed):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ s.bind(("", port))
+ s.listen(1)
+
+ conn, addr = s.accept()
+ logger.debug('Receiver got connection from address:'.format(addr))
+
+ start = False
+ while True:
+ data = conn.recv(65536 * 8)
+
+ if not start:
+ start = True
+ t = time.time()
+
+ if len(data) > 0:
+ while len(data) % 8 != 0:
+ data += conn.recv(len(data) % 8)
+
+ arr = np.frombuffer(data, dtype=np.complex64)
+ current_index.value += len(arr)
+
+ if current_index.value == target_index:
+ break
+
+ conn.close()
+ elapsed.value = 1000 * (time.time() - t)
+ s.close()
+
+
+class TestSimulatorPerfomance(QtTestCase):
+ def setUp(self):
+ super().setUp()
+ self.num_zeros_for_pause = 1000
+
+ def test_performance(self):
+ self.form = MainController()
+ self.cfc = self.form.compare_frame_controller
+ self.stc = self.form.simulator_tab_controller
+ self.gtc = self.form.generator_tab_controller
+
+ self.form.add_signalfile(get_path_for_data_file("esaver.coco"))
+ self.sframe = self.form.signal_tab_controller.signal_frames[0]
+ self.sim_frame = self.form.simulator_tab_controller
+ self.form.ui.tabWidget.setCurrentIndex(3)
+ self.cfc.proto_analyzer.auto_assign_labels()
+
+ self.network_sdr_plugin_sender = NetworkSDRInterfacePlugin(raw_mode=True)
+
+ part_a = Participant("Device A", shortname="A", color_index=0)
+ part_b = Participant("Device B", shortname="B", color_index=1)
+ part_b.simulate = True
+
+ self.form.project_manager.participants.append(part_a)
+ self.form.project_manager.participants.append(part_b)
+ self.form.project_manager.project_updated.emit()
+
+ sniffer = ProtocolSniffer(100, 0.01, 0.1, 5, 1, NetworkSDRInterfacePlugin.NETWORK_SDR_NAME, BackendHandler(),
+ network_raw_mode=True)
+ sender = EndlessSender(BackendHandler(), NetworkSDRInterfacePlugin.NETWORK_SDR_NAME)
+
+ simulator = Simulator(self.stc.simulator_config, self.gtc.modulators, self.stc.sim_expression_parser,
+ self.form.project_manager, sniffer=sniffer, sender=sender)
+
+ pause = 100
+ msg_a = SimulatorMessage(part_b,
+ [1, 0] * 16 + [1, 1, 0, 0] * 8 + [0, 0, 1, 1] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4,
+ pause=pause, message_type=MessageType("empty_message_type"), source=part_a)
+
+ msg_b = SimulatorMessage(part_a,
+ [1, 0] * 16 + [1, 1, 0, 0] * 8 + [1, 1, 0, 0] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4,
+ pause=pause, message_type=MessageType("empty_message_type"), source=part_b)
+
+ self.stc.simulator_config.add_items([msg_a, msg_b], 0, None)
+ self.stc.simulator_config.update_active_participants()
+
+ port = self.get_free_port()
+ sniffer = simulator.sniffer
+ sniffer.rcv_device.set_server_port(port)
+
+ self.network_sdr_plugin_sender.client_port = port
+
+ sender = simulator.sender
+ port = self.get_free_port()
+ sender.device.set_client_port(port)
+ sender.device._VirtualDevice__dev.name = "simulator_sender"
+
+ current_index = Value("L")
+ elapsed = Value("f")
+ target_num_samples = 13600 + pause
+ receive_process = Process(target=receive, args=(port, current_index, target_num_samples, elapsed))
+ receive_process.daemon = True
+ receive_process.start()
+
+ # Ensure receiver is running
+ time.sleep(2)
+
+ # spy = QSignalSpy(self.network_sdr_plugin_receiver.rcv_index_changed)
+ simulator.start()
+
+ modulator = Modulator("test_modulator")
+ modulator.samples_per_bit = 100
+ modulator.carrier_freq_hz = 55e3
+
+ # yappi.start()
+
+ self.network_sdr_plugin_sender.send_raw_data(modulator.modulate(msg_a.encoded_bits), 1)
+ time.sleep(0.5)
+ # send some zeros to simulate the end of a message
+ self.network_sdr_plugin_sender.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
+ time.sleep(0.5)
+ receive_process.join(15)
+
+ logger.info("PROCESS TIME: {0:.2f}ms".format(elapsed.value))
+
+ # self.assertEqual(current_index.value, target_num_samples)
+ self.assertLess(elapsed.value, 200)
+
+ # timeout = spy.wait(2000)
+ # yappi.get_func_stats().print_all()
+ # yappi.get_thread_stats().print_all()
\ No newline at end of file
diff --git a/tests/test_filter.py b/tests/test_filter.py
index 5f4d82f77e..e33991b285 100644
--- a/tests/test_filter.py
+++ b/tests/test_filter.py
@@ -4,6 +4,7 @@
import time
from tests.QtTestCase import QtTestCase
+from urh.controller.widgets.SignalFrame import SignalFrame
from urh.signalprocessing.Filter import Filter
@@ -12,7 +13,7 @@ def setUp(self):
super().setUp()
self.add_signal_to_form("unaveraged.coco")
- self.sig_frame = self.form.signal_tab_controller.signal_frames[0]
+ self.sig_frame = self.form.signal_tab_controller.signal_frames[0] # type: SignalFrame
def test_fir_filter(self):
input_signal = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 42], dtype=np.complex64)
@@ -31,10 +32,13 @@ def test_filter_full_signal(self):
center = 0
self.sig_frame.ui.btnFilter.click()
+ self.sig_frame.ui.cbModulationType.setCurrentText("FSK")
self.sig_frame.ui.spinBoxInfoLen.setValue(bit_len)
self.sig_frame.ui.spinBoxInfoLen.editingFinished.emit()
self.sig_frame.ui.spinBoxCenterOffset.setValue(center)
self.sig_frame.ui.spinBoxCenterOffset.editingFinished.emit()
+ self.sig_frame.ui.spinBoxTolerance.setValue(5)
+ self.sig_frame.ui.spinBoxTolerance.editingFinished.emit()
self.assertTrue(self.sig_frame.proto_analyzer.plain_hex_str[0].startswith(expected),
msg=self.sig_frame.proto_analyzer.plain_hex_str[0])
diff --git a/tests/test_generator.py b/tests/test_generator.py
index 4aa5adfd61..23eca32348 100644
--- a/tests/test_generator.py
+++ b/tests/test_generator.py
@@ -25,9 +25,10 @@ def test_generation(self):
self.add_signal_to_form("ask.complex")
signal_frame = self.form.signal_tab_controller.signal_frames[0]
signal_frame.ui.cbModulationType.setCurrentIndex(0) # ASK
- signal_frame.ui.spinBoxInfoLen.setValue(295)
- signal_frame.ui.spinBoxCenterOffset.setValue(-0.1667)
- signal_frame.refresh()
+ signal_frame.ui.spinBoxInfoLen.setValue(300)
+ signal_frame.ui.spinBoxInfoLen.editingFinished.emit()
+ signal_frame.ui.spinBoxCenterOffset.setValue(0.032)
+ signal_frame.ui.spinBoxCenterOffset.editingFinished.emit()
signal_frame.ui.cbProtoView.setCurrentIndex(0)
proto = "1011001001011011011011011011011011001000000"
@@ -41,16 +42,18 @@ def test_generation(self):
self.assertTrue(self.__is_inv_proto(proto, proto_inv))
# Move with encoding to generator
- gframe = self.form.generator_tab_controller
+ gframe = self.form.generator_tab_controller # type: GeneratorTabController
gframe.ui.cbViewType.setCurrentIndex(0)
self.add_signal_to_generator(signal_index=0)
self.assertEqual(array.array("B", list(map(int, proto_inv))), gframe.table_model.display_data[0])
self.assertNotEqual(array.array("B", list(map(int, proto))), gframe.table_model.display_data[0])
+ gframe.table_model.protocol.messages[0].pause = 0
+
# Generate Datafile
modulator = gframe.modulators[0]
modulator.modulation_type = 0
- modulator.samples_per_bit = 295
+ modulator.samples_per_bit = 300
buffer = gframe.prepare_modulation_buffer(gframe.total_modulated_samples, show_error=False)
modulated_data = gframe.modulate_data(buffer)
filename = os.path.join(QDir.tempPath(), "test_generator.complex")
@@ -61,7 +64,7 @@ def test_generation(self):
self.assertEqual(len(self.form.signal_tab_controller.signal_frames), 2)
signal_frame = self.form.signal_tab_controller.signal_frames[1]
- self.assertEqual(signal_frame.signal.num_samples, 14377)
+ self.assertEqual(signal_frame.signal.num_samples, 300 * len(proto))
signal_frame.ui.cbProtoView.setCurrentIndex(0)
self.assertEqual(signal_frame.ui.lineEditSignalName.text(), "test_generator")
signal_frame.ui.cbModulationType.setCurrentIndex(0) # ASK
diff --git a/tests/test_protocol_sniffer.py b/tests/test_protocol_sniffer.py
index 98436d6ef1..18624c3150 100644
--- a/tests/test_protocol_sniffer.py
+++ b/tests/test_protocol_sniffer.py
@@ -16,7 +16,7 @@
class TestProtocolSniffer(QtTestCase):
def setUp(self):
super().setUp()
- SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 100 * 10 ** 6
+ SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 50000
def test_protocol_sniffer(self):
bit_len = 100
diff --git a/tests/test_send_recv_dialog_gui.py b/tests/test_send_recv_dialog_gui.py
index 414112b5a8..15ae26dbf0 100644
--- a/tests/test_send_recv_dialog_gui.py
+++ b/tests/test_send_recv_dialog_gui.py
@@ -8,6 +8,7 @@
from PyQt5.QtGui import QMouseEvent
from PyQt5.QtTest import QTest
from PyQt5.QtWidgets import QApplication
+from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
from tests.QtTestCase import QtTestCase
from tests.utils_testing import get_path_for_data_file
@@ -57,7 +58,7 @@ class TestSendRecvDialog(QtTestCase):
def setUp(self):
super().setUp()
- SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 10 ** 6
+ SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 600000
self.signal = Signal(get_path_for_data_file("esaver.coco"), "testsignal")
self.form.ui.tabWidget.setCurrentIndex(2)
@@ -176,7 +177,7 @@ def test_spectrum(self):
sock.close()
QApplication.instance().processEvents()
- QTest.qWait(self.SEND_RECV_TIMEOUT)
+ QTest.qWait(5*self.SEND_RECV_TIMEOUT)
self.assertGreater(len(spectrum_dialog.scene_manager.peak), 0)
self.assertIsNone(spectrum_dialog.ui.graphicsViewFFT.scene().frequency_marker)
@@ -188,7 +189,8 @@ def test_spectrum(self):
Qt.LeftButton, Qt.NoModifier)
QApplication.postEvent(w, event)
QApplication.instance().processEvents()
- QTest.qWait(50)
+ QTest.qWait(500)
+ QApplication.instance().processEvents()
self.assertIsNotNone(spectrum_dialog.ui.graphicsViewFFT.scene().frequency_marker)
diff --git a/tests/test_simulator.py b/tests/test_simulator.py
index 70b519140f..ed915ccce1 100644
--- a/tests/test_simulator.py
+++ b/tests/test_simulator.py
@@ -3,160 +3,31 @@
import sys
import tempfile
import time
-from multiprocessing import Process, Value
import numpy as np
# import yappi
from PyQt5.QtTest import QTest
+from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
from tests.QtTestCase import QtTestCase
from tests.utils_testing import get_path_for_data_file
-from urh.controller.MainController import MainController
-from urh.dev.BackendHandler import BackendHandler
-from urh.dev.EndlessSender import EndlessSender
from urh.plugins.NetworkSDRInterface.NetworkSDRInterfacePlugin import NetworkSDRInterfacePlugin
from urh.signalprocessing.ChecksumLabel import ChecksumLabel
-from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.Modulator import Modulator
-from urh.signalprocessing.Participant import Participant
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
-from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
from urh.signalprocessing.Signal import Signal
from urh.simulator.ActionItem import TriggerCommandActionItem, SleepActionItem, CounterActionItem
-from urh.simulator.Simulator import Simulator
-from urh.simulator.SimulatorMessage import SimulatorMessage
from urh.simulator.SimulatorProtocolLabel import SimulatorProtocolLabel
-from urh.util.Logger import logger
from urh.util.SettingsProxy import SettingsProxy
-def receive(port, current_index, target_index, elapsed):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
- s.bind(("", port))
- s.listen(1)
-
- conn, addr = s.accept()
- logger.debug('Receiver got connection from address:'.format(addr))
-
- start = False
- while True:
- data = conn.recv(65536 * 8)
-
- if not start:
- start = True
- t = time.time()
-
- if len(data) > 0:
- while len(data) % 8 != 0:
- data += conn.recv(len(data) % 8)
-
- arr = np.frombuffer(data, dtype=np.complex64)
- current_index.value += len(arr)
-
- if current_index.value == target_index:
- break
-
- conn.close()
- elapsed.value = 1000 * (time.time() - t)
- s.close()
-
-
class TestSimulator(QtTestCase):
def setUp(self):
super().setUp()
- SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 10 * 10 ** 6
+ SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 50000
self.num_zeros_for_pause = 1000
- def test_performance(self):
- self.form = MainController()
- self.cfc = self.form.compare_frame_controller
- self.stc = self.form.simulator_tab_controller
- self.gtc = self.form.generator_tab_controller
-
- self.form.add_signalfile(get_path_for_data_file("esaver.coco"))
- self.sframe = self.form.signal_tab_controller.signal_frames[0]
- self.sim_frame = self.form.simulator_tab_controller
- self.form.ui.tabWidget.setCurrentIndex(3)
- self.cfc.proto_analyzer.auto_assign_labels()
-
- self.network_sdr_plugin_sender = NetworkSDRInterfacePlugin(raw_mode=True)
-
- part_a = Participant("Device A", shortname="A", color_index=0)
- part_b = Participant("Device B", shortname="B", color_index=1)
- part_b.simulate = True
-
- self.form.project_manager.participants.append(part_a)
- self.form.project_manager.participants.append(part_b)
- self.form.project_manager.project_updated.emit()
-
- sniffer = ProtocolSniffer(100, 0.01, 0.1, 5, 1, NetworkSDRInterfacePlugin.NETWORK_SDR_NAME, BackendHandler(),
- network_raw_mode=True)
- sender = EndlessSender(BackendHandler(), NetworkSDRInterfacePlugin.NETWORK_SDR_NAME)
-
- simulator = Simulator(self.stc.simulator_config, self.gtc.modulators, self.stc.sim_expression_parser,
- self.form.project_manager, sniffer=sniffer, sender=sender)
-
- pause = 100
- msg_a = SimulatorMessage(part_b,
- [1, 0] * 16 + [1, 1, 0, 0] * 8 + [0, 0, 1, 1] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4,
- pause=pause, message_type=MessageType("empty_message_type"), source=part_a)
-
- msg_b = SimulatorMessage(part_a,
- [1, 0] * 16 + [1, 1, 0, 0] * 8 + [1, 1, 0, 0] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4,
- pause=pause, message_type=MessageType("empty_message_type"), source=part_b)
-
- self.stc.simulator_config.add_items([msg_a, msg_b], 0, None)
- self.stc.simulator_config.update_active_participants()
-
- port = self.get_free_port()
- sniffer = simulator.sniffer
- sniffer.rcv_device.set_server_port(port)
-
- self.network_sdr_plugin_sender.client_port = port
-
- sender = simulator.sender
- port = self.get_free_port()
- sender.device.set_client_port(port)
- sender.device._VirtualDevice__dev.name = "simulator_sender"
-
- current_index = Value("L")
- elapsed = Value("f")
- target_num_samples = 13600 + pause
- receive_process = Process(target=receive, args=(port, current_index, target_num_samples, elapsed))
- receive_process.daemon = True
- receive_process.start()
-
- # Ensure receiver is running
- time.sleep(2)
-
- # spy = QSignalSpy(self.network_sdr_plugin_receiver.rcv_index_changed)
- simulator.start()
-
- modulator = Modulator("test_modulator")
- modulator.samples_per_bit = 100
- modulator.carrier_freq_hz = 55e3
-
- # yappi.start()
-
- self.network_sdr_plugin_sender.send_raw_data(modulator.modulate(msg_a.encoded_bits), 1)
- time.sleep(0.5)
- # send some zeros to simulate the end of a message
- self.network_sdr_plugin_sender.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
- time.sleep(0.5)
- receive_process.join(15)
-
- logger.info("PROCESS TIME: {0:.2f}ms".format(elapsed.value))
-
- #self.assertEqual(current_index.value, target_num_samples)
- self.assertLess(elapsed.value, 200)
-
- # timeout = spy.wait(2000)
- # yappi.get_func_stats().print_all()
- # yappi.get_thread_stats().print_all()
-
def test_simulation_flow(self):
"""
test a simulation flow with an increasing sequence number
@@ -175,7 +46,6 @@ def test_simulation_flow(self):
name = NetworkSDRInterfacePlugin.NETWORK_SDR_NAME
dialog.device_settings_rx_widget.ui.cbDevice.setCurrentText(name)
dialog.device_settings_tx_widget.ui.cbDevice.setCurrentText(name)
- QTest.qWait(10)
simulator = dialog.simulator
simulator.sniffer.rcv_device.set_server_port(port)
@@ -185,11 +55,11 @@ def test_simulation_flow(self):
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.bind(("", port))
s.listen(1)
- QTest.qWait(10)
simulator.sender.device.set_client_port(port)
dialog.ui.btnStartStop.click()
- QTest.qWait(1500)
+
+ QTest.qWait(500)
conn, addr = s.accept()
@@ -211,7 +81,7 @@ def test_simulation_flow(self):
msg1 = preamble + sync + seq + data + checksum
self.alice.send_raw_data(modulator.modulate(msg1), 1)
- time.sleep(0.1)
+ time.sleep(0.5)
self.alice.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
bits = self.__demodulate(conn)
@@ -226,7 +96,7 @@ def test_simulation_flow(self):
msg2 = preamble + sync + seq + data + checksum
self.alice.send_raw_data(modulator.modulate(msg2), 1)
- time.sleep(0.1)
+ time.sleep(0.5)
self.alice.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
bits = self.__demodulate(conn)
@@ -241,7 +111,7 @@ def test_simulation_flow(self):
msg3 = preamble + sync + seq + data + checksum
self.alice.send_raw_data(modulator.modulate(msg3), 1)
- time.sleep(0.1)
+ time.sleep(0.5)
self.alice.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
bits = self.__demodulate(conn)
@@ -251,16 +121,11 @@ def test_simulation_flow(self):
bits = bits.replace(preamble_str + sync_str, "")
self.assertEqual(int(bits, 2), seq_num + 5)
- QTest.qWait(50)
- self.assertTrue(simulator.simulation_is_finished())
-
time.sleep(1)
conn.close()
s.close()
- QTest.qWait(100)
-
def test_external_program_simulator(self):
stc = self.form.simulator_tab_controller
stc.ui.btnAddParticipant.click()
@@ -294,7 +159,7 @@ def test_external_program_simulator(self):
ext_program = get_path_for_data_file("external_program_simulator.py") + " " + counter_item_str
if sys.platform == "win32":
- ext_program = "python " + ext_program
+ ext_program = sys.executable + " " + ext_program
lbl1.value_type_index = 3
lbl1.external_program = ext_program
@@ -321,7 +186,7 @@ def test_external_program_simulator(self):
name = NetworkSDRInterfacePlugin.NETWORK_SDR_NAME
dialog.device_settings_rx_widget.ui.cbDevice.setCurrentText(name)
dialog.device_settings_tx_widget.ui.cbDevice.setCurrentText(name)
- QTest.qWait(10)
+ QTest.qWait(100)
simulator = dialog.simulator
simulator.sniffer.rcv_device.set_server_port(port)
@@ -331,7 +196,7 @@ def test_external_program_simulator(self):
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.bind(("", port))
s.listen(1)
- QTest.qWait(10)
+ QTest.qWait(100)
simulator.sender.device.set_client_port(port)
dialog.ui.btnStartStop.click()
@@ -342,16 +207,17 @@ def test_external_program_simulator(self):
modulator = dialog.project_manager.modulators[0] # type: Modulator
self.alice.send_raw_data(modulator.modulate("100"+"10101010"*42), 1)
- time.sleep(0.1)
+ time.sleep(1)
+ QTest.qWait(100)
self.alice.send_raw_data(np.zeros(self.num_zeros_for_pause, dtype=np.complex64), 1)
+ time.sleep(1)
+ QTest.qWait(100)
bits = self.__demodulate(conn)
self.assertEqual(bits[0].rstrip("0"), "101010101")
- time.sleep(0.5)
+ time.sleep(10)
QTest.qWait(1000)
- self.assertTrue(simulator.simulation_is_finished())
-
conn.close()
s.close()
@@ -360,7 +226,6 @@ def test_external_program_simulator(self):
self.assertTrue(os.path.isfile(fname))
def __demodulate(self, connection):
- QTest.qWait(100)
data = connection.recv(65536)
while len(data) % 8 != 0:
data += connection.recv(65536)
diff --git a/tests/test_simulator_tab_gui.py b/tests/test_simulator_tab_gui.py
index 0f4c5280d6..25dc72e86c 100644
--- a/tests/test_simulator_tab_gui.py
+++ b/tests/test_simulator_tab_gui.py
@@ -9,6 +9,9 @@
from PyQt5.QtGui import QContextMenuEvent
from PyQt5.QtTest import QTest
from PyQt5.QtWidgets import QApplication, QMenu, QCompleter
+from urh.util.SettingsProxy import SettingsProxy
+
+from urh.signalprocessing.ProtocolSniffer import ProtocolSniffer
from tests.QtTestCase import QtTestCase
from urh import constants
@@ -28,6 +31,9 @@
class TestSimulatorTabGUI(QtTestCase):
def setUp(self):
super().setUp()
+
+ SettingsProxy.OVERWRITE_RECEIVE_BUFFER_SIZE = 50000
+
self.carl = Participant("Carl", "C")
self.dennis = Participant("Dennis", "D")
self.participants = [self.carl, self.dennis]
@@ -217,8 +223,8 @@ def test_simulator_message_table_context_menu(self):
stc.simulator_scene.get_all_message_items()[0].setSelected(True)
self.assertEqual(stc.simulator_message_field_model.rowCount(), 1)
- stc.ui.tblViewMessage.selectColumn(2)
- x, y = stc.ui.tblViewMessage.columnViewportPosition(2), stc.ui.tblViewMessage.rowViewportPosition(0)
+ stc.ui.tblViewMessage.selectColumn(4)
+ x, y = stc.ui.tblViewMessage.columnViewportPosition(4), stc.ui.tblViewMessage.rowViewportPosition(0)
pos = QPoint(x, y)
stc.ui.tblViewMessage.context_menu_pos = pos
menu = stc.ui.tblViewMessage.create_context_menu()
@@ -354,14 +360,14 @@ def test_open_simulator_dialog_and_send_mismatching_message(self):
sender = NetworkSDRInterfacePlugin(raw_mode=True, sending=True)
sender.client_port = rcv_port
sender.send_raw_data(modulator.modulate("1" * 352), 1)
- time.sleep(0.5)
+ time.sleep(1)
sender.send_raw_data(np.zeros(1000, dtype=np.complex64), 1)
- time.sleep(0.5)
+ time.sleep(1)
sender.send_raw_data(modulator.modulate("10" * 176), 1)
- time.sleep(0.5)
+ time.sleep(1)
sender.send_raw_data(np.zeros(1000, dtype=np.complex64), 1)
- time.sleep(0.5)
- QTest.qWait(500)
+ time.sleep(1)
+ QTest.qWait(1000)
simulator_log = dialog.ui.textEditSimulation.toPlainText()
self.assertIn("Received message 1", simulator_log)