From f0a7df5e7f8f297ce97da823e1096730b7ec17ef Mon Sep 17 00:00:00 2001 From: Wilfred Tyler Gee Date: Sun, 19 May 2024 07:20:26 -1000 Subject: [PATCH] Observation processing updates * Change the order of the processing updates. * Remove the `upload_exposure` method, which spawn yet another process. * Consolidate the uploading of files * Add the uploaded public url to the metadata --- src/panoptes/pocs/observatory.py | 99 +++++++++++++++----------------- src/panoptes/pocs/utils/cloud.py | 4 +- 2 files changed, 47 insertions(+), 56 deletions(-) diff --git a/src/panoptes/pocs/observatory.py b/src/panoptes/pocs/observatory.py index 66ee3f274..18d18f3aa 100644 --- a/src/panoptes/pocs/observatory.py +++ b/src/panoptes/pocs/observatory.py @@ -1,7 +1,6 @@ import os from collections import OrderedDict from contextlib import suppress -from multiprocessing import Process from pathlib import Path from typing import Dict, Optional @@ -500,18 +499,16 @@ def process_observation( metadata['filepath'] = compressed_file_path self.logger.debug(f'Compressed {compressed_file_path}') - if record_observations or self.get_config('observations.record_observations', default=False): - self.logger.debug(f"Adding current observation to db: {image_id}") - metadata['status'] = 'complete' - self.db.insert_current('images', metadata, store_permanently=False) - - if should_upload: - self.logger.debug(f"Uploading current observation: {image_id}") - try: - self.upload_exposure(exposure_info=exposure) - except Exception as e: - self.logger.warning(f'Problem uploading exposure: {e!r}') + bucket_name = self.get_config('panoptes_network.buckets.upload') + # Get the images directory. + images_dir = Path( + self.get_config( + 'directories.images', + default=Path('~/images') + ) + ).expanduser().as_posix() + pretty_image_path = None if make_pretty_images or self.get_config('observations.make_pretty_images', default=False): try: image_title = f'{field_name} [{exptime}s] {seq_id}' @@ -523,19 +520,47 @@ def process_observation( link_path = Path(self.get_config('directories.images')) / 'latest.jpg' self.logger.debug(f"Making pretty image for {cr2_file_path=!r}") - pretty_image_path = img_utils.make_pretty_image(cr2_file_path, title=image_title, link_path=link_path) + pretty_image_path = img_utils.make_pretty_image( + cr2_file_path, title=image_title, link_path=link_path + ) self.logger.debug(f"Pretty image created: {pretty_image_path}") self.logger.debug(f'Pretty image linked to {link_path}') - if should_upload: - public_url = image_uploader( - file_path=pretty_image_path, - bucket_path=f'{unit_id}/{seq_id}/{image_id}.jpg', - bucket_name='panoptes-images-pretty' - ) - self.logger.info(f"Pretty image uploaded: {public_url}") except Exception as e: # pragma: no cover self.logger.warning(f'Problem with extracting pretty image: {e!r}') + if should_upload: + self.logger.debug(f"Uploading current observation: {image_id}") + try: + image_path = exposure.path.as_posix() + self.logger.debug(f'Preparing {image_path=} for upload to {bucket_name=}') + + # Remove images directory from path so it's stored in bucket relative to images directory. + bucket_path = Path(image_path[image_path.find(images_dir) + len(images_dir):]) + + self.logger.debug(f'Adding {unit_id=} to {bucket_path=}') + bucket_path = Path(unit_id) / bucket_path.relative_to('/') + + # Upload FITS. + metadata['fits_public_url'] = image_uploader( + file_path=exposure.path, + bucket_path=bucket_path.as_posix(), + bucket_name=bucket_name + ) + # Upload pretty image. + if pretty_image_path: + metadata['pretty_image_url'] = image_uploader( + file_path=pretty_image_path, + bucket_path=bucket_path.with_suffix('.jpg').as_posix().replace('.fits', ''), + bucket_name=bucket_name + ) + except Exception as e: + self.logger.warning(f'Problem uploading exposure: {e!r}') + + if record_observations or self.get_config('observations.record_observations', default=False): + self.logger.debug(f"Adding current observation to db: {image_id}") + metadata['status'] = 'complete' + self.db.insert_current('images', metadata, store_permanently=False) + def analyze_recent(self): """Analyze the most recent exposure @@ -583,40 +608,6 @@ def analyze_recent(self): return self.current_offset_info - def upload_exposure(self, exposure_info, bucket_name=None): - """Uploads the most recent image from the current observation.""" - image_path = exposure_info.path - if not image_path.exists(): - raise FileNotFoundError(f'File does not exist: {image_path.as_posix()}') - - bucket_name = bucket_name or self.get_config('panoptes_network.buckets.upload') - - self.logger.debug(f'Preparing {image_path=} for upload to {bucket_name=}') - - # Get the images directory. - images_dir = Path(self.get_config('directories.images', default=Path('~/images'))).expanduser().as_posix() - - # Remove images directory from path so it's stored in bucket relative to images directory. - bucket_path = Path(image_path.as_posix()[image_path.as_posix().find(images_dir) + len(images_dir):]) - # Prepend the PANOPTES unit id to the bucket path. - pan_id = self.get_config('pan_id') - self.logger.debug(f'Adding {pan_id=} to {bucket_path=}') - bucket_path = Path(pan_id) / bucket_path.relative_to('/') - - # Create a separate process for the upload. - upload_process = Process( - name=f'ImageUploaderProcess-{exposure_info.image_id}', - target=image_uploader, - kwargs=dict( - file_path=image_path, - bucket_path=bucket_path.as_posix(), - bucket_name=bucket_name - ) - ) - - self.logger.debug(f'Uploading {str(image_path)} to {bucket_path} on {bucket_name}') - upload_process.start() - def update_tracking(self, **kwargs): """Update tracking with rate adjustment. diff --git a/src/panoptes/pocs/utils/cloud.py b/src/panoptes/pocs/utils/cloud.py index 646c7a5a8..a71eea595 100644 --- a/src/panoptes/pocs/utils/cloud.py +++ b/src/panoptes/pocs/utils/cloud.py @@ -17,7 +17,7 @@ def upload_image(file_path: Path, blob = bucket.blob(bucket_path) logger.debug(f'Uploading {file_path} to {bucket_name}/{bucket_path}') - blob.upload_from_filename(str(file_path), timeout=timeout) - logger.debug(f'File successfully uploaded to {blob.public_url}') + blob.upload_from_filename(file_path.as_posix(), timeout=timeout) + logger.info(f'File successfully uploaded to {blob.public_url}') return blob.public_url