Skip to content

Commit

Permalink
use conftest.py in tests, eliminate star imports
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Wiggins authored and Roy Wiggins committed Dec 12, 2024
1 parent 6b2818c commit 08a1ccb
Show file tree
Hide file tree
Showing 19 changed files with 236 additions and 252 deletions.
6 changes: 2 additions & 4 deletions app/routing/route_studies.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from pathlib import Path
from typing import Dict, Union

import pyfakefs

# App-specific includes
import common.config as config
import common.helper as helper
Expand All @@ -36,7 +34,7 @@ def route_studies(pending_series: Dict[str, float]) -> None:
# TODO: Handle studies that exceed the "force completion" timeout in the "CONDITION_RECEIVED_SERIES" mode
studies_ready = {}
with os.scandir(config.mercure.studies_folder) as it:
it = list(it) # type: ignore
it = list(it) # type: ignore
for entry in it:
if entry.is_dir() and not is_study_locked(entry.path):
if is_study_complete(entry.path, pending_series):
Expand Down Expand Up @@ -198,7 +196,7 @@ def check_force_study_timeout(folder: Path) -> bool:
lock_file = Path(folder / mercure_names.LOCK)
try:
lock = helper.FileLock(lock_file)
except:
except Exception:
logger.error(f"Unable to lock study for removal {lock_file}") # handle_error
return False
if not move_study_folder(task.id, folder.name, "DISCARD"):
Expand Down
Empty file added app/tests/__init__.py
Empty file.
112 changes: 112 additions & 0 deletions app/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@

import os
import socket
import uuid
from typing import Any, Callable, Dict

import common # noqa: F401
import common.config as config
import process # noqa: F401
import pytest
import routing # noqa: F401
from bookkeeping import bookkeeper
from common.types import Config


def spy_on(mocker, obj) -> None:
pieces = obj.split(".")
module = ".".join(pieces[0:-1])
mocker.patch(obj, new=mocker.spy(eval(module), pieces[-1]))


def spies(mocker, list_of_spies) -> None:
for spy in list_of_spies:
spy_on(mocker, spy)


def attach_spies(mocker) -> None:
spies(
mocker,
[
"routing.route_series.push_series_serieslevel",
"routing.route_series.push_serieslevel_outgoing",
"routing.route_studies.route_study",
"routing.generate_taskfile.create_series_task",
"routing.route_studies.move_study_folder",
"routing.route_studies.push_studylevel_error",
"routing.generate_taskfile.create_study_task",
"routing.router.route_series",
"routing.router.route_studies",
"process.processor.process_series",
# "process.process_series",
"common.monitor.post",
"common.monitor.send_event",
"common.monitor.send_register_series",
"common.monitor.send_register_task",
"common.monitor.send_task_event",
"common.monitor.async_send_task_event",
"common.monitor.send_processor_output",
"common.monitor.send_update_task",
"common.notification.trigger_notification_for_rule",
"common.notification.send_email",
"uuid.uuid1"
],
)


@pytest.fixture(scope="function")
def mocked(mocker):
mocker.resetall()
attach_spies(mocker)
return mocker


@pytest.fixture(scope="module")
def bookkeeper_port():
return random_port()


@pytest.fixture(scope="module")
def receiver_port():
return random_port()


@pytest.fixture(scope="function", autouse=True)
def mercure_config(fs, bookkeeper_port) -> Callable[[Dict], Config]:
# TODO: config from previous calls seems to leak in here
config_path = os.path.realpath(os.path.dirname(os.path.realpath(__file__)) + "/data/test_config.json")

fs.add_real_file(config_path, target_path=config.configuration_filename, read_only=False)
for k in ["incoming", "studies", "outgoing", "success", "error", "discard", "processing", "jobs"]:
fs.create_dir(f"/var/{k}")

def set_config(extra: Dict[Any, Any] = {}) -> Config:
config.read_config()
config.mercure = Config(**{**config.mercure.dict(), **extra}) # type: ignore
print(config.mercure.targets)
config.save_config()
return config.mercure

# set_config()
# sqlite3 is not inside the fakefs so this is going to be a real file
set_config({"bookkeeper": "sqlite:///tmp/mercure_bookkeeper_" + str(uuid.uuid4()) + ".db"})

bookkeeper_env = f"""PORT={bookkeeper_port}
HOST=0.0.0.0
DATABASE_URL={config.mercure.bookkeeper}"""
fs.create_file(bookkeeper.bk_config.config_filename, contents=bookkeeper_env)

