Skip to content

Commit

Permalink
Resolved client-server import chain and added installation key for instrument server dependencies (#448)
Browse files Browse the repository at this point in the history

* Moves the `_midpoint()` function to `murfey.util.tomo` to keep client and server modules detached
* Replaces remaining instances of `procrunner.run()` with `subprocess.run()`
* Creates a new key for instrument server package dependencies
  • Loading branch information
tieneupin authored Jan 15, 2025
1 parent 19f2f31 commit 3d915f3
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 84 deletions.
11 changes: 7 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ classifiers = [
dependencies = [
"backports.entry_points_selectable",
"defusedxml", # For safely parsing XML files
"pydantic<2", # Locked to <2 by zocalo
"pydantic<2", # Locked to <2 by cygwin terminal
"requests",
"rich",
"werkzeug",
]
[project.optional-dependencies]
cicd = [
"pytest-cov", # Used by Azure Pipelines for PyTest coverage reports
"pytest-cov", # Used for generating PyTest coverage reports
]
client = [
"procrunner",
"textual==0.42.0",
"websocket-client",
"xmltodict",
Expand All @@ -53,8 +52,12 @@ developer = [
"pre-commit", # Formatting, linting, type checking, etc.
"pytest", # Test code functionality
]
instrument-server = [
"fastapi[standard]",
"python-jose[cryptography]",
"uvicorn[standard]",
]
server = [
# "matplotlib", # For visual statistical analysis of images
"aiohttp",
"cryptography",
"fastapi[standard]",
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/cli/transfer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import argparse
import subprocess
from pathlib import Path
from urllib.parse import urlparse

import procrunner
import requests
from rich.console import Console
from rich.prompt import Confirm
Expand Down Expand Up @@ -76,6 +76,6 @@ def run():
cmd.extend(list(Path(args.source or ".").glob("*")))
cmd.append(f"{murfey_url.hostname}::{args.destination}")

result = procrunner.run(cmd)
result = subprocess.run(cmd)
if result.returncode:
console.print(f"[red]rsync failed returning code {result.returncode}")
17 changes: 2 additions & 15 deletions src/murfey/client/contexts/tomo.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from murfey.util import authorised_requests, capture_post, get_machine_config_client
from murfey.util.mdoc import get_block, get_global_data, get_num_blocks
from murfey.util.tomo import midpoint

logger = logging.getLogger("murfey.client.contexts.tomo")

Expand Down Expand Up @@ -64,20 +65,6 @@ def _construct_tilt_series_name(file_path: Path) -> str:
return "_".join(split_name[:-5])


def _midpoint(angles: List[float]) -> int:
if not angles:
return 0
if len(angles) <= 2:
return round(angles[0])
sorted_angles = sorted(angles)
return round(
sorted_angles[len(sorted_angles) // 2]
if sorted_angles[len(sorted_angles) // 2]
and sorted_angles[len(sorted_angles) // 2 + 1]
else 0
)


class ProcessFileIncomplete(BaseModel):
dest: Path
source: Path
Expand Down Expand Up @@ -738,7 +725,7 @@ def gather_metadata(
if environment
else None
)
mdoc_metadata["manual_tilt_offset"] = -_midpoint(
mdoc_metadata["manual_tilt_offset"] = -midpoint(
[float(b["TiltAngle"]) for b in blocks]
)
mdoc_metadata["source"] = str(self._basepath)
Expand Down
19 changes: 8 additions & 11 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import secrets
import subprocess
import time
from datetime import datetime
from functools import partial
Expand All @@ -9,7 +10,6 @@
from typing import Annotated, Dict, List, Optional, Union
from urllib.parse import urlparse

import procrunner
import requests
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
Expand All @@ -21,7 +21,7 @@
from murfey.client.multigrid_control import MultigridController
from murfey.client.rsync import RSyncer
from murfey.client.watchdir_multigrid import MultigridDirWatcher
from murfey.util import sanitise_nonpath, secure_path
from murfey.util import sanitise, sanitise_nonpath, secure_path
from murfey.util.instrument_models import MultigridWatcherSpec
from murfey.util.models import File, Token

Expand Down Expand Up @@ -278,19 +278,16 @@ class GainReference(BaseModel):

@router.post("/sessions/{session_id}/upload_gain_reference")
def upload_gain_reference(session_id: MurfeySessionID, gain_reference: GainReference):
safe_gain_path = sanitise(str(gain_reference.gain_path))
safe_visit_path = sanitise(gain_reference.visit_path)
safe_destination_dir = sanitise(gain_reference.gain_destination_dir)
cmd = [
"rsync",
str(gain_reference.gain_path),
f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{gain_reference.visit_path}/{gain_reference.gain_destination_dir}/{secure_filename(gain_reference.gain_path.name)}",
safe_gain_path,
f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{safe_visit_path}/{safe_destination_dir}/{secure_filename(gain_reference.gain_path.name)}",
]
gain_rsync = procrunner.run(cmd)
gain_rsync = subprocess.run(cmd)
if gain_rsync.returncode:
safe_gain_path = (
str(gain_reference.gain_path).replace("\r\n", "").replace("\n", "")
)
safe_visit_path = gain_reference.visit_path.replace("\r\n", "").replace(
"\n", ""
)
logger.warning(
f"Gain reference file {safe_gain_path} was not successfully transferred to {safe_visit_path}/processing"
)
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import murfey.server.prometheus as prom
import murfey.server.websocket
import murfey.util.db as db
from murfey.client.contexts.tomo import _midpoint
from murfey.server.murfey_db import url # murfey_db
from murfey.util import LogFilter
from murfey.util.config import (
Expand All @@ -60,6 +59,7 @@
)
from murfey.util.processing_params import default_spa_parameters
from murfey.util.state import global_state
from murfey.util.tomo import midpoint

try:
from murfey.server.ispyb import TransportManager # Session
Expand Down Expand Up @@ -2576,7 +2576,7 @@ def feedback_callback(header: dict, message: dict) -> None:
)
if not stack_file.parent.exists():
stack_file.parent.mkdir(parents=True)
tilt_offset = _midpoint([float(get_angle(t)) for t in tilts])
tilt_offset = midpoint([float(get_angle(t)) for t in tilts])
zocalo_message = {
"recipes": ["em-tomo-align"],
"parameters": {
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import murfey.server.websocket as ws
import murfey.util.eer
from murfey.server import (
_midpoint,
_murfey_id,
_transport_object,
check_tilt_series_mc,
Expand Down Expand Up @@ -108,6 +107,7 @@
)
from murfey.util.processing_params import default_spa_parameters
from murfey.util.state import global_state
from murfey.util.tomo import midpoint

log = logging.getLogger("murfey.server.api")

Expand Down Expand Up @@ -840,7 +840,7 @@ def register_completed_tilt_series(
)
if not stack_file.parent.exists():
stack_file.parent.mkdir(parents=True)
tilt_offset = _midpoint([float(get_angle(t)) for t in tilts])
tilt_offset = midpoint([float(get_angle(t)) for t in tilts])
zocalo_message = {
"recipes": ["em-tomo-align"],
"parameters": {
Expand Down
90 changes: 42 additions & 48 deletions src/murfey/util/rsync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from __future__ import annotations

import logging
import subprocess
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

import procrunner

from murfey.util import Processor
from murfey.util.file_monitor import Monitor

Expand All @@ -32,7 +31,7 @@ def __init__(
self.received_bytes = 0
self.byte_rate: float = 0
self.total_size = 0
self.runner_return: List[procrunner.ReturnObject] = []
self.runner_return: List[subprocess.CompletedProcess] = []
self._root = root
self._sub_structure: Optional[Path] = None
self._notify = notify or (lambda f: None)
Expand All @@ -53,7 +52,7 @@ def _run_rsync(
retry: bool = True,
):
"""
Run rsync -v on a list of files using procrunner.
Run rsync -v on a list of files using subprocess.
:param root: root path of files for transferring; structure below the root is preserved
:type root: pathlib.Path object
Expand Down Expand Up @@ -109,69 +108,64 @@ def _single_rsync(
else:
cmd.append(str(self._finaldir / sub_struct) + "/")
self._transferring = True
runner = procrunner.run(
runner = subprocess.run(
cmd,
callback_stdout=self._parse_rsync_stdout,
callback_stderr=self._parse_rsync_stderr,
capture_output=True,
)
for line in runner.stdout.decode("utf-8", "replace").split("\n"):
self._parse_rsync_stdout(line)
for line in runner.stderr.decode("utf-8", "replace").split("\n"):
self._parse_rsync_stderr(line)
self.runner_return.append(runner)
self.failed.extend(root / sub_struct / f for f in self._failed_tmp)
if retry:
self._in.put(root / sub_struct / f for f in self._failed_tmp)

def _parse_rsync_stdout(self, stdout: bytes):
def _parse_rsync_stdout(self, line: str):
"""
Parse rsync stdout to collect information such as the paths of transferred
files and the amount of data transferred.
:param line: a single decoded line of the rsync process's stdout
:type line: str
"""
stringy_stdout = str(stdout)
if stringy_stdout:
if self._transferring:
if stringy_stdout.startswith("sent"):
self._transferring = False
byte_info = stringy_stdout.split()
self.sent_bytes = int(
byte_info[byte_info.index("sent") + 1].replace(",", "")
)
self.received_bytes = int(
byte_info[byte_info.index("received") + 1].replace(",", "")
)
self.byte_rate = float(
byte_info[byte_info.index("bytes/sec") - 1].replace(",", "")
)
elif len(stringy_stdout.split()) == 1:
if self._root and self._sub_structure:
self._notify(
self._finaldir / self._sub_structure / stringy_stdout
)
self._out.put(self._root / self._sub_structure / stringy_stdout)
else:
logger.warning(
f"root or substructure not set for transfer of {stringy_stdout}"
)
else:
if "total size" in stringy_stdout:
self.total_size = int(
stringy_stdout.replace("total size", "").split()[1]
if self._transferring:
if line.startswith("sent"):
self._transferring = False
byte_info = line.split()
self.sent_bytes = int(
byte_info[byte_info.index("sent") + 1].replace(",", "")
)
self.received_bytes = int(
byte_info[byte_info.index("received") + 1].replace(",", "")
)
self.byte_rate = float(
byte_info[byte_info.index("bytes/sec") - 1].replace(",", "")
)
elif len(line.split()) == 1:
if self._root and self._sub_structure:
self._notify(self._finaldir / self._sub_structure / line)
self._out.put(self._root / self._sub_structure / line)
else:
logger.warning(
f"root or substructure not set for transfer of {line}"
)
else:
if "total size" in line:
self.total_size = int(line.replace("total size", "").split()[1])

def _parse_rsync_stderr(self, stderr: bytes):
def _parse_rsync_stderr(self, line: str):
"""
Parse rsync stderr to collect information on any files that failed to transfer.
:param line: a single decoded line of the rsync process's stderr
:type line: str
"""
stringy_stderr = str(stderr)
if stringy_stderr:
if (
stringy_stderr.startswith("rsync: link_stat")
or stringy_stderr.startswith("rsync: [sender] link_stat")
) and "failed" in stringy_stderr:
failed_msg = stringy_stderr.split()
self._failed_tmp.append(
failed_msg[failed_msg.index("failed:") - 1].replace('"', "")
)
if (
line.startswith("rsync: link_stat")
or line.startswith("rsync: [sender] link_stat")
) and "failed" in line:
failed_msg = line.split()
self._failed_tmp.append(
failed_msg[failed_msg.index("failed:") - 1].replace('"', "")
)
16 changes: 16 additions & 0 deletions src/murfey/util/tomo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def midpoint(angles: list[float]) -> int:
    """
    Return the rounded central tilt angle of a tilt series.

    Used primarily in the tomography workflow.

    :param angles: tilt angles of the series, in any order
    :return: 0 for an empty list; the first angle (rounded) when there are
        only one or two angles; otherwise the angle at index ``len // 2`` of
        the sorted angles (rounded), provided that angle and the one
        immediately above it are both non-zero, else 0
    """
    if not angles:
        return 0

    count = len(angles)
    if count < 3:
        # Too few angles to pick a centre; fall back to the first one as given.
        return round(angles[0])

    ordered = sorted(angles)
    half = count // 2
    centre, upper = ordered[half], ordered[half + 1]

    # A zero at the centre or directly above it means no offset is reported.
    if centre and upper:
        return round(centre)
    return 0

0 comments on commit 3d915f3

Please sign in to comment.