Merge pull request #335 from AxFoundation/update_mongo_storage
Create separate doc for new chunk
JoranAngevaare authored Oct 21, 2020
2 parents ecb7c4b + 1d8ea94 commit 114c284
Showing 1 changed file with 62 additions and 25 deletions.
87 changes: 62 additions & 25 deletions strax/storage/mongo.py
@@ -14,7 +14,6 @@
 from strax import StorageFrontend, StorageBackend, Saver
 from datetime import datetime
 from pytz import utc as py_utc
-
 export, __all__ = strax.exporter()

@@ -36,22 +35,28 @@ def __init__(self, uri, database, col_name=None):
     def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
         """See strax.Backend"""
         query = backend_key_to_query(backend_key)
-        chunk_name = f'chunk_{chunk_info["chunk_i"]}'
+        chunk_i = chunk_info["chunk_i"]
+
+        # Query for the chunk and project the chunk info
         doc = self.db[self.col_name].find_one(
-            {**query, chunk_name: {"$exists": True}},
-            {f"{chunk_name}": 1})
+            {**query, "chunk_i": chunk_i},
+            {"data": 1})
+
+        # Unpack info about this chunk from the query. Return empty if not
+        # available.
-        if doc is None or chunk_name not in doc:
+        if doc is None:
+            # Did not find the data
             return np.array([], dtype=dtype)
         else:
-            chunk_doc = doc[chunk_name]
+            chunk_doc = doc.get('data', None)
+            if chunk_doc is None:
+                raise ValueError(f'Doc for chunk_{chunk_i} in wrong format:\n{doc}')
 
         # Convert JSON to numpy
-        result = np.zeros(len(chunk_doc), dtype=dtype)
-        for i, d in enumerate(chunk_doc):
+        chunk_len = len(chunk_doc)
+        result = np.zeros(chunk_len, dtype=dtype)
+        for i in range(chunk_len):
             for key in np.dtype(dtype).names:
-                result[i][key] = d['data'][key]
+                result[i][key] = chunk_doc[i][key]
         return result
 
     def _saver(self, key, metadata):
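
The new read path fetches one document per chunk instead of digging a chunk_{i} field out of a single run document. A minimal sketch of that pattern with pymongo, assuming a local Mongo instance, an illustrative 'strax_db'/'peaks' collection, and a toy dtype (none of these names come from the commit):

import numpy as np
from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017')['strax_db']['peaks']
dtype = np.dtype([('time', np.int64), ('length', np.int32)])

# One document per chunk: match on chunk_i and project only the 'data' field
doc = col.find_one({'number': 1, 'chunk_i': 0}, {'data': 1})
if doc is None:
    result = np.array([], dtype=dtype)  # chunk not (yet) written
else:
    rows = doc['data']
    result = np.zeros(len(rows), dtype=dtype)
    for i, row in enumerate(rows):
        for key in dtype.names:
            result[i][key] = row[key]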
@@ -122,7 +127,6 @@ def __init__(self, key, metadata, col):
         for k, rep in (
                 ('run_id', int), ('data_type', str), ('lineage_hash', str)):
             basic_meta[k.replace('run_id', 'number')] = rep(self.md[k])
-        basic_meta['metadata'] = self.md
         # Add datetime objects as candidates for TTL collections. Either can
         # be used according to the preference of the user to index.
         # Two entries can be used:
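
The hunks above and below reorganize what gets stored. For orientation, the document layout this commit moves between can be sketched as follows (all field values are illustrative, not taken from the commit):

# Before: one document per run; every chunk lives in a growing
# 'chunk_<i>' array on that single document.
old_run_doc = {
    'number': 1, 'data_type': 'peaks', 'lineage_hash': 'abc123',
    'metadata': '<full strax metadata>',
    'chunk_0': [{'data': {'time': 0, 'length': 10}}],  # one $addToSet per row
}

# After: one slim run-metadata document ...
run_md_doc = {
    'number': 1, 'data_type': 'peaks', 'lineage_hash': 'abc123',
    'metadata': '<full strax metadata>',
    'run_start_time': '<datetime>',
}
# ... plus a separate document per chunk, sharing the basic metadata so
# chunks can be queried (and TTL-expired) the same way as the run itself.
chunk_doc = {
    'number': 1, 'data_type': 'peaks', 'lineage_hash': 'abc123',
    'chunk_i': 0,
    'data': [{'time': 0, 'length': 10}],  # one dict per row, whole chunk at once
    'write_time': '<datetime>', 'run_start_time': '<datetime>',
}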
@@ -133,46 +137,79 @@
         # _save_chunk_metadata for the first chunk. Nevertheless we need an
         # object in case there e.g. is no chunk.
         basic_meta['run_start_time'] = datetime.now(py_utc)
+        # If available, later update with this value:
+        self.run_start = None
+        # This info should be added to all of the associated documents
+        self.basic_md = basic_meta
+
+        # For the metadata, copy this too:
+        meta_data = basic_meta.copy()
+        meta_data['metadata'] = self.md
+
-        # Save object id to write other data to
-        self.id = self.col.insert_one(basic_meta).inserted_id
+        # Save object_ids for fast querying and updates
+        self.id_md = self.col.insert_one(meta_data).inserted_id
+        # Also save all the chunks
+        self.ids_chunk = {}
 
-    def _save_chunk(self, data, chunk_info, executor):
+    def _save_chunk(self, data, chunk_info, executor=None):
         """see strax.Saver"""
-        chunk_number = chunk_info['chunk_i']
+        chunk_i = chunk_info['chunk_i']
+
+        aggregate_data = []
         # Remove the numpy structures and parse the data. The dtype
         # information is saved with the metadata so don't worry
         for row in data:
             ins = {}
             for key in list(data.dtype.names):
                 ins[key] = row[key]
             ins = remove_np(ins)
-            self.col.update_one({'_id': self.id},
-                                {'$addToSet': {f'chunk_{chunk_number}':
-                                                   {"data": ins}}
-                                 })
+            aggregate_data.append(ins)
+
+        # Get the document to update; if none is available, start a new one
+        # for this chunk
+        chunk_id = self.ids_chunk.get(chunk_i, None)
+        if chunk_id is not None:
+            self.col.update_one({'_id': chunk_id},
+                                {'$addToSet': {'data': aggregate_data}})
+        else:
+            # Start a new document, update it with the proper information
+            doc = self.basic_md.copy()
+            doc['write_time'] = datetime.now(py_utc)
+            doc['chunk_i'] = chunk_i
+            doc['data'] = aggregate_data
+
+            chunk_id = self.col.insert_one(doc).inserted_id
+            self.ids_chunk[chunk_i] = chunk_id
 
         return dict(), None
 
     def _save_chunk_metadata(self, chunk_info):
         """see strax.Saver"""
-        # For the first chunk we update the run_start_time
+        # For the first chunk we get the run_start_time and update the
+        # run-metadata document
         if int(chunk_info['chunk_i']) == 0:
-            t0 = datetime.fromtimestamp(chunk_info['start']/1e9).replace(tzinfo=py_utc)
-            self.col.update_one({'_id': self.id},
-                                {'$set':
-                                     {'run_start_time': t0}})
+            self.run_start = datetime.fromtimestamp(
+                chunk_info['start']/1e9).replace(tzinfo=py_utc)
 
-        self.col.update_one({'_id': self.id},
+        self.col.update_one({'_id': self.id_md},
                             {'$addToSet':
                                  {'metadata.chunks': chunk_info}})
 
     def _close(self):
         """see strax.Saver"""
+        # First update the run-starts of all of the chunk-documents, as this
+        # is a TTL-index candidate
+        if self.run_start is not None:
+            update = {'run_start_time': self.run_start}
+            query = {k: v for k, v in self.basic_md.items()
+                     if k in ('number', 'data_type', 'lineage_hash')}
+            self.col.update_many(query, {'$set': update})
+
         # Update the metadata
         update = {f'metadata.{k}': v
                   for k, v in self.md.items()
                   if k in ('writing_ended', 'exception')}
-        self.col.update_one({'_id': self.id}, {'$set': update})
+        # Also update all of the chunk-documents with the run_start_time
+        self.col.update_one({'_id': self.id_md}, {'$set': update})
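
The run_start_time that _close stamps on every chunk document exists so the collection can carry a TTL index, as the saver's comments suggest. A sketch of how a user might create one (not part of this commit; the collection name and the 30-day lifetime are assumptions):

from pymongo import ASCENDING, MongoClient

col = MongoClient('mongodb://localhost:27017')['strax_db']['peaks']
# Mongo deletes each document roughly expireAfterSeconds after its
# run_start_time, so old runs and their chunks age out automatically.
col.create_index([('run_start_time', ASCENDING)],
                 expireAfterSeconds=30 * 24 * 3600)

Per the comments in __init__, write_time is the other candidate field; indexing one or the other is left to the user's preference.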


def backend_key_to_query(backend_key):
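
The body of this helper is collapsed in the diff. Judging from the keys queried in _close ('number', 'data_type', 'lineage_hash'), a plausible shape is the following sketch (an assumption, not the committed code):

def backend_key_to_query(backend_key):
    # Assumed: a strax backend key looks like '<run_id>-<data_type>-<lineage_hash>'
    number, data_type, lineage_hash = backend_key.split('-')
    return {'number': int(number),
            'data_type': data_type,
            'lineage_hash': lineage_hash}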