fs.add_real_directory(os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/../alembic'))
fs.add_real_file(os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/../alembic.ini'), read_only=True)
return set_config


def random_port() -> int:
"""
Generate a free port number to use as an ephemeral endpoint.
"""
s = socket.socket()
s.bind(('', 0)) # bind to any available port
port = s.getsockname()[1] # get the port number
s.close()
return int(port)
3 changes: 1 addition & 2 deletions app/tests/dispatch/test_restart_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
from process import processor
from pytest_mock import MockerFixture
from routing import router
from tests.testing_common import (FakeDockerContainer, bookkeeper_port, make_fake_processor, mercure_config, # noqa: F401
mock_incoming_uid, mock_task_ids, mocked)
from tests.testing_common import FakeDockerContainer, make_fake_processor, mock_incoming_uid, mock_task_ids
from webinterface.queue import RestartTaskErrors, restart_dispatch

logger = config.get_logger()
Expand Down
1 change: 0 additions & 1 deletion app/tests/dispatch/test_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from common.constants import mercure_names
from common.monitor import m_events, severity, task_event
from dispatch.send import execute, is_ready_for_sending
from tests.testing_common import *

dummy_info = {
"action": "route",
Expand Down
Empty file.
71 changes: 71 additions & 0 deletions app/tests/integration/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import subprocess
import tempfile
from dataclasses import dataclass

import pydicom
import requests


def send_dicom(ds, dest_host, dest_port) -> None:
with tempfile.NamedTemporaryFile('w') as ds_temp:
ds.save_as(ds_temp.name)
subprocess.run(["dcmsend", dest_host, str(dest_port), ds_temp.name], check=True)


@dataclass
class MercureService:
name: str
command: str
numprocs: int = 1
stopasgroup: bool = False
startsecs: int = 0


def is_dicoms_received(mercure_base, dicoms) -> None:
dicoms_recieved = set()
for series_folder in (mercure_base / 'data' / 'incoming').glob('*/'):
for dicom in series_folder.glob('*.dcm'):
ds_ = pydicom.dcmread(dicom)
assert ds_.SeriesInstanceUID == series_folder.name
assert ds_.SOPInstanceUID not in dicoms_recieved
dicoms_recieved.add(ds_.SOPInstanceUID)

assert dicoms_recieved == set(ds.SOPInstanceUID for ds in dicoms)
print(f"Received {len(dicoms)} dicoms as expected")


def is_dicoms_in_folder(folder, dicoms) -> None:
uids_found = set()
print(f"Looking for dicoms in {folder}")
dicoms_found = []
for f in folder.rglob('*'):
if not f.is_file():
continue
if f.suffix == '.dcm':
dicoms_found.append(f)
if f.suffix not in ('.error', '.tags'):
dicoms_found.append(f)
print("Dicoms", dicoms_found)
for dicom in dicoms_found:

try:
uid = pydicom.dcmread(dicom).SOPInstanceUID
uids_found.add(uid)
except Exception:
pass
try:
assert uids_found == set(ds.SOPInstanceUID for ds in dicoms), f"Dicoms missing from {folder}"
except Exception:
print("Expected dicoms not found")
for dicom in folder.glob('**/*.dcm'):
print(dicom)
raise
print(f"Found {len(dicoms)} dicoms in {folder.name} as expected")


def is_series_registered(bookkeeper_port, dicoms) -> None:
result = requests.get(f"http://localhost:{bookkeeper_port}/query/series",
headers={"Authorization": "Token test"})
assert result.status_code == 200
result_json = result.json()
assert set([r['series_uid'] for r in result_json]) == set([d.SeriesInstanceUID for d in dicoms])
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
import json
import multiprocessing
import os
import socket
import subprocess
import sys
import tempfile
import threading
import time
import xmlrpc.client
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Generator, Optional

import pydicom
import pytest
import requests
from common.config import mercure_defaults
from supervisor.options import ServerOptions
from supervisor.states import RUNNING_STATES
from supervisor.supervisord import Supervisor
from supervisor.xmlrpc import SupervisorTransport

# current workding directory
here = os.path.abspath(os.getcwd())
from app.tests.integration.common import MercureService


def send_dicom(ds, dest_host, dest_port) -> None:
with tempfile.NamedTemporaryFile('w') as ds_temp:
ds.save_as(ds_temp.name)
subprocess.run(["dcmsend", dest_host, str(dest_port), ds_temp.name], check=True)
# current workding directory
def here() -> str:
return os.path.abspath(os.getcwd())


class SupervisorManager:
Expand Down Expand Up @@ -151,65 +145,6 @@ def stop(self) -> None:
pass


@dataclass
class MercureService:
name: str
command: str
numprocs: int = 1
stopasgroup: bool = False
startsecs: int = 0


def is_dicoms_received(mercure_base, dicoms) -> None:
dicoms_recieved = set()
for series_folder in (mercure_base / 'data' / 'incoming').glob('*/'):
for dicom in series_folder.glob('*.dcm'):
ds_ = pydicom.dcmread(dicom)
assert ds_.SeriesInstanceUID == series_folder.name
assert ds_.SOPInstanceUID not in dicoms_recieved
dicoms_recieved.add(ds_.SOPInstanceUID)

assert dicoms_recieved == set(ds.SOPInstanceUID for ds in dicoms)
print(f"Received {len(dicoms)} dicoms as expected")


def is_dicoms_in_folder(folder, dicoms) -> None:
uids_found = set()
print(f"Looking for dicoms in {folder}")
dicoms_found = []
for f in folder.rglob('*'):
if not f.is_file():
continue
if f.suffix == '.dcm':
dicoms_found.append(f)
if f.suffix not in ('.error', '.tags'):
dicoms_found.append(f)
print("Dicoms", dicoms_found)
for dicom in dicoms_found:

try:
uid = pydicom.dcmread(dicom).SOPInstanceUID
uids_found.add(uid)
except Exception:
pass
try:
assert uids_found == set(ds.SOPInstanceUID for ds in dicoms), f"Dicoms missing from {folder}"
except Exception:
print("Expected dicoms not found")
for dicom in folder.glob('**/*.dcm'):
print(dicom)
raise
print(f"Found {len(dicoms)} dicoms in {folder.name} as expected")


def is_series_registered(bookkeeper_port, dicoms) -> None:
result = requests.get(f"http://localhost:{bookkeeper_port}/query/series",
headers={"Authorization": "Token test"})
assert result.status_code == 200
result_json = result.json()
assert set([r['series_uid'] for r in result_json]) == set([d.SeriesInstanceUID for d in dicoms])


@pytest.fixture(scope="function")
def supervisord(mercure_base):
supervisor: Optional[SupervisorManager] = None
Expand Down Expand Up @@ -248,7 +183,7 @@ def python_bin():
if os.environ.get("CLEAN_VENV"):
with tempfile.TemporaryDirectory(prefix="mercure_venv") as venvdir:
subprocess.run([sys.executable, "-m", "venv", venvdir], check=True)
subprocess.run([os.path.join(venvdir, "bin", "pip"), "install", "-r", f"{here}/requirements.txt"], check=True)
subprocess.run([os.path.join(venvdir, "bin", "pip"), "install", "-r", f"{here()}/requirements.txt"], check=True)
yield (venvdir + "/bin/python")
else:
yield sys.executable
Expand All @@ -270,7 +205,7 @@ def mercure(supervisord: Callable[[Any], SupervisorManager], python_bin
) -> Generator[Callable[[Any], SupervisorManager], None, None]:
def py_service(service, **kwargs) -> MercureService:
if 'command' not in kwargs:
kwargs['command'] = f"{python_bin} {here}/app/{service}.py"
kwargs['command'] = f"{python_bin} {here()}/app/{service}.py"
return MercureService(service, **kwargs)
services = [
py_service("bookkeeper", startsecs=6),
Expand All @@ -280,7 +215,7 @@ def py_service(service, **kwargs) -> MercureService:
py_service("worker_fast", command=f"{python_bin} -m rq.cli worker mercure_fast"),
py_service("worker_slow", command=f"{python_bin} -m rq.cli worker mercure_slow")
]
services += [MercureService("receiver", f"{here}/app/receiver.sh --inject-errors", stopasgroup=True)]
services += [MercureService("receiver", f"{here()}/app/receiver.sh --inject-errors", stopasgroup=True)]
supervisor = supervisord(services)

def do_start(services_to_start=["bookkeeper", "reciever", "router", "processor", "dispatcher"]) -> SupervisorManager:
Expand All @@ -295,27 +230,6 @@ def do_start(services_to_start=["bookkeeper", "reciever", "router", "processor",
print("=============")


def random_port() -> int:
"""
Generate a free port number to use as an ephemeral endpoint.
"""
s = socket.socket()
s.bind(('', 0)) # bind to any available port
port = s.getsockname()[1] # get the port number
s.close()
return int(port)


@pytest.fixture(scope="module")
def receiver_port():
return random_port()


@pytest.fixture(scope="module")
def bookkeeper_port():
return random_port()


@pytest.fixture(scope="function")
def mercure_config(mercure_base, receiver_port, bookkeeper_port):
mercure_config = {k: v for k, v in mercure_defaults.items()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import pytest
from common.types import FolderTarget, Module, Rule
from testing_integration_common import *
from testing_integration_common import is_dicoms_in_folder, is_dicoms_received, is_series_registered, send_dicom
from app.tests.integration.common import is_dicoms_in_folder, is_dicoms_received, is_series_registered, send_dicom
from tests.testing_common import create_minimal_dicom


Expand Down
Loading

0 comments on commit 08a1ccb

Please sign in to comment.