From f544638fbbd90f6c93baa6fd05eb775c73557188 Mon Sep 17 00:00:00 2001 From: Wilfred Tyler Gee Date: Mon, 5 Feb 2018 10:37:11 +1100 Subject: [PATCH] Massive commit of decoupling from mongo and adding a PanFileDB type (#414) * PanDB is a factory class that will load either PanMongoDB or PanFileDB * PanFileDB merely writes the json to a flat file * One file per "collection" along with a `current_ 0: + if store_result and len(sensor_data) > 0: if self.db is None: - self.db = PanMongo() - self.logger.info('Connected to PanMongo') + self.db = PanDB() self.db.insert_current('environment', sensor_data) return sensor_data diff --git a/peas/weather.py b/peas/weather.py index 108e0685a..967899d59 100755 --- a/peas/weather.py +++ b/peas/weather.py @@ -19,8 +19,8 @@ def get_mongodb(): - from pocs.utils.database import PanMongo - return PanMongo() + from pocs.utils.database import PanDB + return PanDB() def movingaverage(interval, window_size): @@ -101,7 +101,7 @@ class AAGCloudSensor(object): """ - def __init__(self, serial_address=None, use_mongo=True): + def __init__(self, serial_address=None, store_result=True): self.config = load_config(config_files='peas') self.logger = get_root_logger() @@ -111,7 +111,7 @@ def __init__(self, serial_address=None, use_mongo=True): self.safety_delay = self.cfg.get('safety_delay', 15.) 
self.db = None - if use_mongo: + if store_result: self.db = get_mongodb() self.messaging = None @@ -610,7 +610,7 @@ def send_message(self, msg, channel='weather'): self.messaging.send_message(channel, msg) - def capture(self, use_mongo=False, send_message=False, **kwargs): + def capture(self, store_result=False, send_message=False, **kwargs): """ Query the CloudWatcher """ self.logger.debug("Updating weather") @@ -662,7 +662,7 @@ def capture(self, use_mongo=False, send_message=False, **kwargs): if send_message: self.send_message({'data': data}, channel='weather') - if use_mongo: + if store_result: self.db.insert_current('weather', data) return data diff --git a/pocs/base.py b/pocs/base.py index 4d9d5b92b..0a7422a39 100644 --- a/pocs/base.py +++ b/pocs/base.py @@ -3,7 +3,7 @@ from pocs import hardware from pocs import __version__ from pocs.utils import config -from pocs.utils.database import PanMongo +from pocs.utils.database import PanDB from pocs.utils.logger import get_root_logger # Global vars @@ -49,9 +49,22 @@ def __init__(self, *args, **kwargs): self.config['simulator'] = hardware.get_simulator_names(config=self.config, kwargs=kwargs) - # Set up connection to database - db = kwargs.get('db', self.config['db']['name']) - _db = PanMongo(db=db, logger=self.logger) + # Get passed DB or set up new connection + _db = kwargs.get('db', None) + if _db is None: + # If the user requests a db_type then update runtime config + db_type = kwargs.get('db_type', None) + db_name = kwargs.get('db_name', None) + + if db_type is not None: + self.config['db']['type'] = db_type + if db_name is not None: + self.config['db']['name'] = db_name + + db_type = self.config['db']['type'] + db_name = self.config['db']['name'] + + _db = PanDB(db_type=db_type, db_name=db_name, logger=self.logger).db self.db = _db diff --git a/pocs/camera/canon_gphoto2.py b/pocs/camera/canon_gphoto2.py index c92c8d047..573f7d42b 100644 --- a/pocs/camera/canon_gphoto2.py +++ b/pocs/camera/canon_gphoto2.py @@ 
-7,8 +7,6 @@ from pocs.utils import current_time from pocs.utils import error -from pocs.utils import images as img_utils -from pocs.utils.images import fits as fits_utils from pocs.utils.images import cr2 as cr2_utils from pocs.camera import AbstractGPhotoCamera diff --git a/pocs/camera/sbig.py b/pocs/camera/sbig.py index 13983bb00..8710eac0d 100644 --- a/pocs/camera/sbig.py +++ b/pocs/camera/sbig.py @@ -2,7 +2,6 @@ from warnings import warn from astropy import units as u -from astropy.io import fits from pocs.camera import AbstractCamera from pocs.camera.sbigudrv import INVALID_HANDLE_VALUE diff --git a/pocs/camera/simulator.py b/pocs/camera/simulator.py index 34331ab07..d44a8fc07 100644 --- a/pocs/camera/simulator.py +++ b/pocs/camera/simulator.py @@ -8,7 +8,6 @@ from astropy import units as u from astropy.io import fits -from astropy.time import Time from pocs.camera import AbstractCamera from pocs.utils.images import fits as fits_utils @@ -76,8 +75,9 @@ def take_exposure(self, seconds=1.0 * u.second, filename=None, dark=False, block # Build FITS header header = self._fits_header(seconds, dark) - # Set up a Timer that will wait for the duration of the exposure then copy a dummy FITS file - # to the specified path and adjust the headers according to the exposure time, type. + # Set up a Timer that will wait for the duration of the exposure then + # copy a dummy FITS file to the specified path and adjust the headers + # according to the exposure time, type. 
exposure_event = Event() exposure_thread = Timer(interval=seconds, function=self._fake_exposure, diff --git a/pocs/core.py b/pocs/core.py index 4fc1aade1..1e47b1985 100644 --- a/pocs/core.py +++ b/pocs/core.py @@ -222,8 +222,7 @@ def power_down(self): if self.connected: self.say("I'm powering down") self.logger.info( - "Shutting down {}, please be patient and allow for exit.".format( - self.name)) + "Shutting down {}, please be patient and allow for exit.", self.name) if not self.observatory.close_dome(): self.logger.critical('Unable to close dome!') @@ -347,10 +346,9 @@ def is_weather_safe(self, stale=180): bool: Conditions are safe (True) or unsafe (False) """ - assert self.db.current, self.logger.warning( - "No connection to sensors, can't check weather safety") # Always assume False + self.logger.debug("Checking weather safety") is_safe = False record = {'safe': False} @@ -366,15 +364,16 @@ def is_weather_safe(self, stale=180): record = self.db.get_current('weather') is_safe = record['data'].get('safe', False) - timestamp = record['date'] + timestamp = record['date'].replace(tzinfo=None) # current_time is timezone naive age = (current_time().datetime - timestamp).total_seconds() self.logger.debug( "Weather Safety: {} [{:.0f} sec old - {}]".format(is_safe, age, timestamp)) - except TypeError as e: - self.logger.warning("No record found in Mongo DB") - self.logger.debug('DB: {}'.format(self.db.current)) + except (TypeError, KeyError) as e: + self.logger.warning("No record found in DB: {}", e) + except BaseException as e: + self.logger.error("Error checking weather: {}", e) else: if age > stale: self.logger.warning("Weather record looks stale, marking unsafe.") @@ -445,10 +444,11 @@ def wait_until_safe(self): ################################################################################################## -# Private Methods +# Class Methods ################################################################################################## - def 
_check_environment(self): + @classmethod + def check_environment(cls): """ Checks to see if environment is set up correctly There are a number of environmental variables that are expected @@ -476,6 +476,10 @@ def _check_environment(self): print("Creating log dir at {}/logs".format(pandir)) os.makedirs("{}/logs".format(pandir)) +################################################################################################## +# Private Methods +################################################################################################## + def _check_messages(self, queue_type, q): cmd_dispatch = { 'command': { diff --git a/pocs/observatory.py b/pocs/observatory.py index 5ceba4d7f..260170125 100644 --- a/pocs/observatory.py +++ b/pocs/observatory.py @@ -304,16 +304,13 @@ def analyze_recent(self): self.logger.debug('Offset Info: {}'.format( self.current_offset_info)) - # Update the observation info with the offsets - self.db.observations.update({'data.image_id': image_id}, { - '$set': { - 'offset_info': { - 'd_ra': self.current_offset_info.delta_ra.value, - 'd_dec': self.current_offset_info.delta_dec.value, - 'magnitude': self.current_offset_info.magnitude.value, - 'unit': 'arcsec' - } - }, + # Store the offset information + self.db.insert('offset_info', { + 'image_id': image_id, + 'd_ra': self.current_offset_info.delta_ra.value, + 'd_dec': self.current_offset_info.delta_dec.value, + 'magnitude': self.current_offset_info.magnitude.value, + 'unit': 'arcsec', }) except error.SolveError: diff --git a/pocs/tests/conftest.py b/pocs/tests/conftest.py index f37df7f16..18b2df8a2 100644 --- a/pocs/tests/conftest.py +++ b/pocs/tests/conftest.py @@ -8,7 +8,8 @@ import pocs.base from pocs.utils.config import load_config -from pocs.utils.database import PanMongo +from pocs.utils.database import PanDB +from pocs.utils.logger import get_root_logger # Global variable with the default config; we read it once, copy it each time it is needed. 
_one_time_config = None @@ -37,9 +38,36 @@ def config_with_simulated_dome(config): return config -@pytest.fixture -def db(): - return PanMongo(db='panoptes_testing') +@pytest.fixture(scope='function', params=['mongo', 'file']) +def db(request): + try: + _db = PanDB( + db_type=request.param, + db_name='panoptes_testing', + logger=get_root_logger(), + connect=True + ) + except Exception: + pytest.skip("Can't connect to {} DB, skipping".format(request.param)) + + return _db + + +@pytest.fixture(scope='function', params=['mongo', 'file']) +def db_type(request): + # If testing mongo, make sure we can connect, otherwise skip + if request.param == 'mongo': + try: + PanDB( + db_type='mongo', + db_name='panoptes_testing', + logger=get_root_logger(), + connect=True + ) + except Exception: + pytest.skip("Can't connect to {} DB, skipping".format(request.param)) + + return request.param @pytest.fixture diff --git a/pocs/tests/test_messaging.py b/pocs/tests/test_messaging.py index 1ffb12e53..bd40a827a 100644 --- a/pocs/tests/test_messaging.py +++ b/pocs/tests/test_messaging.py @@ -66,13 +66,12 @@ def test_send_datetime(forwarder, sub, pub): assert msg_obj['date'] == '2017-01-01T00:00:00' -def test_mongo_objectid(forwarder, sub, pub, config, db): +def test_storage_id(forwarder, sub, pub, config, db): - db.insert_current('config', {'foo': 'bar'}) + id0 = db.insert_current('config', {'foo': 'bar'}, store_permanently=False) pub.send_message('TEST-CHANNEL', db.get_current('config')) msg_type, msg_obj = sub.receive_message() assert '_id' in msg_obj assert isinstance(msg_obj['_id'], str) - - db.current.remove({'type': 'config'}) + assert id0 == msg_obj['_id'] diff --git a/pocs/tests/test_panmongo.py b/pocs/tests/test_panmongo.py index 339c81780..99407e353 100644 --- a/pocs/tests/test_panmongo.py +++ b/pocs/tests/test_panmongo.py @@ -5,43 +5,27 @@ def test_insert_and_get_current(db): rec = {'test': 'insert'} db.insert_current('config', rec) - record = db.config.find_one({'data.test': 
{'$exists': 1}}) - assert record['data']['test'] == rec['test'] - - record = db.current.find_one({'type': 'config'}) - assert record['data']['test'] == rec['test'] - record = db.get_current('config') assert record['data']['test'] == rec['test'] - db.config.remove({'data.test': 'insert'}) - record = db.config.find({'data.test': {'$exists': 1}}) - assert record.count() == 0 - - db.current.remove({'type': 'config'}) - -def test_insert_and_no_collection(db): +def test_insert_and_no_permanent(db): rec = {'test': 'insert'} - db.insert_current('config', rec, include_collection=False) + id0 = db.insert_current('config', rec, store_permanently=False) record = db.get_current('config') assert record['data']['test'] == rec['test'] - record = db.config.find({'data.test': {'$exists': 1}}) - assert record.count() == 0 - - db.current.remove({'type': 'config'}) + record = db.find('config', id0) + assert record is None def test_simple_insert(db): rec = {'test': 'insert'} - db.insert('config', rec) + id0 = db.insert('config', rec) - record = db.config.find({'data.test': {'$exists': 1}}) - assert record.count() == 1 - - db.current.remove({'type': 'config'}) + record = db.find('config', id0) + assert record['data']['test'] == rec['test'] # Filter out (hide) "UserWarning: Collection not available" diff --git a/pocs/tests/test_pocs.py b/pocs/tests/test_pocs.py index 432a7fc3b..b9786b9e8 100644 --- a/pocs/tests/test_pocs.py +++ b/pocs/tests/test_pocs.py @@ -13,8 +13,13 @@ @pytest.fixture(scope='function') -def observatory(config): - observatory = Observatory(config=config, simulator=['all'], ignore_local_config=True) +def observatory(config, db_type): + observatory = Observatory( + config=config, + simulator=['all'], + ignore_local_config=True, + db_type=db_type + ) return observatory @@ -25,7 +30,7 @@ def pocs(config, observatory): pocs = POCS(observatory, run_once=True, config=config, - ignore_local_config=True, db='panoptes_testing') + ignore_local_config=True) 
pocs.observatory.scheduler.fields_list = [ {'name': 'Wasp 33', @@ -43,17 +48,19 @@ def pocs(config, observatory): @pytest.fixture(scope='function') -def pocs_with_dome(config_with_simulated_dome): +def pocs_with_dome(config_with_simulated_dome, db_type): os.environ['POCSTIME'] = '2016-08-13 13:00:00' simulator = hardware.get_all_names(without=['dome']) observatory = Observatory(config=config_with_simulated_dome, simulator=simulator, - ignore_local_config=True) + ignore_local_config=True, + db_type=db_type + ) pocs = POCS(observatory, run_once=True, config=config_with_simulated_dome, - ignore_local_config=True, db='panoptes_testing') + ignore_local_config=True) pocs.observatory.scheduler.fields_list = [ {'name': 'Wasp 33', @@ -74,7 +81,7 @@ def test_bad_pandir_env(pocs): pandir = os.getenv('PANDIR') os.environ['PANDIR'] = '/foo/bar' with pytest.raises(SystemExit): - pocs._check_environment() + POCS.check_environment() os.environ['PANDIR'] = pandir @@ -82,7 +89,7 @@ def test_bad_pocs_env(pocs): pocs_dir = os.getenv('POCS') os.environ['POCS'] = '/foo/bar' with pytest.raises(SystemExit): - pocs._check_environment() + POCS.check_environment() os.environ['POCS'] = pocs_dir @@ -92,7 +99,7 @@ def test_make_log_dir(pocs): old_pandir = os.environ['PANDIR'] os.environ['PANDIR'] = os.getcwd() - pocs._check_environment() + POCS.check_environment() assert os.path.exists(log_dir) is True os.removedirs(log_dir) @@ -168,7 +175,7 @@ def test_is_weather_safe_simulator(pocs): assert pocs.is_weather_safe() is True -def test_is_weather_safe_no_simulator(pocs, db): +def test_is_weather_safe_no_simulator(pocs): pocs.initialize() pocs.config['simulator'] = ['camera', 'mount', 'night'] @@ -176,7 +183,7 @@ def test_is_weather_safe_no_simulator(pocs, db): os.environ['POCSTIME'] = '2016-08-13 23:00:00' # Insert a dummy weather record - db.insert_current('weather', {'safe': True}) + pocs.db.insert_current('weather', {'safe': True}) assert pocs.is_weather_safe() is True # Set a time 181 seconds 
later @@ -184,7 +191,7 @@ def test_is_weather_safe_no_simulator(pocs, db): assert pocs.is_weather_safe() is False -def test_run_wait_until_safe(db, observatory): +def test_run_wait_until_safe(observatory): os.environ['POCSTIME'] = '2016-08-13 23:00:00' def start_pocs(): @@ -192,13 +199,13 @@ def start_pocs(): pocs = POCS(observatory, messaging=True, safe_delay=15) - pocs.db.current.remove({}) pocs.initialize() pocs.logger.info('Starting observatory run') assert pocs.is_weather_safe() is False pocs.send_message('RUNNING') pocs.run(run_once=True, exit_when_done=True) assert pocs.is_weather_safe() is True + pocs.power_down() pub = PanMessaging.create_publisher(6500) sub = PanMessaging.create_subscriber(6511) @@ -210,13 +217,12 @@ def start_pocs(): while True: msg_type, msg_obj = sub.receive_message() if msg_obj is None: - time.sleep(2) continue if msg_obj.get('message', '') == 'RUNNING': time.sleep(2) # Insert a dummy weather record to break wait - db.insert_current('weather', {'safe': True}) + observatory.db.insert_current('weather', {'safe': True}) if msg_type == 'STATUS': current_state = msg_obj.get('state', {}) @@ -224,8 +230,6 @@ def start_pocs(): pub.send_message('POCS-CMD', 'shutdown') break - time.sleep(0.5) - pocs_process.join() assert pocs_process.is_alive() is False @@ -250,6 +254,7 @@ def test_unsafe_park(pocs): pocs.clean_up() pocs.goto_sleep() assert pocs.state == 'sleeping' + pocs.power_down() def test_power_down_while_running(pocs): @@ -311,6 +316,7 @@ def test_run_complete(pocs): pocs.run(exit_when_done=True, run_once=True) assert pocs.state == 'sleeping' + pocs.power_down() def test_run_power_down_interrupt(observatory): @@ -326,6 +332,7 @@ def start_pocs(): }] pocs.logger.info('Starting observatory run') pocs.run() + pocs.power_down() pocs_process = Process(target=start_pocs) pocs_process.start() @@ -380,8 +387,8 @@ def test_pocs_park_to_ready_without_obs(config, observatory): messaging=True, run_once=False, config=config, - 
ignore_local_config=True, - db='panoptes_testing') + ignore_local_config=True + ) pocs.observatory.scheduler.fields_list = [ {'name': 'Wasp 33', diff --git a/pocs/utils/__init__.py b/pocs/utils/__init__.py index b01c7d018..4e91b6f40 100644 --- a/pocs/utils/__init__.py +++ b/pocs/utils/__init__.py @@ -3,6 +3,7 @@ import shutil import subprocess + from astropy import units as u from astropy.coordinates import AltAz from astropy.coordinates import ICRS @@ -36,7 +37,8 @@ def current_time(flatten=False, datetime=False, pretty=False): _time = _time.isot.split('.')[0].replace('T', ' ') if datetime: - _time = _time.datetime + # Add UTC timezone + _time = _time.to_datetime() return _time diff --git a/pocs/utils/database.py b/pocs/utils/database.py index 1a63ae63e..78b5f2d41 100644 --- a/pocs/utils/database.py +++ b/pocs/utils/database.py @@ -1,14 +1,13 @@ -from datetime import date -from datetime import datetime -import gzip -import json -from bson import json_util import os import pymongo -from warnings import warn import weakref +from warnings import warn +from uuid import uuid4 +from glob import glob from pocs.utils import current_time +from pocs.utils import serializers as json_util +from pocs.utils.config import load_config _shared_mongo_clients = weakref.WeakValueDictionary() @@ -22,14 +21,77 @@ def get_shared_mongo_client(host, port, connect): return client except KeyError: pass - client = pymongo.MongoClient(host, port, connect=connect) + + client = pymongo.MongoClient( + host, + port, + connect=connect, + connectTimeoutMS=2500, + serverSelectionTimeoutMS=2500 + ) + _shared_mongo_clients[key] = client return client -class PanMongo(object): +class PanDB(object): + """ Simple class to load the appropriate DB type """ + + def __init__(self, db_type=None, logger=None, *args, **kwargs): - def __init__(self, db='panoptes', host='localhost', port=27017, connect=False, logger=None): + if logger is not None: + self.logger = logger + + if db_type is None: + db_type = 
load_config['db']['type'] + + self.collections = [ + 'config', + 'current', + 'drift_align', + 'environment', + 'mount', + 'observations', + 'offset_info', + 'state', + 'weather', + ] + + self.db = None + + if db_type == 'mongo': + try: + self.db = PanMongoDB(collections=self.collections, *args, **kwargs) + except Exception: + raise Exception( + "Can't connect to mongo, please check settings or change DB storage type") + + if db_type == 'file': + self.db = PanFileDB(collections=self.collections, *args, **kwargs) + + def insert(self, *args, **kwargs): + return self.db.insert(*args, **kwargs) + + def insert_current(self, *args, **kwargs): + return self.db.insert_current(*args, **kwargs) + + def get_current(self, *args, **kwargs): + return self.db.get_current(*args, **kwargs) + + def find(self, *args, **kwargs): + return self.db.find(*args, **kwargs) + + +class PanMongoDB(object): + + def __init__(self, + db_name='panoptes', + host='localhost', + port=27017, + connect=False, + collections=list(), + *args, **kwargs + ): """Connection to the running MongoDB instance This is a collection of parameters that are initialized when the unit @@ -48,61 +110,47 @@ def __init__(self, db='panoptes', host='localhost', port=27017, connect=False, l or databases. Args: - db (str, optional): Name of the database containing the PANOPTES collections. + db_name (str, optional): Name of the database containing the PANOPTES collections. host (str, optional): hostname running MongoDB. port (int, optional): port running MongoDb. connect (bool, optional): Connect to mongo on create, defaults to True. logger (None, optional): An instance of the logger. """ - if logger is not None: - self.logger = logger # Get the mongo client self._client = get_shared_mongo_client(host, port, connect) # Pre-defined list of collections that are valid. 
- self.collections = [ - 'config', - 'current', - 'drift_align', - 'environment', - 'mount', - 'observations', - 'state', - 'weather', - ] + self.collections = collections # Create an attribute on the client with the db name. - db_handle = self._client[db] + db_handle = self._client[db_name] # Setup static connections to the collections we want. for collection in self.collections: # Add the collection as an attribute setattr(self, collection, getattr(db_handle, collection)) - def _warn(self, *args, **kwargs): - if hasattr(self, 'logger'): - self.logger.warning(*args, **kwargs) - else: - warn(*args) + # Clear out the `current` collection + self.current.remove() - def insert_current(self, collection, obj, include_collection=True): + def insert_current(self, collection, obj, store_permanently=True): """Insert an object into both the `current` collection and the collection provided. Args: collection (str): Name of valid collection within panoptes db. obj (dict or str): Object to be inserted. - include_collection (bool): Whether to also update the collection, + store_permanently (bool): Whether to also update the collection, defaults to True. Returns: - str: Mongo object ID of record. If `include_collection` is True, will + str: Mongo object ID of record. If `store_permanently` is True, will be the id of the object in the `collection`, otherwise will be the id of object in the `current` collection. 
""" - if include_collection: - assert collection in self.collections, self._warn("Collection not available") + if store_permanently: + assert collection in self.collections, self._warn("Collection type not available") _id = None try: @@ -117,12 +165,14 @@ def insert_current(self, collection, obj, include_collection=True): {'type': collection}, current_obj, True # True for upsert ).upserted_id - if include_collection: + if store_permanently: _id = self.insert(collection, current_obj) + elif _id is None: + _id = self.get_current(collection)['_id'] except Exception as e: self._warn("Problem inserting object into collection: {}, {!r}".format(e, current_obj)) - return _id + return str(_id) def insert(self, collection, obj): """Insert an object into the collection provided. @@ -139,7 +189,7 @@ def insert(self, collection, obj): Returns: str: Mongo object ID of record in `collection`. """ - assert collection in self.collections, self._warn("Collection not available") + assert collection in self.collections, self._warn("Collection type not available") _id = None try: @@ -171,133 +221,173 @@ def get_current(self, collection): """ return self.current.find_one({'type': collection}) - def export(self, - yesterday=True, - start_date=None, - end_date=None, - collections=['all'], - backup_dir=None, - compress=True): # pragma: no cover - """Exports the mongodb to an external file + def find(self, type, id): + """Find an object by it's id. Args: - yesterday (bool, optional): Export only yesterday, defaults to True - start_date (str, optional): Start date for export if `yesterday` is False, - defaults to None, e.g. 2016-01-01 - end_date (None, optional): End date for export if `yesterday is False, - defaults to None, e.g. 
2016-01-31 - collections (list, optional): Which collections to include, defaults to all - backup_dir (str, optional): Backup directory, defaults to /backups - compress (bool, optional): Compress output file with gzip, defaults to True + type (str): Collection to search for object. + id (ObjectID|str): Mongo object id str. Returns: - list: List of saved files + dict|None: Object matching id or None. """ - if backup_dir is None: - backup_dir = '{}/backups/'.format(os.getenv('PANDIR', default='/var/panoptes/')) - - if not os.path.exists(backup_dir): - warn("Creating backup dir") - os.makedirs(backup_dir) + collection = getattr(self, type) + return collection.find_one({'_id': id}) - if yesterday: - start_dt = (current_time() - 1. * u.day).datetime - start = datetime(start_dt.year, start_dt.month, start_dt.day, 0, 0, 0, 0) - end = datetime(start_dt.year, start_dt.month, start_dt.day, 23, 59, 59, 0) + def _warn(self, *args, **kwargs): + if hasattr(self, 'logger'): + self.logger.warning(*args, **kwargs) else: - assert start_date, warn("start-date required if not using yesterday") + warn(*args) - y, m, d = [int(x) for x in start_date.split('-')] - start_dt = date(y, m, d) - if end_date is None: - end_dt = start_dt - else: - y, m, d = [int(x) for x in end_date.split('-')] - end_dt = date(y, m, d) +class PanFileDB(object): + + def __init__(self, db_name='panoptes', collections=list(), *args, **kwargs): + """Flat file storage for json records - start = datetime.fromordinal(start_dt.toordinal()) - end = datetime(end_dt.year, end_dt.month, end_dt.day, 23, 59, 59, 0) + This will simply store each json record inside a file corresponding + to the type. Each entry will be stored in a single line. 
+ """ - if 'all' in collections: - collections = self.collections + self.db_folder = db_name - date_str = start.strftime('%Y-%m-%d') - end_str = end.strftime('%Y-%m-%d') - if end_str != date_str: - date_str = '{}_to_{}'.format(date_str, end_str) + # Pre-defined list of collections that are valid. + self.collections = collections - out_files = list() + # Set up storage directory + self._storage_dir = '{}/json_store/{}'.format(os.environ['PANDIR'], self.db_folder) + os.makedirs(self._storage_dir, exist_ok=True) - console.color_print( - "Exporting collections: ", - 'default', - "\t{}".format( - date_str.replace( - '_', - ' ')), - 'yellow') - for collection in collections: - if collection not in self.collections: - next - console.color_print("\t{}".format(collection)) + # Clear out any `current_X` files + for current_f in glob(os.path.join(self._storage_dir, 'current_*')): + os.remove(current_f) - out_file = '{}{}_{}.json'.format(backup_dir, date_str.replace('-', ''), collection) + def insert_current(self, collection, obj, store_permanently=True): + """Insert an object into both the `current` collection and the collection provided. 
- col = getattr(self, collection) - try: - entries = [x for x in col.find({'date': {'$gt': start, '$lt': end}} - ).sort([('date', pymongo.ASCENDING)])] - except pymongo.errors.OperationFailure: - entries = [x for x in col.find({'date': {'$gt': start, '$lt': end}})] - - if len(entries): - console.color_print("\t\t{} records exported".format(len(entries)), 'yellow') - content = json.dumps(entries, default=json_util.default) - write_type = 'w' - - if compress: - console.color_print("\t\tCompressing...", 'lightblue') - content = gzip.compress(bytes(content, 'utf8')) - out_file = out_file + '.gz' - write_type = 'wb' - - with open(out_file, write_type)as f: - console.color_print("\t\tWriting file: ", 'lightblue', out_file, 'yellow') - f.write(content) - - out_files.append(out_file) + Args: + collection (str): Name of valid collection within panoptes db. + obj (dict or str): Object to be inserted. + store_permanently (bool): Whether to also update the collection, + defaults to True. + + Returns: + str: UUID of record. If `store_permanently` is True, will + be the id of the object in the `collection`, otherwise will be the + id of object in the `current` collection. + """ + if store_permanently: + assert collection in self.collections, self._warn("Collection type not available") + + _id = self._make_id() + try: + current_obj = { + '_id': _id, + 'type': collection, + 'data': obj, + 'date': current_time(datetime=True), + } + + current_fn = os.path.join(self._storage_dir, 'current_{}.json'.format(collection)) + + json_util.dumps_file(current_fn, current_obj, clobber=True) + + if store_permanently: + _id = self.insert(collection, current_obj) + except Exception as e: + self._warn("Problem inserting object into collection: {}, {!r}".format(e, current_obj)) + + return _id + + def insert(self, collection, obj): + """Insert an object into the collection provided. 
+ + The `obj` to be stored in a collection should include the `type` + and `date` metadata as well as a `data` key that contains the actual + object data. If these keys are not provided then `obj` will be wrapped + in a corresponding object that does contain the metadata. + + Args: + collection (str): Name of valid collection within panoptes db. + obj (dict or str): Object to be inserted. + + Returns: + str: UUID of record in `collection`. + """ + assert collection in self.collections, self._warn("Collection type not available") + + _id = self._make_id() + try: + # If `data` key is present we assume it has "metadata" (see above). + if isinstance(obj, dict) and 'data' in obj: + # But still check for a `type` + if 'type' not in obj: + obj['type'] = collection else: - console.color_print("\t\tNo records found", 'yellow') - - console.color_print("Output file: {}".format(out_files)) - return out_files - - -if __name__ == '__main__': # pragma: no cover - from astropy.utils import console - from astropy import units as u - - import argparse - - parser = argparse.ArgumentParser(description="Exporter for mongo collections") - parser.add_argument('--yesterday', action="store_true", default=True, - help='Export yesterday, defaults to True unless start-date specified') - parser.add_argument('--start-date', default=None, help='Export start date, e.g. 2016-01-01') - parser.add_argument('--end-date', default=None, help='Export end date, e.g. 
2016-01-31') - parser.add_argument( - '--collections', - action="append", - default=['all'], - help='Collections to export') - parser.add_argument( - '--backup-dir', - help='Directory to store backup files, defaults to $PANDIR/backups') - parser.add_argument('--compress', action="store_true", default=True, - help='If exported files should be compressed, defaults to True') - - args = parser.parse_args() - if args.start_date is not None: - args.yesterday = False - - PanMongo().export(**vars(args)) + obj = { + '_id': _id, + 'type': collection, + 'data': obj, + 'date': current_time(datetime=True), + } + + # Insert record into file + collection_fn = os.path.join(self._storage_dir, '{}.json'.format(collection)) + + json_util.dumps_file(collection_fn, obj) + except Exception as e: + self._warn("Problem inserting object into collection: {}, {!r}".format(e, obj)) + + return _id + + def get_current(self, collection): + """Returns the most current record for the given collection + + Args: + collection (str): Name of the collection to get most current from + + Returns: + dict|None: Most recent object of type `collection` or None. + """ + current_fn = os.path.join(self._storage_dir, 'current_{}.json'.format(collection)) + + record = dict() + + try: + record = json_util.loads_file(current_fn) + except FileNotFoundError as e: + self._warn("No record found for {}".format(collection)) + + return record + + def find(self, type, id): + """Find an object by it's id. + + Args: + type (str): Collection to search for object. + id (ObjectID|str): Mongo object id str. + + Returns: + dict|None: Object matching `id` or None. 
+ """ + collection_fn = os.path.join(self._storage_dir, '{}.json'.format(type)) + + obj = None + with open(collection_fn, 'r') as f: + for line in f: + temp_obj = json_util.loads(line) + if temp_obj['_id'] == id: + obj = temp_obj + break + + return obj + + def _make_id(self): + return str(uuid4()) + + def _warn(self, *args, **kwargs): + if hasattr(self, 'logger'): + self.logger.warning(*args, **kwargs) + else: + warn(*args) diff --git a/pocs/utils/serializers.py b/pocs/utils/serializers.py new file mode 100644 index 000000000..eb2b08f27 --- /dev/null +++ b/pocs/utils/serializers.py @@ -0,0 +1,65 @@ +from bson import json_util + + +def dumps(obj): + """Dump an object to JSON. + + Args: + obj (dict): An object to serialize. + + Returns: + str: Serialized representation of object. + """ + return json_util.dumps(obj) + + +def loads(msg): + """Load an object from JSON. + + Args: + msg (str): A serialized string representation of object. + + Returns: + dict: The loaded object. + """ + return json_util.loads(msg) + + +def dumps_file(fn, obj, clobber=False): + """Convenience warpper to dump an object to a a file. + + Args: + fn (str): Path of filename where object representation will be saved. + obj (dict): An object to serialize. + clobber (bool, optional): If object should be overwritten or appended to. + Defaults to False, which will append to file. + + Returns: + str: Filename of the file that was written to. + """ + if clobber is True: + mode = 'w' + else: + mode = 'a' + + with open(fn, mode) as f: + f.write(dumps(obj) + "\n") + + return fn + + +def loads_file(file_path): + """Convenience wrapper to load an object from a file. + + Args: + file_path (str): Path of filename that contains a serialization of the + the object. + + Returns: + dict: The loaded object from the given file. 
+ """ + obj = None + with open(file_path, 'r') as f: + obj = loads(f.read()) + + return obj diff --git a/scripts/simple_weather_capture.py b/scripts/simple_weather_capture.py index 6eb281822..893d38714 100644 --- a/scripts/simple_weather_capture.py +++ b/scripts/simple_weather_capture.py @@ -142,19 +142,19 @@ def write_capture(filename=None, data=None): parser.add_argument('--serial-port', dest='serial_port', default=None, help='Serial port to connect') parser.add_argument('--plotly-stream', action='store_true', default=False, help="Stream to plotly") - parser.add_argument('--store-mongo', action='store_true', default=True, help="Save to mongo") + parser.add_argument('--store-result', action='store_true', default=True, help="Save to db") parser.add_argument('--send-message', action='store_true', default=True, help="Send message") args = parser.parse_args() # Weather object - aag = weather.AAGCloudSensor(serial_address=args.serial_port, use_mongo=args.store_mongo) + aag = weather.AAGCloudSensor(serial_address=args.serial_port, store_result=args.store_result) if args.plotly_stream: streams = None streams = get_plot(filename=args.filename) while True: - data = aag.capture(use_mongo=args.store_mongo, send_message=args.send_message) + data = aag.capture(store_result=args.store_result, send_message=args.send_message) # Save to file if args.filename is not None: