diff --git a/strax/storage/mongo.py b/strax/storage/mongo.py
index 41e0d1882..2e27eda85 100644
--- a/strax/storage/mongo.py
+++ b/strax/storage/mongo.py
@@ -14,7 +14,6 @@
 from strax import StorageFrontend, StorageBackend, Saver
 from datetime import datetime
 from pytz import utc as py_utc
-
 export, __all__ = strax.exporter()
 
 
@@ -36,22 +35,28 @@ def __init__(self, uri, database, col_name=None):
 
     def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
         """See strax.Backend"""
         query = backend_key_to_query(backend_key)
-        chunk_name = f'chunk_{chunk_info["chunk_i"]}'
+        chunk_i = chunk_info["chunk_i"]
+        # Query for the chunk and project only its data field
         doc = self.db[self.col_name].find_one(
-            {**query, chunk_name: {"$exists": True}},
-            {f"{chunk_name}": 1})
+            {**query, "chunk_i": chunk_i},
+            {"data": 1})
 
         # Unpack info about this chunk from the query. Return empty if not available.
-        if doc is None or chunk_name not in doc:
+        if doc is None:
+            # Did not find the data
             return np.array([], dtype=dtype)
         else:
-            chunk_doc = doc[chunk_name]
+            chunk_doc = doc.get('data', None)
+            if chunk_doc is None:
+                raise ValueError(
+                    f'Doc for chunk_{chunk_i} is in the wrong format:\n{doc}')
+
         # Convert JSON to numpy
-        result = np.zeros(len(chunk_doc), dtype=dtype)
-        for i, d in enumerate(chunk_doc):
+        chunk_len = len(chunk_doc)
+        result = np.zeros(chunk_len, dtype=dtype)
+        for i in range(chunk_len):
             for key in np.dtype(dtype).names:
-                result[i][key] = d['data'][key]
+                result[i][key] = chunk_doc[i][key]
         return result
 
     def _saver(self, key, metadata):
@@ -122,7 +127,6 @@ def __init__(self, key, metadata, col):
         for k, rep in (
                 ('run_id', int), ('data_type', str), ('lineage_hash', str)):
             basic_meta[k.replace('run_id', 'number')] = rep(self.md[k])
-        basic_meta['metadata'] = self.md
         # Add datetime objects as candidates for TTL collections. Either can
         # be used according to the preference of the user to index.
         # Two entries can be used:
@@ -133,14 +137,25 @@ def __init__(self, key, metadata, col):
         # _save_chunk_metadata for the first chunk. Nevertheless we need an
         # object in case there e.g. is no chunk.
         basic_meta['run_start_time'] = datetime.now(py_utc)
+        # Updated later with the true run start, if available:
+        self.run_start = None
+        # This info should be added to all of the associated documents
+        self.basic_md = basic_meta
+
+        # For the metadata document, copy this too:
+        meta_data = basic_meta.copy()
+        meta_data['metadata'] = self.md
 
-        # Save object id to write other data to
-        self.id = self.col.insert_one(basic_meta).inserted_id
+        # Save object_ids for fast querying and updates
+        self.id_md = self.col.insert_one(meta_data).inserted_id
+        # Also keep the object_ids of the chunk documents
+        self.ids_chunk = {}
 
-    def _save_chunk(self, data, chunk_info, executor):
+    def _save_chunk(self, data, chunk_info, executor=None):
         """see strax.Saver"""
-        chunk_number = chunk_info['chunk_i']
+        chunk_i = chunk_info['chunk_i']
+        aggregate_data = []
 
         # Remove the numpy structures and parse the data. The dtype information
         # is saved with the metadata so don't worry
         for row in data:
@@ -148,31 +163,53 @@ def _save_chunk(self, data, chunk_info, executor):
             for key in list(data.dtype.names):
                 ins[key] = row[key]
             ins = remove_np(ins)
-            self.col.update_one({'_id': self.id},
-                                {'$addToSet': {f'chunk_{chunk_number}':
-                                                   {"data": ins}}
-                                 })
+            aggregate_data.append(ins)
+
+        # Get the document to update; if none is available yet, start a new
+        # one for this chunk
+        chunk_id = self.ids_chunk.get(chunk_i, None)
+        if chunk_id is not None:
+            self.col.update_one({'_id': chunk_id},
+                                {'$addToSet': {'data': {'$each': aggregate_data}}})
+        else:
+            # Start a new document and fill it with the proper information
+            doc = self.basic_md.copy()
+            doc['write_time'] = datetime.now(py_utc)
+            doc['chunk_i'] = chunk_i
+            doc['data'] = aggregate_data
+
+            chunk_id = self.col.insert_one(doc).inserted_id
+            self.ids_chunk[chunk_i] = chunk_id
+
         return dict(), None
 
     def _save_chunk_metadata(self, chunk_info):
         """see strax.Saver"""
-        # For the first chunk we update the run_start_time
+        # For the first chunk, get the run_start_time; it is written to all
+        # chunk-documents in _close
         if int(chunk_info['chunk_i']) == 0:
-            t0 = datetime.fromtimestamp(chunk_info['start']/1e9).replace(tzinfo=py_utc)
-            self.col.update_one({'_id': self.id},
-                                {'$set':
-                                     {'run_start_time': t0}})
+            self.run_start = datetime.fromtimestamp(
+                chunk_info['start']/1e9).replace(tzinfo=py_utc)
 
-        self.col.update_one({'_id': self.id},
+        self.col.update_one({'_id': self.id_md},
                             {'$addToSet': {'metadata.chunks': chunk_info}})
 
     def _close(self):
         """see strax.Saver"""
+        # First update the run_start_time of all of the chunk-documents, as
+        # it is a TTL-index candidate
+        if self.run_start is not None:
+            update = {'run_start_time': self.run_start}
+            query = {k: v for k, v in self.basic_md.items()
+                     if k in ('number', 'data_type', 'lineage_hash')}
+            self.col.update_many(query, {'$set': update})
+
+        # Update the run-metadata document
         update = {f'metadata.{k}': v for k, v in self.md.items()
                   if k in ('writing_ended', 'exception')}
-        self.col.update_one({'_id': self.id}, {'$set': update})
+        self.col.update_one({'_id': self.id_md}, {'$set': update})
 
 
 def backend_key_to_query(backend_key):
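
Note on the `$addToSet` append in `_save_chunk` (not part of the patch): with a bare list, MongoDB stores the entire list as a single nested array element, which would break the flat `chunk_doc[i][key]` access in `_read_chunk`; wrapping the list in `$each`, as the hunk above does, appends every row as its own element. A minimal standalone sketch of the difference, assuming a local MongoDB and the hypothetical names `test_db` / `test_collection`:

```python
# Sketch only: demonstrates why appending a batch of rows needs $each.
# 'test_db' and 'test_collection' are hypothetical names, not from the patch.
from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017')['test_db']['test_collection']
rows = [{'time': 0, 'length': 1}, {'time': 10, 'length': 2}]

# With $each, every row becomes its own element of the 'data' array,
# matching the flat layout that _read_chunk iterates over.
col.update_one({'chunk_i': 0},
               {'$addToSet': {'data': {'$each': rows}}},
               upsert=True)

# Without $each, the whole list would be inserted as ONE nested element:
# {'$addToSet': {'data': rows}}  ->  data == [[{...}, {...}]]
```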
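Since `run_start_time` and `write_time` are both stored as timezone-aware datetimes, either can back a MongoDB TTL index, which is what the "candidates for TTL collections" comments in the patch refer to. A sketch of enabling automatic expiry with pymongo; the 90-day lifetime and the collection names are illustrative assumptions, not part of the patch:

```python
# Sketch only: let MongoDB auto-delete chunk documents after a fixed lifetime.
from pymongo import MongoClient

col = MongoClient('mongodb://localhost:27017')['test_db']['test_collection']

# TTL index: documents are removed ~90 days after their run_start_time.
# Index on 'write_time' instead to expire relative to when a chunk was written.
col.create_index('run_start_time', expireAfterSeconds=90 * 24 * 3600)
```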