diff --git a/pyproject.toml b/pyproject.toml index 2c6da691..a12f74ee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ murfey = "murfey.client:run" "clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result" "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result" "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result" +"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/client/contexts/spa.py b/src/murfey/client/contexts/spa.py index 65cf34a3..0bec9e98 100644 --- a/src/murfey/client/contexts/spa.py +++ b/src/murfey/client/contexts/spa.py @@ -4,7 +4,7 @@ from datetime import datetime from itertools import count from pathlib import Path -from typing import Any, Dict, List, NamedTuple, Optional, OrderedDict, Tuple +from typing import Any, Dict, List, Optional, OrderedDict, Tuple import requests import xmltodict @@ -21,113 +21,19 @@ capture_post, get_machine_config_client, ) +from murfey.util.spa_metadata import ( + foil_hole_data, + foil_hole_from_file, + get_grid_square_atlas_positions, + grid_square_data, + grid_square_from_file, +) logger = logging.getLogger("murfey.client.contexts.spa") requests.get, requests.post, requests.put, requests.delete = authorised_requests() -class FoilHole(NamedTuple): - session_id: int - id: int - grid_square_id: int - x_location: Optional[float] = None - y_location: Optional[float] = None - x_stage_position: Optional[float] = None - y_stage_position: Optional[float] = None - readout_area_x: Optional[int] = None - readout_area_y: Optional[int] = None - thumbnail_size_x: Optional[int] = None - thumbnail_size_y: Optional[int] = None - pixel_size: Optional[float] = None - image: str = "" - 
diameter: Optional[float] = None - - -class GridSquare(NamedTuple): - session_id: int - id: int - x_location: Optional[float] = None - y_location: Optional[float] = None - x_stage_position: Optional[float] = None - y_stage_position: Optional[float] = None - readout_area_x: Optional[int] = None - readout_area_y: Optional[int] = None - thumbnail_size_x: Optional[int] = None - thumbnail_size_y: Optional[int] = None - pixel_size: Optional[float] = None - image: str = "" - tag: str = "" - - -def _get_grid_square_atlas_positions(xml_path: Path, grid_square: str = "") -> Dict[ - str, - Tuple[ - Optional[int], - Optional[int], - Optional[float], - Optional[float], - Optional[int], - Optional[int], - Optional[float], - ], -]: - with open( - xml_path, - "r", - ) as dm: - atlas_data = xmltodict.parse(dm.read()) - tile_info = atlas_data["AtlasSessionXml"]["Atlas"]["TilesEfficient"]["_items"][ - "TileXml" - ] - gs_pix_positions: Dict[ - str, - Tuple[ - Optional[int], - Optional[int], - Optional[float], - Optional[float], - Optional[int], - Optional[int], - Optional[float], - ], - ] = {} - for ti in tile_info: - try: - nodes = ti["Nodes"]["KeyValuePairs"] - except KeyError: - continue - required_key = "" - for key in nodes.keys(): - if key.startswith("KeyValuePairOfintNodeXml"): - required_key = key - break - if not required_key: - continue - for gs in nodes[required_key]: - if not isinstance(gs, dict): - continue - if not grid_square or gs["key"] == grid_square: - gs_pix_positions[gs["key"]] = ( - int(float(gs["value"]["b:PositionOnTheAtlas"]["c:Center"]["d:x"])), - int(float(gs["value"]["b:PositionOnTheAtlas"]["c:Center"]["d:y"])), - float(gs["value"]["b:PositionOnTheAtlas"]["c:Physical"]["d:x"]) - * 1e9, - float(gs["value"]["b:PositionOnTheAtlas"]["c:Physical"]["d:y"]) - * 1e9, - int( - float(gs["value"]["b:PositionOnTheAtlas"]["c:Size"]["d:width"]) - ), - int( - float(gs["value"]["b:PositionOnTheAtlas"]["c:Size"]["d:height"]) - ), - 
float(gs["value"]["b:PositionOnTheAtlas"]["c:Rotation"]), - ) - if grid_square: - break - return gs_pix_positions - - def _file_transferred_to( environment: MurfeyInstanceEnvironment, source: Path, file_path: Path ): @@ -150,17 +56,6 @@ def _file_transferred_to( ) -def _grid_square_from_file(f: Path) -> int: - for p in f.parts: - if p.startswith("GridSquare"): - return int(p.split("_")[1]) - raise ValueError(f"Grid square ID could not be determined from path {f}") - - -def _foil_hole_from_file(f: Path) -> int: - return int(f.name.split("_")[1]) - - def _grid_square_metadata_file( f: Path, data_directories: List[Path], visit: str, grid_square: int ) -> Path: @@ -180,97 +75,6 @@ def _grid_square_metadata_file( ) -def _grid_square_data(xml_path: Path, grid_square: int, session_id: int) -> GridSquare: - image_paths = list( - (xml_path.parent.parent).glob( - f"Images-Disc*/GridSquare_{grid_square}/GridSquare_*.jpg" - ) - ) - if image_paths: - image_paths.sort(key=lambda x: x.stat().st_ctime) - image_path = image_paths[-1] - with open(Path(image_path).with_suffix(".xml")) as gs_xml: - gs_xml_data = xmltodict.parse(gs_xml.read()) - readout_area = gs_xml_data["MicroscopeImage"]["microscopeData"]["acquisition"][ - "camera" - ]["ReadoutArea"] - pixel_size = gs_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ - "numericValue" - ] - full_size = (int(readout_area["a:width"]), int(readout_area["a:height"])) - return GridSquare( - id=grid_square, - session_id=session_id, - readout_area_x=full_size[0] if image_path else None, - readout_area_y=full_size[1] if image_path else None, - thumbnail_size_x=int((512 / max(full_size)) * full_size[0]), - thumbnail_size_y=int((512 / max(full_size)) * full_size[1]), - pixel_size=float(pixel_size) if image_path else None, - image=str(image_path), - ) - return GridSquare(id=grid_square, session_id=session_id) - - -def _foil_hole_data( - xml_path: Path, foil_hole: int, grid_square: int, session_id: int -) -> FoilHole: - with 
open(xml_path, "r") as xml: - for_parsing = xml.read() - data = xmltodict.parse(for_parsing) - data = data["GridSquareXml"] - serialization_array = data["TargetLocations"]["TargetLocationsEfficient"][ - "a:m_serializationArray" - ] - required_key = "" - for key in serialization_array.keys(): - if key.startswith("b:KeyValuePairOfintTargetLocation"): - required_key = key - break - if required_key: - image_paths = list( - (xml_path.parent.parent).glob( - f"Images-Disc*/GridSquare_{grid_square}/FoilHoles/FoilHole_{foil_hole}_*.jpg" - ) - ) - image_paths.sort(key=lambda x: x.stat().st_ctime) - image_path: Path | str = image_paths[-1] if image_paths else "" - if image_path: - with open(Path(image_path).with_suffix(".xml")) as fh_xml: - fh_xml_data = xmltodict.parse(fh_xml.read()) - readout_area = fh_xml_data["MicroscopeImage"]["microscopeData"][ - "acquisition" - ]["camera"]["ReadoutArea"] - pixel_size = fh_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"][ - "x" - ]["numericValue"] - full_size = (int(readout_area["a:width"]), int(readout_area["a:height"])) - for fh_block in serialization_array[required_key]: - pix = fh_block["b:value"]["PixelCenter"] - stage = fh_block["b:value"]["StagePosition"] - diameter = fh_block["b:value"]["PixelWidthHeight"]["c:width"] - if int(fh_block["b:key"]) == foil_hole: - return FoilHole( - id=foil_hole, - grid_square_id=grid_square, - session_id=session_id, - x_location=float(pix["c:x"]), - y_location=float(pix["c:y"]), - x_stage_position=float(stage["c:X"]), - y_stage_position=float(stage["c:Y"]), - readout_area_x=full_size[0] if image_path else None, - readout_area_y=full_size[1] if image_path else None, - thumbnail_size_x=None, - thumbnail_size_y=None, - pixel_size=float(pixel_size) if image_path else None, - image=str(image_path), - diameter=diameter, - ) - logger.warning( - f"Foil hole positions could not be determined from metadata file {xml_path} for foil hole {foil_hole}" - ) - return FoilHole(id=foil_hole, 
grid_square_id=grid_square, session_id=session_id) - - def _get_source(file_path: Path, environment: MurfeyInstanceEnvironment) -> Path | None: for s in environment.sources: if file_path.is_relative_to(s): @@ -566,8 +370,8 @@ def _position_analysis( environment: MurfeyInstanceEnvironment, source: Path, machine_config: dict, - ) -> int: - grid_square = _grid_square_from_file(transferred_file) + ) -> Optional[int]: + grid_square = grid_square_from_file(transferred_file) grid_square_metadata_file = _grid_square_metadata_file( transferred_file, [Path(p) for p in machine_config["data_directories"]], @@ -596,6 +400,9 @@ def _position_analysis( .json() .get(str(source), {}) ) + if not data_collection_group: + logger.info("Data collection group has not yet been made") + return None if data_collection_group.get("atlas"): visit_path = "" for p in transferred_file.parts: @@ -606,15 +413,14 @@ def _position_analysis( local_atlas_path = ( Path(visit_path) / environment.samples[source].atlas ) - gs_pix_position = _get_grid_square_atlas_positions( + gs_pix_position = get_grid_square_atlas_positions( local_atlas_path, grid_square=str(grid_square), )[str(grid_square)] gs_url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_square/{grid_square}" - gs = _grid_square_data( + gs = grid_square_data( grid_square_metadata_file, grid_square, - environment.murfey_session, ) metadata_source = Path( ( @@ -647,18 +453,17 @@ def _position_analysis( "angle": gs_pix_position[6], }, ) - foil_hole = _foil_hole_from_file(transferred_file) + foil_hole = foil_hole_from_file(transferred_file) if foil_hole not in self._foil_holes[grid_square]: fh_url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/grid_square/{grid_square}/foil_hole" if ( grid_square_metadata_file.is_file() and environment.murfey_session is not None ): - fh = _foil_hole_data( + fh = foil_hole_data( grid_square_metadata_file, foil_hole, grid_square, - environment.murfey_session, ) 
metadata_source = Path( ( diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index 26c3a181..87ca9994 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -6,9 +6,10 @@ import xmltodict from murfey.client.context import Context -from murfey.client.contexts.spa import _get_grid_square_atlas_positions, _get_source +from murfey.client.contexts.spa import _get_source from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo from murfey.util import authorised_requests, capture_post, get_machine_config_client +from murfey.util.spa_metadata import get_grid_square_atlas_positions logger = logging.getLogger("murfey.client.contexts.spa_metadata") @@ -116,7 +117,7 @@ def post_transfer( .get(str(source), []) ) if registered_grid_squares: - gs_pix_positions = _get_grid_square_atlas_positions( + gs_pix_positions = get_grid_square_atlas_positions( source_visit_dir / partial_path ) for gs in registered_grid_squares: diff --git a/src/murfey/server/__init__.py b/src/murfey/server/__init__.py index a258f71a..39786ff2 100644 --- a/src/murfey/server/__init__.py +++ b/src/murfey/server/__init__.py @@ -1930,117 +1930,6 @@ def _register_initial_model(message: dict, _db=murfey_db, demo: bool = False): _db.close() -def _flush_spa_preprocessing(message: dict): - session_id = message["session_id"] - stashed_files = murfey_db.exec( - select(db.PreprocessStash) - .where(db.PreprocessStash.session_id == session_id) - .where(db.PreprocessStash.tag == message["tag"]) - ).all() - if not stashed_files: - return None - instrument_name = ( - murfey_db.exec(select(db.Session).where(db.Session.id == message["session_id"])) - .one() - .instrument_name - ) - machine_config = get_machine_config(instrument_name=instrument_name)[ - instrument_name - ] - recipe_name = machine_config.recipes.get("em-spa-preprocess", "em-spa-preprocess") - collected_ids = murfey_db.exec( - select( 
- db.DataCollectionGroup, - db.DataCollection, - db.ProcessingJob, - db.AutoProcProgram, - ) - .where(db.DataCollectionGroup.session_id == session_id) - .where(db.DataCollectionGroup.tag == message["tag"]) - .where(db.DataCollection.dcg_id == db.DataCollectionGroup.id) - .where(db.ProcessingJob.dc_id == db.DataCollection.id) - .where(db.AutoProcProgram.pj_id == db.ProcessingJob.id) - .where(db.ProcessingJob.recipe == recipe_name) - ).one() - params = murfey_db.exec( - select(db.SPARelionParameters, db.SPAFeedbackParameters) - .where(db.SPARelionParameters.pj_id == collected_ids[2].id) - .where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id) - ).one() - proc_params = params[0] - feedback_params = params[1] - if not proc_params: - logger.warning( - f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}" - ) - raise ValueError( - "No processing parameters were found in the database when flushing SPA preprocessing" - ) - - murfey_ids = _murfey_id( - collected_ids[3].id, - murfey_db, - number=2 * len(stashed_files), - close=False, - ) - if feedback_params.picker_murfey_id is None: - feedback_params.picker_murfey_id = murfey_ids[1] - murfey_db.add(feedback_params) - - for i, f in enumerate(stashed_files): - mrcp = Path(f.mrc_out) - ppath = Path(f.file_path) - if not mrcp.parent.exists(): - mrcp.parent.mkdir(parents=True) - movie = db.Movie( - murfey_id=murfey_ids[2 * i], - path=f.file_path, - image_number=f.image_number, - tag=f.tag, - foil_hole_id=f.foil_hole_id, - ) - murfey_db.add(movie) - zocalo_message: dict = { - "recipes": [recipe_name], - "parameters": { - "node_creator_queue": machine_config.node_creator_queue, - "dcid": collected_ids[1].id, - "kv": proc_params.voltage, - "autoproc_program_id": collected_ids[3].id, - "movie": f.file_path, - "mrc_out": f.mrc_out, - "pixel_size": proc_params.angpix, - "image_number": f.image_number, - "microscope": get_microscope(), - "mc_uuid": murfey_ids[2 * i], - "ft_bin": 
proc_params.motion_corr_binning, - "fm_dose": proc_params.dose_per_frame, - "gain_ref": proc_params.gain_ref, - "picker_uuid": murfey_ids[2 * i + 1], - "session_id": session_id, - "particle_diameter": proc_params.particle_diameter or 0, - "fm_int_file": f.eer_fractionation_file, - "do_icebreaker_jobs": default_spa_parameters.do_icebreaker_jobs, - "foil_hole_id": f.foil_hole_id, - }, - } - if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue - _transport_object.send( - "processing_recipe", zocalo_message, new_connection=True - ) - murfey_db.delete(f) - else: - logger.error( - f"Pre-processing was requested for {ppath.name} but no Zocalo transport object was found" - ) - murfey_db.commit() - murfey_db.close() - return None - - def _flush_tomography_preprocessing(message: dict): session_id = message["session_id"] instrument_name = ( @@ -2840,11 +2729,6 @@ def feedback_callback(header: dict, message: dict) -> None: if _transport_object: _transport_object.transport.ack(header) return None - elif message["register"] == "flush_spa_preprocess": - _flush_spa_preprocessing(message) - if _transport_object: - _transport_object.transport.ack(header) - return None elif message["register"] == "spa_processing_parameters": session_id = message["session_id"] collected_ids = murfey_db.exec( diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index d6232f9a..4873f81e 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -24,7 +24,7 @@ from PIL import Image from pydantic import BaseModel from sqlalchemy import func -from sqlalchemy.exc import NoResultFound, OperationalError +from sqlalchemy.exc import OperationalError from sqlmodel import col, select from werkzeug.utils import secure_filename @@ -108,6 +108,10 @@ from murfey.util.processing_params import default_spa_parameters from murfey.util.state import global_state from murfey.util.tomo import midpoint +from 
murfey.workflows.spa.flush_spa_preprocess import ( + register_foil_hole, + register_grid_square, +) log = logging.getLogger("murfey.server.api") @@ -486,69 +490,13 @@ def get_number_of_movies_from_foil_hole( @router.post("/sessions/{session_id}/grid_square/{gsid}") -def register_grid_square( +def posted_grid_square( session_id: MurfeySessionID, gsid: int, grid_square_params: GridSquareParameters, db=murfey_db, ): - try: - grid_square = db.exec( - select(GridSquare) - .where(GridSquare.name == gsid) - .where(GridSquare.tag == grid_square_params.tag) - .where(GridSquare.session_id == session_id) - ).one() - grid_square.x_location = grid_square_params.x_location - grid_square.y_location = grid_square_params.y_location - grid_square.x_stage_position = grid_square_params.x_stage_position - grid_square.y_stage_position = grid_square_params.y_stage_position - if _transport_object: - _transport_object.do_update_grid_square(grid_square.id, grid_square_params) - except Exception: - if _transport_object: - dcg = db.exec( - select(DataCollectionGroup) - .where(DataCollectionGroup.session_id == session_id) - .where(DataCollectionGroup.tag == grid_square_params.tag) - ).one() - gs_ispyb_response = _transport_object.do_insert_grid_square( - dcg.atlas_id, gsid, grid_square_params - ) - else: - # mock up response so that below still works - gs_ispyb_response = {"success": False, "return_value": None} - secured_grid_square_image_path = secure_filename(grid_square_params.image) - if ( - secured_grid_square_image_path - and Path(secured_grid_square_image_path).is_file() - ): - jpeg_size = Image.open(secured_grid_square_image_path).size - else: - jpeg_size = (0, 0) - grid_square = GridSquare( - id=( - gs_ispyb_response["return_value"] - if gs_ispyb_response["success"] - else None - ), - name=gsid, - session_id=session_id, - tag=grid_square_params.tag, - x_location=grid_square_params.x_location, - y_location=grid_square_params.y_location, - 
x_stage_position=grid_square_params.x_stage_position, - y_stage_position=grid_square_params.y_stage_position, - readout_area_x=grid_square_params.readout_area_x, - readout_area_y=grid_square_params.readout_area_y, - thumbnail_size_x=grid_square_params.thumbnail_size_x or jpeg_size[0], - thumbnail_size_y=grid_square_params.thumbnail_size_y or jpeg_size[1], - pixel_size=grid_square_params.pixel_size, - image=secured_grid_square_image_path, - ) - db.add(grid_square) - db.commit() - db.close() + return register_grid_square(session_id, gsid, grid_square_params, db) @router.get("/sessions/{session_id}/foil_hole/{fh_name}") @@ -565,80 +513,13 @@ def get_foil_hole( @router.post("/sessions/{session_id}/grid_square/{gs_name}/foil_hole") -def register_foil_hole( +def post_foil_hole( session_id: MurfeySessionID, gs_name: int, foil_hole_params: FoilHoleParameters, db=murfey_db, ): - try: - gs = db.exec( - select(GridSquare) - .where(GridSquare.tag == foil_hole_params.tag) - .where(GridSquare.session_id == session_id) - .where(GridSquare.name == gs_name) - ).one() - gsid = gs.id - except NoResultFound: - log.debug( - f"Foil hole {sanitise(str(foil_hole_params.name))} could not be registered as grid square {sanitise(str(gs_name))} was not found" - ) - return - secured_foil_hole_image_path = secure_filename(foil_hole_params.image) - if foil_hole_params.image and Path(secured_foil_hole_image_path).is_file(): - jpeg_size = Image.open(secured_foil_hole_image_path).size - else: - jpeg_size = (0, 0) - try: - foil_hole = db.exec( - select(FoilHole) - .where(FoilHole.name == foil_hole_params.name) - .where(FoilHole.grid_square_id == gsid) - .where(FoilHole.session_id == session_id) - ).one() - foil_hole.x_location = foil_hole_params.x_location - foil_hole.y_location = foil_hole_params.y_location - foil_hole.x_stage_position = foil_hole_params.x_stage_position - foil_hole.y_stage_position = foil_hole_params.y_stage_position - foil_hole.readout_area_x = foil_hole_params.readout_area_x - 
foil_hole.readout_area_y = foil_hole_params.readout_area_y - foil_hole.thumbnail_size_x = foil_hole_params.thumbnail_size_x or jpeg_size[0] - foil_hole.thumbnail_size_y = foil_hole_params.thumbnail_size_y or jpeg_size[1] - foil_hole.pixel_size = foil_hole_params.pixel_size - if _transport_object: - _transport_object.do_update_foil_hole( - foil_hole.id, gs.thumbnail_size_x / gs.readout_area_x, foil_hole_params - ) - except Exception: - if _transport_object: - fh_ispyb_response = _transport_object.do_insert_foil_hole( - gs.id, gs.thumbnail_size_x / gs.readout_area_x, foil_hole_params - ) - else: - fh_ispyb_response = {"success": False, "return_value": None} - foil_hole = FoilHole( - id=( - fh_ispyb_response["return_value"] - if fh_ispyb_response["success"] - else None - ), - name=foil_hole_params.name, - session_id=session_id, - grid_square_id=gsid, - x_location=foil_hole_params.x_location, - y_location=foil_hole_params.y_location, - x_stage_position=foil_hole_params.x_stage_position, - y_stage_position=foil_hole_params.y_stage_position, - readout_area_x=foil_hole_params.readout_area_x, - readout_area_y=foil_hole_params.readout_area_y, - thumbnail_size_x=foil_hole_params.thumbnail_size_x or jpeg_size[0], - thumbnail_size_y=foil_hole_params.thumbnail_size_y or jpeg_size[1], - pixel_size=foil_hole_params.pixel_size, - image=secured_foil_hole_image_path, - ) - db.add(foil_hole) - db.commit() - db.close() + return register_foil_hole(session_id, gs_name, foil_hole_params, db) @router.post("/sessions/{session_id}/tomography_preprocessing_parameters") diff --git a/src/murfey/util/spa_metadata.py b/src/murfey/util/spa_metadata.py new file mode 100644 index 00000000..3db0a0bc --- /dev/null +++ b/src/murfey/util/spa_metadata.py @@ -0,0 +1,204 @@ +import logging +from pathlib import Path +from typing import Dict, NamedTuple, Optional, Tuple + +import xmltodict + +logger = logging.getLogger("murfey.util.spa_metadata") + + +class FoilHoleInfo(NamedTuple): + id: int + 
grid_square_id: int + x_location: Optional[float] = None + y_location: Optional[float] = None + x_stage_position: Optional[float] = None + y_stage_position: Optional[float] = None + readout_area_x: Optional[int] = None + readout_area_y: Optional[int] = None + thumbnail_size_x: Optional[int] = None + thumbnail_size_y: Optional[int] = None + pixel_size: Optional[float] = None + image: str = "" + diameter: Optional[float] = None + + +class GridSquareInfo(NamedTuple): + id: int + x_location: Optional[float] = None + y_location: Optional[float] = None + x_stage_position: Optional[float] = None + y_stage_position: Optional[float] = None + readout_area_x: Optional[int] = None + readout_area_y: Optional[int] = None + thumbnail_size_x: Optional[int] = None + thumbnail_size_y: Optional[int] = None + pixel_size: Optional[float] = None + image: str = "" + tag: str = "" + + +def grid_square_from_file(f: Path) -> int: + for p in f.parts: + if p.startswith("GridSquare"): + return int(p.split("_")[1]) + raise ValueError(f"Grid square ID could not be determined from path {f}") + + +def foil_hole_from_file(f: Path) -> int: + return int(f.name.split("_")[1]) + + +def get_grid_square_atlas_positions(xml_path: Path, grid_square: str = "") -> Dict[ + str, + Tuple[ + Optional[int], + Optional[int], + Optional[float], + Optional[float], + Optional[int], + Optional[int], + Optional[float], + ], +]: + with open( + xml_path, + "r", + ) as dm: + atlas_data = xmltodict.parse(dm.read()) + tile_info = atlas_data["AtlasSessionXml"]["Atlas"]["TilesEfficient"]["_items"][ + "TileXml" + ] + gs_pix_positions: Dict[ + str, + Tuple[ + Optional[int], + Optional[int], + Optional[float], + Optional[float], + Optional[int], + Optional[int], + Optional[float], + ], + ] = {} + for ti in tile_info: + try: + nodes = ti["Nodes"]["KeyValuePairs"] + except KeyError: + continue + required_key = "" + for key in nodes.keys(): + if key.startswith("KeyValuePairOfintNodeXml"): + required_key = key + break + if not 
required_key: + continue + for gs in nodes[required_key]: + if not isinstance(gs, dict): + continue + if not grid_square or gs["key"] == grid_square: + gs_pix_positions[gs["key"]] = ( + int(float(gs["value"]["b:PositionOnTheAtlas"]["c:Center"]["d:x"])), + int(float(gs["value"]["b:PositionOnTheAtlas"]["c:Center"]["d:y"])), + float(gs["value"]["b:PositionOnTheAtlas"]["c:Physical"]["d:x"]) + * 1e9, + float(gs["value"]["b:PositionOnTheAtlas"]["c:Physical"]["d:y"]) + * 1e9, + int( + float(gs["value"]["b:PositionOnTheAtlas"]["c:Size"]["d:width"]) + ), + int( + float(gs["value"]["b:PositionOnTheAtlas"]["c:Size"]["d:height"]) + ), + float(gs["value"]["b:PositionOnTheAtlas"]["c:Rotation"]), + ) + if grid_square: + break + return gs_pix_positions + + +def grid_square_data(xml_path: Path, grid_square: int) -> GridSquareInfo: + image_paths = list( + (xml_path.parent.parent).glob( + f"Images-Disc*/GridSquare_{grid_square}/GridSquare_*.jpg" + ) + ) + if image_paths: + image_paths.sort(key=lambda x: x.stat().st_ctime) + image_path = image_paths[-1] + with open(Path(image_path).with_suffix(".xml")) as gs_xml: + gs_xml_data = xmltodict.parse(gs_xml.read()) + readout_area = gs_xml_data["MicroscopeImage"]["microscopeData"]["acquisition"][ + "camera" + ]["ReadoutArea"] + pixel_size = gs_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ + "numericValue" + ] + full_size = (int(readout_area["a:width"]), int(readout_area["a:height"])) + return GridSquareInfo( + id=grid_square, + readout_area_x=full_size[0] if image_path else None, + readout_area_y=full_size[1] if image_path else None, + thumbnail_size_x=int((512 / max(full_size)) * full_size[0]), + thumbnail_size_y=int((512 / max(full_size)) * full_size[1]), + pixel_size=float(pixel_size) if image_path else None, + image=str(image_path), + ) + return GridSquareInfo(id=grid_square) + + +def foil_hole_data(xml_path: Path, foil_hole: int, grid_square: int) -> FoilHoleInfo: + with open(xml_path, "r") as xml: + for_parsing = 
xml.read() + data = xmltodict.parse(for_parsing) + data = data["GridSquareXml"] + serialization_array = data["TargetLocations"]["TargetLocationsEfficient"][ + "a:m_serializationArray" + ] + required_key = "" + for key in serialization_array.keys(): + if key.startswith("b:KeyValuePairOfintTargetLocation"): + required_key = key + break + if required_key: + image_paths = list( + (xml_path.parent.parent).glob( + f"Images-Disc*/GridSquare_{grid_square}/FoilHoles/FoilHole_{foil_hole}_*.jpg" + ) + ) + image_paths.sort(key=lambda x: x.stat().st_ctime) + image_path: Path | str = image_paths[-1] if image_paths else "" + if image_path: + with open(Path(image_path).with_suffix(".xml")) as fh_xml: + fh_xml_data = xmltodict.parse(fh_xml.read()) + readout_area = fh_xml_data["MicroscopeImage"]["microscopeData"][ + "acquisition" + ]["camera"]["ReadoutArea"] + pixel_size = fh_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"][ + "x" + ]["numericValue"] + full_size = (int(readout_area["a:width"]), int(readout_area["a:height"])) + for fh_block in serialization_array[required_key]: + pix = fh_block["b:value"]["PixelCenter"] + stage = fh_block["b:value"]["StagePosition"] + diameter = fh_block["b:value"]["PixelWidthHeight"]["c:width"] + if int(fh_block["b:key"]) == foil_hole: + return FoilHoleInfo( + id=foil_hole, + grid_square_id=grid_square, + x_location=float(pix["c:x"]), + y_location=float(pix["c:y"]), + x_stage_position=float(stage["c:X"]), + y_stage_position=float(stage["c:Y"]), + readout_area_x=full_size[0] if image_path else None, + readout_area_y=full_size[1] if image_path else None, + thumbnail_size_x=None, + thumbnail_size_y=None, + pixel_size=float(pixel_size) if image_path else None, + image=str(image_path), + diameter=diameter, + ) + logger.warning( + f"Foil hole positions could not be determined from metadata file {xml_path} for foil hole {foil_hole}" + ) + return FoilHoleInfo(id=foil_hole, grid_square_id=grid_square) diff --git 
a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py new file mode 100644 index 00000000..1f8cbc70 --- /dev/null +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -0,0 +1,391 @@ +import logging +from pathlib import Path +from typing import Optional + +from PIL import Image +from sqlalchemy.exc import NoResultFound +from sqlmodel import Session, select +from werkzeug.utils import secure_filename + +from murfey.server import _murfey_id, _transport_object, sanitise +from murfey.server.api.auth import MurfeySessionID +from murfey.server.murfey_db import murfey_db +from murfey.util.config import get_machine_config, get_microscope +from murfey.util.db import DataCollectionGroup, FoilHole, GridSquare +from murfey.util.models import FoilHoleParameters, GridSquareParameters +from murfey.util.processing_params import default_spa_parameters +from murfey.util.spa_metadata import ( + GridSquareInfo, + foil_hole_data, + foil_hole_from_file, + get_grid_square_atlas_positions, + grid_square_data, + grid_square_from_file, +) + +logger = logging.getLogger("murfey.workflows.spa.flush_spa_preprocess") + + +def register_grid_square( + session_id: MurfeySessionID, + gsid: int, + grid_square_params: GridSquareParameters, + db=murfey_db, +): + try: + grid_square = db.exec( + select(GridSquare) + .where(GridSquare.name == gsid) + .where(GridSquare.tag == grid_square_params.tag) + .where(GridSquare.session_id == session_id) + ).one() + grid_square.x_location = grid_square_params.x_location + grid_square.y_location = grid_square_params.y_location + grid_square.x_stage_position = grid_square_params.x_stage_position + grid_square.y_stage_position = grid_square_params.y_stage_position + if _transport_object: + _transport_object.do_update_grid_square(grid_square.id, grid_square_params) + except Exception: + if _transport_object: + dcg = db.exec( + select(DataCollectionGroup) + .where(DataCollectionGroup.session_id == session_id) + 
.where(DataCollectionGroup.tag == grid_square_params.tag) + ).one() + gs_ispyb_response = _transport_object.do_insert_grid_square( + dcg.atlas_id, gsid, grid_square_params + ) + else: + # mock up response so that below still works + gs_ispyb_response = {"success": False, "return_value": None} + secured_grid_square_image_path = secure_filename(grid_square_params.image) + if ( + secured_grid_square_image_path + and Path(secured_grid_square_image_path).is_file() + ): + jpeg_size = Image.open(secured_grid_square_image_path).size + else: + jpeg_size = (0, 0) + grid_square = GridSquare( + id=( + gs_ispyb_response["return_value"] + if gs_ispyb_response["success"] + else None + ), + name=gsid, + session_id=session_id, + tag=grid_square_params.tag, + x_location=grid_square_params.x_location, + y_location=grid_square_params.y_location, + x_stage_position=grid_square_params.x_stage_position, + y_stage_position=grid_square_params.y_stage_position, + readout_area_x=grid_square_params.readout_area_x, + readout_area_y=grid_square_params.readout_area_y, + thumbnail_size_x=grid_square_params.thumbnail_size_x or jpeg_size[0], + thumbnail_size_y=grid_square_params.thumbnail_size_y or jpeg_size[1], + pixel_size=grid_square_params.pixel_size, + image=secured_grid_square_image_path, + ) + db.add(grid_square) + db.commit() + db.close() + + +def register_foil_hole( + session_id: MurfeySessionID, + gs_name: int, + foil_hole_params: FoilHoleParameters, + db=murfey_db, +): + try: + gs = db.exec( + select(GridSquare) + .where(GridSquare.tag == foil_hole_params.tag) + .where(GridSquare.session_id == session_id) + .where(GridSquare.name == gs_name) + ).one() + gsid = gs.id + except NoResultFound: + logger.debug( + f"Foil hole {sanitise(str(foil_hole_params.name))} could not be registered as grid square {sanitise(str(gs_name))} was not found" + ) + return + secured_foil_hole_image_path = secure_filename(foil_hole_params.image) + if foil_hole_params.image and 
Path(secured_foil_hole_image_path).is_file(): + jpeg_size = Image.open(secured_foil_hole_image_path).size + else: + jpeg_size = (0, 0) + try: + foil_hole = db.exec( + select(FoilHole) + .where(FoilHole.name == foil_hole_params.name) + .where(FoilHole.grid_square_id == gsid) + .where(FoilHole.session_id == session_id) + ).one() + foil_hole.x_location = foil_hole_params.x_location + foil_hole.y_location = foil_hole_params.y_location + foil_hole.x_stage_position = foil_hole_params.x_stage_position + foil_hole.y_stage_position = foil_hole_params.y_stage_position + foil_hole.readout_area_x = foil_hole_params.readout_area_x + foil_hole.readout_area_y = foil_hole_params.readout_area_y + foil_hole.thumbnail_size_x = foil_hole_params.thumbnail_size_x or jpeg_size[0] + foil_hole.thumbnail_size_y = foil_hole_params.thumbnail_size_y or jpeg_size[1] + foil_hole.pixel_size = foil_hole_params.pixel_size + if _transport_object: + _transport_object.do_update_foil_hole( + foil_hole.id, gs.thumbnail_size_x / gs.readout_area_x, foil_hole_params + ) + except Exception: + if _transport_object: + fh_ispyb_response = _transport_object.do_insert_foil_hole( + gs.id, gs.thumbnail_size_x / gs.readout_area_x, foil_hole_params + ) + else: + fh_ispyb_response = {"success": False, "return_value": None} + foil_hole = FoilHole( + id=( + fh_ispyb_response["return_value"] + if fh_ispyb_response["success"] + else None + ), + name=foil_hole_params.name, + session_id=session_id, + grid_square_id=gsid, + x_location=foil_hole_params.x_location, + y_location=foil_hole_params.y_location, + x_stage_position=foil_hole_params.x_stage_position, + y_stage_position=foil_hole_params.y_stage_position, + readout_area_x=foil_hole_params.readout_area_x, + readout_area_y=foil_hole_params.readout_area_y, + thumbnail_size_x=foil_hole_params.thumbnail_size_x or jpeg_size[0], + thumbnail_size_y=foil_hole_params.thumbnail_size_y or jpeg_size[1], + pixel_size=foil_hole_params.pixel_size, + image=secured_foil_hole_image_path, 
def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]:
    """Locate the ``GridSquare_<n>.dm`` metadata file belonging to a movie.

    The search root is the directory three levels above ``f``. Every entry
    there whose name matches ``metadata*`` is checked for a
    ``Metadata/GridSquare_<n>.dm`` file.

    Args:
        f: Path of the movie file.
        grid_square: Grid square number to look for.

    Returns:
        The first matching metadata file, or ``None`` (after logging an
        error) when none of the candidate directories contains one.
    """
    raw_dir = f.parent.parent.parent
    for md_dir in raw_dir.glob("metadata*"):
        gs_path = md_dir / f"Metadata/GridSquare_{grid_square}.dm"
        if gs_path.is_file():
            return gs_path
    logger.error(f"Could not determine grid square metadata path for {f}")
    return None


def _flush_position_analysis(
    movie_path: Path, dcg_id: int, session_id: int, db: Session
) -> Optional[int]:
    """Register the grid square and foil hole of a movie in the database.

    Args:
        movie_path: Path of the movie; the grid square and foil hole are
            derived from it.
        dcg_id: ID of the data collection group the movie belongs to.
        session_id: ID of the session the movie belongs to.
        db: Open database session used for the lookup and the registration.

    Returns:
        The foil hole value obtained from ``foil_hole_from_file``.

    Raises:
        Whatever ``.one()`` raises when ``dcg_id`` does not match exactly one
        data collection group.
    """
    # ``db`` is the session here, so it cannot double as the namespace of the
    # ORM tables; import that namespace under a distinct local name.
    # NOTE(review): assumes the tables are defined in murfey.util.db - confirm.
    import murfey.util.db as murfey_models

    # Callers may hand over the path as stored in the database (a string).
    movie_path = Path(movie_path)

    data_collection_group = db.exec(
        select(murfey_models.DataCollectionGroup).where(
            murfey_models.DataCollectionGroup.id == dcg_id
        )
    ).one()

    # Work out the grid square and associated metadata file
    grid_square = grid_square_from_file(movie_path)
    grid_square_metadata_file = _grid_square_metadata_file(movie_path, grid_square)
    if grid_square_metadata_file:
        gs = grid_square_data(grid_square_metadata_file, grid_square)
    else:
        gs = GridSquareInfo(id=grid_square)

    if data_collection_group.atlas:
        # If an atlas is present, work out where this grid square is on it
        gs_pix_position = get_grid_square_atlas_positions(
            data_collection_group.atlas,
            grid_square=str(grid_square),
        )[str(grid_square)]
        grid_square_parameters = GridSquareParameters(
            tag=data_collection_group.tag,
            x_location=gs_pix_position[0],
            y_location=gs_pix_position[1],
            x_stage_position=gs_pix_position[2],
            y_stage_position=gs_pix_position[3],
            readout_area_x=gs.readout_area_x,
            readout_area_y=gs.readout_area_y,
            thumbnail_size_x=gs.thumbnail_size_x,
            thumbnail_size_y=gs.thumbnail_size_y,
            height=gs_pix_position[5],
            width=gs_pix_position[4],
            pixel_size=gs.pixel_size,
            image=gs.image,
            angle=gs_pix_position[6],
        )
    else:
        # Skip location analysis if no atlas
        grid_square_parameters = GridSquareParameters(
            tag=data_collection_group.tag,
            readout_area_x=gs.readout_area_x,
            readout_area_y=gs.readout_area_y,
            thumbnail_size_x=gs.thumbnail_size_x,
            thumbnail_size_y=gs.thumbnail_size_y,
            pixel_size=gs.pixel_size,
            image=gs.image,
        )
    # Insert or update this grid square in the database
    register_grid_square(session_id, gs.id, grid_square_parameters, db)

    # Find the foil hole info and register it
    foil_hole = foil_hole_from_file(movie_path)
    if grid_square_metadata_file:
        fh = foil_hole_data(
            grid_square_metadata_file,
            foil_hole,
            grid_square,
        )
        foil_hole_parameters = FoilHoleParameters(
            tag=data_collection_group.tag,
            name=foil_hole,
            x_location=fh.x_location,
            y_location=fh.y_location,
            x_stage_position=fh.x_stage_position,
            y_stage_position=fh.y_stage_position,
            readout_area_x=fh.readout_area_x,
            readout_area_y=fh.readout_area_y,
            thumbnail_size_x=fh.thumbnail_size_x,
            thumbnail_size_y=fh.thumbnail_size_y,
            pixel_size=fh.pixel_size,
            image=fh.image,
            diameter=fh.diameter,
        )
    else:
        # Without a metadata file only the tag and name are known
        foil_hole_parameters = FoilHoleParameters(
            tag=data_collection_group.tag,
            name=foil_hole,
        )
    # Insert or update this foil hole in the database
    register_foil_hole(session_id, gs.id, foil_hole_parameters, db)
    return foil_hole


# NOTE(review): the pyproject entry point added in this change refers to an
# attribute called ``flush_spa_preprocess``; this function is called
# ``flush_spa_preprocessing`` - confirm that the two are meant to line up.
def flush_spa_preprocessing(message: dict, db: Session, demo: bool = False):
    """Submit every stashed movie of a session/tag for SPA preprocessing.

    For each stashed file a ``Movie`` row is added and, when a transport
    object is available, a ``processing_recipe`` message is sent and the
    stash entry is deleted. Without a transport object an error is logged
    and the stash entry is kept.

    Args:
        message: Must provide the keys ``"session_id"`` and ``"tag"``.
        db: Open database session. It is committed and closed before this
            function returns normally.
        demo: Not used by this function.

    Returns:
        Always ``None``.

    Raises:
        ValueError: If no SPA processing parameters are available.
    """
    # ``db`` is the session here, so it cannot double as the namespace of the
    # ORM tables; import that namespace under a distinct local name.
    # NOTE(review): assumes the tables are defined in murfey.util.db - confirm.
    import murfey.util.db as murfey_models

    session_id = message["session_id"]
    stashed_files = db.exec(
        select(murfey_models.PreprocessStash)
        .where(murfey_models.PreprocessStash.session_id == session_id)
        .where(murfey_models.PreprocessStash.tag == message["tag"])
    ).all()
    if not stashed_files:
        return None

    instrument_name = (
        db.exec(
            select(murfey_models.Session).where(
                murfey_models.Session.id == message["session_id"]
            )
        )
        .one()
        .instrument_name
    )
    machine_config = get_machine_config(instrument_name=instrument_name)[
        instrument_name
    ]
    recipe_name = machine_config.recipes.get("em-spa-preprocess", "em-spa-preprocess")

    # One row holding (data collection group, data collection, processing
    # job, autoproc program) for this session, tag and recipe.
    collected_ids = db.exec(
        select(
            murfey_models.DataCollectionGroup,
            murfey_models.DataCollection,
            murfey_models.ProcessingJob,
            murfey_models.AutoProcProgram,
        )
        .where(murfey_models.DataCollectionGroup.session_id == session_id)
        .where(murfey_models.DataCollectionGroup.tag == message["tag"])
        .where(
            murfey_models.DataCollection.dcg_id
            == murfey_models.DataCollectionGroup.id
        )
        .where(murfey_models.ProcessingJob.dc_id == murfey_models.DataCollection.id)
        .where(murfey_models.AutoProcProgram.pj_id == murfey_models.ProcessingJob.id)
        .where(murfey_models.ProcessingJob.recipe == recipe_name)
    ).one()
    params = db.exec(
        select(murfey_models.SPARelionParameters, murfey_models.SPAFeedbackParameters)
        .where(murfey_models.SPARelionParameters.pj_id == collected_ids[2].id)
        .where(
            murfey_models.SPAFeedbackParameters.pj_id
            == murfey_models.SPARelionParameters.pj_id
        )
    ).one()
    proc_params = params[0]
    feedback_params = params[1]
    if not proc_params:
        logger.warning(
            f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
        )
        raise ValueError(
            "No processing parameters were found in the database when flushing SPA preprocessing"
        )

    # Two IDs per movie: index 2*i for motion correction, 2*i + 1 for picking
    murfey_ids = _murfey_id(
        collected_ids[3].id,
        db,
        number=2 * len(stashed_files),
        close=False,
    )
    if feedback_params.picker_murfey_id is None:
        feedback_params.picker_murfey_id = murfey_ids[1]
        db.add(feedback_params)

    for i, f in enumerate(stashed_files):
        mrcp = Path(f.mrc_out)
        ppath = Path(f.file_path)

        if f.foil_hole_id:
            foil_hole_id = f.foil_hole_id
        else:
            # Register grid square and foil hole if not present.
            # NOTE(review): the registration helper visible above this block
            # appears to commit and close the session it is given; confirm
            # the ORM objects loaded here remain usable afterwards.
            try:
                foil_hole_id = _flush_position_analysis(
                    movie_path=ppath,
                    dcg_id=collected_ids[0].id,
                    session_id=session_id,
                    db=db,
                )
            except Exception as e:
                # Position analysis is best effort; still process the movie
                logger.error(
                    f"Flushing position analysis for {f.file_path} caused exception {e}",
                    exc_info=True,
                )
                foil_hole_id = None

        # exist_ok avoids failing when the directory appears concurrently
        mrcp.parent.mkdir(parents=True, exist_ok=True)
        movie = murfey_models.Movie(
            murfey_id=murfey_ids[2 * i],
            path=f.file_path,
            image_number=f.image_number,
            tag=f.tag,
            foil_hole_id=foil_hole_id,
        )
        db.add(movie)
        zocalo_message: dict = {
            "recipes": [recipe_name],
            "parameters": {
                "node_creator_queue": machine_config.node_creator_queue,
                "dcid": collected_ids[1].id,
                "kv": proc_params.voltage,
                "autoproc_program_id": collected_ids[3].id,
                "movie": f.file_path,
                "mrc_out": f.mrc_out,
                "pixel_size": proc_params.angpix,
                "image_number": f.image_number,
                "microscope": get_microscope(),
                "mc_uuid": murfey_ids[2 * i],
                "ft_bin": proc_params.motion_corr_binning,
                "fm_dose": proc_params.dose_per_frame,
                "gain_ref": proc_params.gain_ref,
                "picker_uuid": murfey_ids[2 * i + 1],
                "session_id": session_id,
                "particle_diameter": proc_params.particle_diameter or 0,
                "fm_int_file": f.eer_fractionation_file,
                "do_icebreaker_jobs": default_spa_parameters.do_icebreaker_jobs,
                "foil_hole_id": foil_hole_id,
            },
        }
        if _transport_object:
            zocalo_message["parameters"][
                "feedback_queue"
            ] = _transport_object.feedback_queue
            _transport_object.send(
                "processing_recipe", zocalo_message, new_connection=True
            )
            # Only drop the stash entry once the request has been sent
            db.delete(f)
        else:
            logger.error(
                f"Pre-processing was requested for {ppath.name} but no Zocalo transport object was found"
            )
    db.commit()
    db.close()
    return None