refactor S3Store to allow extending for OpenDataStore
kbuma committed Nov 22, 2023
1 parent 0443064 commit 8b6ac1e
Showing 1 changed file with 23 additions and 12 deletions.
src/maggma/stores/aws.py: 23 additions & 12 deletions
@@ -184,7 +184,7 @@ def query(
else:
try:
# TODO: This is ugly and unsafe, do some real checking before pulling data
- data = self.s3_bucket.Object(self.sub_dir + str(doc[self.key])).get()["Body"].read()
+ data = self.s3_bucket.Object(self._get_full_key_path(doc[self.key])).get()["Body"].read()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a NoSuchKey or NoSuchBucket error.
# If it was a NoSuchKey error, then the object does not exist.
@@ -199,13 +199,16 @@
raise e

if self.unpack_data:
- data = self._unpack(data=data, compressed=doc.get("compression", "") == "zlib")
+ data = self._read_data(data=data, compress_header=doc.get("compression", ""))

if self.last_updated_field in doc:
data[self.last_updated_field] = doc[self.last_updated_field]

yield data

+ def _read_data(self, data: bytes, compress_header: str):
+ return self._unpack(data=data, compressed=compress_header == "zlib")

@staticmethod
def _unpack(data: bytes, compressed: bool):
if compressed:
@@ -307,6 +310,9 @@ def update(
else:
additional_metadata = list(additional_metadata)

+ self._write_to_s3_and_index(docs, key, additional_metadata)

+ def _write_to_s3_and_index(self, docs, key, additional_metadata):
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
@@ -366,6 +372,15 @@ def _get_resource_and_bucket(self):

return resource, bucket

+ def _get_full_key_path(self, id):
+ return self.sub_dir + str(id)

+ def _get_compression_function(self):
+ return zlib.compress

+ def _get_decompression_function(self):
+ return zlib.decompress

def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
"""
Write the data to s3 and return the metadata to be inserted into the index db.
@@ -393,11 +408,7 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
if self.compress:
# Compress with zlib if chosen
search_doc["compression"] = "zlib"
- data = zlib.compress(data)
-
- if self.last_updated_field in doc:
- # need this conversion for aws metadata insert
- search_doc[self.last_updated_field] = str(to_isoformat_ceil_ms(doc[self.last_updated_field]))
+ data = self._get_compression_function()(data)

# keep a record of original keys, in case these are important for the individual researcher
# it is not expected that this information will be used except in disaster recovery
@@ -407,7 +418,7 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
s3_bucket.upload_fileobj(
Fileobj=BytesIO(data),
- Key=self.sub_dir + str(doc[self.key]),
+ Key=self._get_full_key_path(str(doc[self.key])),
ExtraArgs={"Metadata": {s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()}},
)

@@ -452,7 +463,7 @@ def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
# Can remove up to 1000 items at a time via boto
to_remove_chunks = list(grouper(to_remove, n=1000))
for chunk_to_remove in to_remove_chunks:
objlist = [{"Key": f"{self.sub_dir}{obj}"} for obj in chunk_to_remove]
objlist = [{"Key": self._get_full_key_path(obj)} for obj in chunk_to_remove]
self.s3_bucket.delete_objects(Delete={"Objects": objlist})

@property
@@ -486,11 +497,11 @@ def rebuild_index_from_s3_data(self, **kwargs):
bucket = self.s3_bucket
objects = bucket.objects.filter(Prefix=self.sub_dir)
for obj in objects:
- key_ = self.sub_dir + obj.key
+ key_ = self._get_full_key_path(obj.key)
data = self.s3_bucket.Object(key_).get()["Body"].read()

if self.compress:
- data = zlib.decompress(data)
+ data = self._get_decompression_function()(data)
unpacked_data = msgpack.unpackb(data, raw=False)
self.update(unpacked_data, **kwargs)

@@ -504,7 +515,7 @@ def rebuild_metadata_from_index(self, index_query: Optional[dict] = None):
"""
qq = {} if index_query is None else index_query
for index_doc in self.index.query(qq):
- key_ = self.sub_dir + index_doc[self.key]
+ key_ = self._get_full_key_path(index_doc[self.key])
s3_object = self.s3_bucket.Object(key_)
new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
for k, v in index_doc.items():
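The refactor above pulls key construction, compression, and read-side unpacking out into small overridable helpers (_get_full_key_path, _get_compression_function, _get_decompression_function, _read_data, _write_to_s3_and_index) instead of hard-coding self.sub_dir and zlib, which is what lets a subclass such as the planned OpenDataStore customize them. The sketch below is illustrative only and is not the OpenDataStore implementation: the class name GzipS3Store, the "objects/" key layout, and the gzip codec are assumptions; only the hook signatures and the msgpack unpacking come from the diff itself.

# Minimal sketch, NOT part of this commit: a hypothetical subclass overriding
# the hook methods introduced above. Names and codec choice are assumptions.
import gzip

import msgpack

from maggma.stores.aws import S3Store


class GzipS3Store(S3Store):
    """Hypothetical store with a flat key layout and gzip instead of zlib."""

    def _get_full_key_path(self, id):
        # Replace the default self.sub_dir + str(id) layout with a fixed prefix.
        return f"objects/{id}"

    def _get_compression_function(self):
        # Used by write_doc_to_s3 when self.compress is True.
        return gzip.compress

    def _get_decompression_function(self):
        # Used by rebuild_index_from_s3_data when self.compress is True.
        return gzip.decompress

    def _read_data(self, data: bytes, compress_header: str):
        # query() delegates unpacking here; key off truthiness because the
        # base class in this commit still labels compressed objects "zlib".
        if compress_header:
            data = self._get_decompression_function()(data)
        return msgpack.unpackb(data, raw=False)

With the hooks in place, a subclass only touches these small methods while update, query, remove_docs, and the rebuild helpers continue to route key paths and codecs through them unchanged.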
