-
Notifications
You must be signed in to change notification settings - Fork 182
/
Copy pathtest_util.py
119 lines (91 loc) · 4.08 KB
/
test_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import platform
import selectors
import subprocess
import threading
import time
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from dangerzone import util
from . import sanitized_text, uncommon_text
VERSION_FILE_NAME = "version.txt"
def test_get_resource_path() -> None:
share_dir = Path("share").resolve()
resource_path = Path(util.get_resource_path(VERSION_FILE_NAME)).parent
assert share_dir.samefile(
resource_path
), f"{share_dir} is not the same file as {resource_path}"
@pytest.mark.skipif(platform.system() != "Windows", reason="Windows-specific")
def test_get_subprocess_startupinfo() -> None:
startupinfo = util.get_subprocess_startupinfo()
assert isinstance(startupinfo, subprocess.STARTUPINFO) # type: ignore[attr-defined]
def test_replace_control_chars(uncommon_text: str, sanitized_text: str) -> None:
"""Test that the replace_control_chars() function works properly."""
assert util.replace_control_chars(uncommon_text) == sanitized_text
assert util.replace_control_chars("normal text") == "normal text"
assert util.replace_control_chars("") == ""
@pytest.mark.skipif(
platform.system() == "Windows", reason="Cannot test non-blocking read on Windows"
)
def test_nonblocking_read(mocker: MockerFixture) -> None:
"""Test that the nonblocking_read() function works properly."""
size = 9
timeout = 1
r, w = os.pipe()
# Test 1 - Check that invalid arguments (blocking fd, negative size/timeout ) raise
# an exception.
with pytest.raises(ValueError, match="Expected a non-blocking file descriptor"):
util.nonblocking_read(r, size, timeout)
os.set_blocking(r, False)
with pytest.raises(ValueError, match="Expected a positive size value"):
util.nonblocking_read(r, 0, timeout)
with pytest.raises(ValueError, match="Expected a positive timeout value"):
util.nonblocking_read(r, size, 0)
# Test 2 - Check that partial reads are retried, for the timeout's duration,
# and we never read more than we want.
select_spy = mocker.spy(selectors.DefaultSelector, "select")
read_spy = mocker.spy(os, "read")
# Write "1234567890", with a delay of 0.3 seconds.
os.write(w, b"12345")
def write_rest() -> None:
time.sleep(0.3)
os.write(w, b"67890")
threading.Thread(target=write_rest).start()
# Ensure that we receive all the characters, except for the last one ("0"), since it
# exceeds the requested size.
assert util.nonblocking_read(r, size, timeout) == b"123456789"
# Ensure that the read/select calls were retried.
# FIXME: The following asserts are racy, and assume that a 0.3 second delay will
# trigger a re-read. If our tests fail due to it, we should find a smarter way to
# test it.
assert read_spy.call_count == 2
assert read_spy.call_args_list[0].args[1] == 9
assert read_spy.call_args_list[1].args[1] == 4
assert read_spy.spy_return == b"6789"
assert select_spy.call_count == 2
timeout1 = select_spy.call_args_list[0].args[1]
timeout2 = select_spy.call_args_list[1].args[1]
assert 1 > timeout1 > timeout2
# Test 3 - Check that timeouts work, even when we partially read something.
select_spy.reset_mock()
read_spy.reset_mock()
# Ensure that the function raises a timeout error.
with pytest.raises(TimeoutError):
util.nonblocking_read(r, size, 0.1)
# Ensure that the function has read a single character from the previous write
# operation.
assert read_spy.call_count == 1
assert read_spy.spy_return == b"0"
# Ensure that the select() method has been called twice, and that the second time it
# returned an empty list (meaning that timeout expired).
assert select_spy.call_count == 2
assert select_spy.spy_return == []
timeout1 = select_spy.call_args_list[0].args[1]
timeout2 = select_spy.call_args_list[1].args[1]
assert 0.1 > timeout1 > timeout2
# Test 4 - Check that EOF is detected.
buf = b"Bye!"
os.write(w, buf)
os.close(w)
assert util.nonblocking_read(r, size, timeout) == buf