Skip to content

Commit

Permalink
Bunch of fixes to the standard timeseries
Browse files Browse the repository at this point in the history
- convert get_sort_key to an instance method so that there can be separate
  implementations for the aggregate and user specific timeseries
- handle the case in which there is no entry for a particular _id. But then
  where did we get the _id from? We never delete objects. Is this a weird hack
  that should be removed and investigated further?
- deal with searching: if there are no entries in the orig_ts_db or
  analysis_ts_db, return an empty list. Why do we deal with orig_ts_db and
  analysis_ts_db differently? In one case, we check to see if there are any
  keys; in the other, we just return all entries.
- Also, we only deal with the `sort_key is None` case for the analysis DB, which
  makes sense because we currently only aggregate analysed data, but it may be
  good to fix this properly long-term
- handle the case where the user_id for the entry is None by setting it to the
  current timeseries user_id. So we only raise the error if the entry has a
  user_id and it is different
  • Loading branch information
shankari committed Jun 24, 2016
1 parent 453b6a8 commit b7f835a
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ def _get_query(self, key_list = None, time_query = None, geo_query = None,
ret_query.update(extra_query)
return ret_query

@staticmethod
def _get_sort_key(time_query = None):
def _get_sort_key(self, time_query = None):
if time_query is None:
return "metadata.write_ts"
else:
Expand All @@ -82,7 +81,11 @@ def _to_df_entry(entry):
return ret_val

def get_entry_from_id(self, key, entry_id):
return ecwe.Entry(self.get_timeseries_db(key).find_one({"_id": entry_id}))
entry_doc = self.get_timeseries_db(key).find_one({"_id": entry_id})
if entry_doc is None:
return None
else:
return ecwe.Entry(entry_doc)

def _split_key_list(self, key_list):
if key_list is None:
Expand All @@ -100,12 +103,23 @@ def find_entries(self, key_list = None, time_query = None, geo_query = None,
(self._get_query(key_list, time_query, geo_query,
extra_query_list), sort_key))
(orig_ts_db_keys, analysis_ts_db_keys) = self._split_key_list(key_list)
orig_ts_db_result = self.timeseries_db.find(
logging.debug("orig_ts_db_keys = %s, analysis_ts_db_keys = %s" %
(orig_ts_db_keys, analysis_ts_db_keys))
# workaround for https://github.com/e-mission/e-mission-server/issues/271
# during the migration
if orig_ts_db_keys is None or len(orig_ts_db_keys) > 0:
orig_ts_db_result = self.timeseries_db.find(
self._get_query(orig_ts_db_keys, time_query, geo_query)).sort(
sort_key, pymongo.ASCENDING)
analysis_ts_db_result = self.analysis_timeseries_db.find(
self._get_query(analysis_ts_db_keys, time_query, geo_query)).sort(
sort_key, pymongo.ASCENDING)
else:
orig_ts_db_result = [].__iter__()

analysis_ts_db_cursor = self.analysis_timeseries_db.find(
self._get_query(analysis_ts_db_keys, time_query, geo_query))
if sort_key is None:
analysis_ts_db_result = analysis_ts_db_cursor
else:
analysis_ts_db_result = analysis_ts_db_cursor.sort(sort_key, pymongo.ASCENDING)
return itertools.chain(orig_ts_db_result, analysis_ts_db_result)

def get_entry_at_ts(self, key, ts_key, ts):
Expand Down Expand Up @@ -155,10 +169,11 @@ def insert(self, entry):
logging.debug("insert called")
if type(entry) == dict:
entry = ecwe.Entry(entry)
if "user_id" not in entry:
if "user_id" not in entry or entry["user_id"] is None:
entry["user_id"] = self.user_id
elif entry["user_id"] != self.user_id:
raise AttributeError("Saving entry for %s in timeseries for %s" % (entry["user_id"], self.user_id))
if entry["user_id"] != self.user_id:
raise AttributeError("Saving entry %s for %s in timeseries for %s" %
(entry, entry["user_id"], self.user_id))
else:
logging.debug("entry was fine, no need to fix it")

Expand Down

0 comments on commit b7f835a

Please sign in to comment.