From 226805d32532f4a46aaf74075d0028ad8c852146 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Pierret=20=28fepitre=29?= Date: Sat, 20 Jul 2024 15:49:07 +0200 Subject: [PATCH] tests/integ/audio.py: add test for audio-intput in QubesDB --- qubes/tests/integ/audio.py | 180 +++++++++++++++++++++++++++++-------- 1 file changed, 143 insertions(+), 37 deletions(-) diff --git a/qubes/tests/integ/audio.py b/qubes/tests/integ/audio.py index 42c0a1c65..63f03b1b7 100644 --- a/qubes/tests/integ/audio.py +++ b/qubes/tests/integ/audio.py @@ -30,6 +30,7 @@ import numpy as np import qubes.vm +import qubes.devices from qubes.tests.integ.vm_qrexec_gui import TC_00_AppVMMixin, in_qemu @@ -142,7 +143,7 @@ def assert_pacat_running(self, audiovm, testvm, expected=True): def check_audio_sample(self, sample, sfreq): rec = np.fromstring(sample, dtype=np.float32) # determine sample size using silence threshold - threshold = 10**-3 + threshold = 10 ** -3 rec_size = np.count_nonzero((rec > threshold) | (rec < -threshold)) if not rec_size: self.fail('only silence detected, no useful audio data') @@ -151,34 +152,34 @@ def check_audio_sample(self, sample, sfreq): # be less strict on HVM tests in nested virt, the test environment # has huge overhead already margin = 0.80 - if rec_size < margin*441000: + if rec_size < margin * 441000: fname = f"/tmp/audio-sample-{self.id()}.raw" with open(fname, "wb") as f: f.write(sample) - self.fail(f'too short audio, expected 10s, got {rec_size/44100}, saved to {fname}') + self.fail(f'too short audio, expected 10s, got {rec_size / 44100}, saved to {fname}') # find zero crossings crossings = np.nonzero((rec[1:] > threshold) & - (rec[:-1] < -threshold))[0] + (rec[:-1] < -threshold))[0] np.seterr('raise') # compare against sine wave frequency - rec_freq = 44100/np.mean(np.diff(crossings)) - if not sfreq*0.8 < rec_freq < sfreq*1.2: + rec_freq = 44100 / np.mean(np.diff(crossings)) + if not sfreq * 0.8 < rec_freq < sfreq * 1.2: fname = f"/tmp/audio-sample-{self.id()}.raw" with open(fname, "wb") as f: f.write(sample) self.fail('frequency {} not in specified range, saved to {}' - .format(rec_freq, fname)) + .format(rec_freq, fname)) def common_audio_playback(self): # sine frequency sfreq = 4400 # generate signal - audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100) + audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100) # Need to use .snd extension so that pw-play (really libsndfile) # recognizes the file as raw audio. self.loop.run_until_complete( self.testvm1.run_for_stdio('cat > audio_in.snd', - input=audio_in.astype(np.float32).tobytes())) + input=audio_in.astype(np.float32).tobytes())) local_user = grp.getgrnam('qubes').gr_mem[0] if self.testvm1.features['service.pipewire']: cmd = 'timeout 20s pw-play --format=f32 --rate=44100 --channels=1 - < audio_in.snd' @@ -188,9 +189,10 @@ def common_audio_playback(self): with tempfile.NamedTemporaryFile() as recorded_audio: os.chmod(recorded_audio.name, 0o666) p = subprocess.Popen(['sudo', '-E', '-u', local_user, - 'parecord', '-d', '@DEFAULT_MONITOR@', '--raw', - '--format=float32le', '--rate=44100', '--channels=1', - recorded_audio.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + 'parecord', '-d', '@DEFAULT_MONITOR@', '--raw', + '--format=float32le', '--rate=44100', '--channels=1', + recorded_audio.name], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) try: self.loop.run_until_complete(self.testvm1.run_for_stdio(cmd)) except subprocess.CalledProcessError as err: @@ -199,17 +201,29 @@ def common_audio_playback(self): self.loop.run_until_complete(asyncio.sleep(2)) if p.returncode is not None: self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format( - p.returncode, p.stderr.read())) + p.returncode, p.stderr.read())) p.send_signal(signal.SIGINT) p.wait() self.check_audio_sample(recorded_audio.file.read(), sfreq) def _configure_audio_recording(self, vm): - """Connect VM's output-source to sink monitor instead of mic""" + """Connect VM's source-output to sink monitor instead of mic""" local_user = grp.getgrnam("qubes").gr_mem[0] + audiovm = vm.audiovm + sudo = ["sudo", "-E", "-u", local_user] - source_outputs = json.loads(subprocess.check_output( - sudo + ["pactl", "-f", "json", "list", "source-outputs"])) + + source_outputs_cmd = ["pactl", "-f", "json", "list", "source-outputs"] + if audiovm.name != "dom0": + stdout, _ = self.loop.run_until_complete( + audiovm.run_for_stdio(" ".join(source_outputs_cmd))) + source_outputs = json.loads(stdout) + else: + source_outputs = json.loads(subprocess.check_output(sudo + source_outputs_cmd)) + + if not source_outputs: + self.fail("no source-output found in {}".format(audiovm.name)) + assert False try: output_index = [s["index"] for s in source_outputs @@ -220,8 +234,17 @@ def _configure_audio_recording(self, vm): # self.fail never returns assert False - sources = json.loads(subprocess.check_output( - sudo + ["pactl", "-f", "json", "list", "sources"])) + sources_cmd = ["pactl", "-f", "json", "list", "sources"] + if audiovm.name != "dom0": + res, _ = self.loop.run_until_complete(audiovm.run_for_stdio(" ".join(sources_cmd))) + sources = json.loads(res) + else: + sources = json.loads(subprocess.check_output(sudo + sources_cmd)) + + if not sources: + self.fail("no sources found in {}".format(audiovm.name)) + assert False + try: source_index = [s["index"] for s in sources if s["name"].endswith(".monitor")][0] @@ -230,8 +253,36 @@ def _configure_audio_recording(self, vm): # self.fail never returns assert False - subprocess.check_call(sudo + - ["pactl", "move-source-output", str(output_index), str(source_index)]) + cmd = ["pactl", "move-source-output", str(output_index), str(source_index)] + if audiovm.name != "dom0": + self.loop.run_until_complete(audiovm.run(" ".join(cmd))) + else: + subprocess.check_call(sudo + cmd) + + async def retrieve_audio_input(self, vm, status): + try: + await asyncio.wait_for(self._check_audio_input_status(vm, status), timeout=2) + except asyncio.TimeoutError: + self.fail("Failed to get mic attach/detach status!") + + @staticmethod + async def _check_audio_input_status(vm, status): + while vm.audiovm.untrusted_qdb.read("/audio-input/{}".format(vm.name)) != status: + await asyncio.sleep(0.5) + + def attach_mic(self): + deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic') + self.loop.run_until_complete( + self.testvm1.devices['mic'].attach(deva) + ) + self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"1")) + + def detach_mic(self): + deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic') + self.loop.run_until_complete( + self.testvm1.devices['mic'].detach(deva) + ) + self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"0")) def common_audio_record_muted(self): # connect VM's recording source output monitor (instead of mic) @@ -240,6 +291,7 @@ def common_audio_record_muted(self): # generate some "audio" data audio_in = b'\x20' * 4 * 44100 local_user = grp.getgrnam('qubes').gr_mem[0] + sudo = ["sudo", "-E", "-u", local_user] # Need to use .snd extension so that pw-play (really libsndfile) # recognizes the file as raw audio. if self.testvm1.features['service.pipewire']: @@ -249,18 +301,28 @@ def common_audio_record_muted(self): cmd = 'parecord --raw audio_rec.snd' kill_cmd = 'pkill --signal SIGINT parecord' record = self.loop.run_until_complete(self.testvm1.run(cmd, - stdout=subprocess.PIPE, stderr=subprocess.PIPE)) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE)) # give it time to start recording self.loop.run_until_complete(asyncio.sleep(0.5)) - p = subprocess.Popen(['sudo', '-E', '-u', local_user, - 'paplay', '--raw'], - stdin=subprocess.PIPE) - p.communicate(audio_in) + + play_cmd = ['paplay', '--raw'] + if self.testvm1.audiovm.name != "dom0": + self.loop.run_until_complete( + self.testvm1.audiovm.run_for_stdio( + " ".join(play_cmd), + input=audio_in + ) + ) + else: + p = subprocess.Popen(sudo + play_cmd, stdin=subprocess.PIPE) + p.communicate(audio_in) + # wait for possible parecord buffering self.loop.run_until_complete(asyncio.sleep(2)) if record.returncode is not None: self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format( - record.returncode, self.loop.run_until_complete(record.stderr.read()))) + record.returncode, self.loop.run_until_complete(record.stderr.read()))) try: self.loop.run_until_complete( self.testvm1.run_for_stdio(kill_cmd)) @@ -273,15 +335,20 @@ def common_audio_record_muted(self): if audio_in[:32] in recorded_audio: self.fail('VM recorded something, even though mic disabled') - def common_audio_record_unmuted(self): - deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic') - self.loop.run_until_complete( - self.testvm1.devices['mic'].attach(deva)) + def common_audio_record_unmuted(self, attach_mic=True, detach_mic=True): + if attach_mic: + try: + self.detach_mic() + except qubes.devices.DeviceNotAssigned: + pass + self.attach_mic() # connect VM's recording source output monitor (instead of mic) self._configure_audio_recording(self.testvm1) sfreq = 4400 - audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100) + audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100) local_user = grp.getgrnam('qubes').gr_mem[0] + sudo = ["sudo", "-E", "-u", local_user] + # Need to use .snd extension so that pw-play (really libsndfile) # recognizes the file as raw audio. if self.testvm1.features['service.pipewire']: @@ -295,16 +362,28 @@ def common_audio_record_unmuted(self): record = self.loop.run_until_complete(self.testvm1.run(record_cmd)) # give it time to start recording self.loop.run_until_complete(asyncio.sleep(0.5)) - p = subprocess.Popen(['sudo', '-E', '-u', local_user, - 'paplay', '--raw', '--format=float32le', - '--rate=44100', '--channels=1'], - stdin=subprocess.PIPE) - p.communicate(audio_in.astype(np.float32).tobytes()) + + # play sound that will be used as source-output + play_cmd = ['paplay', '--raw', '--format=float32le', '--rate=44100', '--channels=1'] + if self.testvm1.audiovm.name != "dom0": + self.loop.run_until_complete( + self.testvm1.audiovm.run_for_stdio( + " ".join(play_cmd), + input=audio_in.astype(np.float32).tobytes() + ) + ) + else: + p = subprocess.Popen( + sudo + play_cmd, + stdin=subprocess.PIPE + ) + p.communicate(audio_in.astype(np.float32).tobytes()) + # wait for possible parecord buffering self.loop.run_until_complete(asyncio.sleep(2)) if record.returncode is not None: self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format( - record.returncode, self.loop.run_until_complete(record.stderr.read()))) + record.returncode, self.loop.run_until_complete(record.stderr.read()))) try: self.loop.run_until_complete(self.testvm1.run_for_stdio(kill_cmd)) except subprocess.CalledProcessError: @@ -317,6 +396,8 @@ def common_audio_record_unmuted(self): recorded_audio, _ = self.loop.run_until_complete( self.testvm1.run_for_stdio('cat audio_rec.snd')) self.check_audio_sample(recorded_audio, sfreq) + if detach_mic: + self.detach_mic() class TC_20_AudioVM_Pulse(TC_00_AudioMixin): @@ -459,6 +540,31 @@ def test_251_audio_playback_audiovm_pipewire_late_start(self): self.assert_pacat_running(self.app.domains[0], self.testvm1, False) self.common_audio_playback() + @unittest.skipUnless(spawn.find_executable('parecord'), + "pulseaudio-utils not installed in dom0") + def test_260_audio_mic_enabled_switch_audiovm(self): + self.create_audio_vm('pipewire', start=False) + self.testvm1.audiovm = self.audiovm + self.prepare_audio_test('pipewire') + self.loop.run_until_complete(self.audiovm.start()) + + # check mic is enabled in first audiovm + self.assert_pacat_running(self.audiovm, self.testvm1, True) + self.common_audio_record_unmuted(detach_mic=False) + + # check mic is enabled in second audiovm, admin ext will + # allow mic during switch as it was previously enabled + self.testvm1.audiovm = self.app.domains[0] + self.assert_pacat_running(self.testvm1.audiovm, self.testvm1, True) + self.common_audio_record_unmuted(attach_mic=False, detach_mic=False) + + # detach mic, switch to original audiovm and check there + # is no sound as we disabled mic + self.detach_mic() + self.testvm1.audiovm = self.audiovm + self.assert_pacat_running(self.audiovm, self.testvm1, True) + self.common_audio_record_muted() + def create_testcases_for_templates(): yield from qubes.tests.create_testcases_for_templates(