From b7f835aef4f8ea717d731247b5fee09e693dc93f Mon Sep 17 00:00:00 2001 From: "K. Shankari" Date: Thu, 23 Jun 2016 23:36:50 -0700 Subject: [PATCH] Bunch of fixes to the standard timeseries - convert _get_sort_key to an instance method so that there can be separate implementations for the aggregate and user-specific timeseries - handle the case in which there is no entry for a particular _id. But then where did we get the _id from? We never delete objects. Is this a weird hack that should be removed and investigated further? - deal with searching: if there are no entries in the orig_ts_db or analysis_ts_db, return an empty list. Why do we deal with orig_ts_db and analysis_ts_db differently? In one case, we check to see if there are any keys; in the other, we just return all entries. - Also, we only deal with the `sort_key is None` case for the analysis DB, which makes sense because we currently only aggregate analysed data, but it may be good to fix this correctly long-term - handle the case where the user_id for the entry is None by setting it to the current timeseries user_id. 
So we only raise the error if the entry has a user_id and it is different --- .../storage/timeseries/builtin_timeseries.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/emission/storage/timeseries/builtin_timeseries.py b/emission/storage/timeseries/builtin_timeseries.py index fc7419810..5c9a0d996 100644 --- a/emission/storage/timeseries/builtin_timeseries.py +++ b/emission/storage/timeseries/builtin_timeseries.py @@ -66,8 +66,7 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None, ret_query.update(extra_query) return ret_query - @staticmethod - def _get_sort_key(time_query = None): + def _get_sort_key(self, time_query = None): if time_query is None: return "metadata.write_ts" else: @@ -82,7 +81,11 @@ def _to_df_entry(entry): return ret_val def get_entry_from_id(self, key, entry_id): - return ecwe.Entry(self.get_timeseries_db(key).find_one({"_id": entry_id})) + entry_doc = self.get_timeseries_db(key).find_one({"_id": entry_id}) + if entry_doc is None: + return None + else: + return ecwe.Entry(entry_doc) def _split_key_list(self, key_list): if key_list is None: @@ -100,12 +103,23 @@ def find_entries(self, key_list = None, time_query = None, geo_query = None, (self._get_query(key_list, time_query, geo_query, extra_query_list), sort_key)) (orig_ts_db_keys, analysis_ts_db_keys) = self._split_key_list(key_list) - orig_ts_db_result = self.timeseries_db.find( + logging.debug("orig_ts_db_keys = %s, analysis_ts_db_keys = %s" % + (orig_ts_db_keys, analysis_ts_db_keys)) + # workaround for https://github.com/e-mission/e-mission-server/issues/271 + # during the migration + if orig_ts_db_keys is None or len(orig_ts_db_keys) > 0: + orig_ts_db_result = self.timeseries_db.find( self._get_query(orig_ts_db_keys, time_query, geo_query)).sort( sort_key, pymongo.ASCENDING) - analysis_ts_db_result = self.analysis_timeseries_db.find( - self._get_query(analysis_ts_db_keys, time_query, geo_query)).sort( - sort_key, 
pymongo.ASCENDING) + else: + orig_ts_db_result = [].__iter__() + + analysis_ts_db_cursor = self.analysis_timeseries_db.find( + self._get_query(analysis_ts_db_keys, time_query, geo_query)) + if sort_key is None: + analysis_ts_db_result = analysis_ts_db_cursor + else: + analysis_ts_db_result = analysis_ts_db_cursor.sort(sort_key, pymongo.ASCENDING) return itertools.chain(orig_ts_db_result, analysis_ts_db_result) def get_entry_at_ts(self, key, ts_key, ts): @@ -155,10 +169,11 @@ def insert(self, entry): logging.debug("insert called") if type(entry) == dict: entry = ecwe.Entry(entry) - if "user_id" not in entry: + if "user_id" not in entry or entry["user_id"] is None: entry["user_id"] = self.user_id - elif entry["user_id"] != self.user_id: - raise AttributeError("Saving entry for %s in timeseries for %s" % (entry["user_id"], self.user_id)) + if entry["user_id"] != self.user_id: + raise AttributeError("Saving entry %s for %s in timeseries for %s" % + (entry, entry["user_id"], self.user_id)) else: logging.debug("entry was fine, no need to fix it")