From 67f7964cdd60ffe15965b2ac89d310b233178c16 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Mon, 25 Nov 2024 12:35:29 -0700 Subject: [PATCH 01/20] Add zstd compression to set_task_result in compute service - Update env files to include zstandard - Update set_task_result in compute api and client to handle base64 encoded data. Rather than JSON serialize the ProtocolDAGResult (PDR) and use this as the intermediate format, instead: 1) create a keyed chain representation of the PDR 2) JSON serialize this representation 3) compress the utf-8 encoded bytes with zstandard 4) encode with base64 - Use the above base64 encoded data as the intermediate format and reverse the operations above to recover the PDR. --- alchemiscale/compute/api.py | 14 +++++++++++--- alchemiscale/compute/client.py | 19 +++++++++++++++---- devtools/conda-envs/alchemiscale-client.yml | 1 + devtools/conda-envs/alchemiscale-compute.yml | 1 + devtools/conda-envs/alchemiscale-server.yml | 1 + devtools/conda-envs/test.yml | 1 + 6 files changed, 30 insertions(+), 7 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 9337055b..a75854a2 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -9,10 +9,12 @@ import json from datetime import datetime, timedelta import random +import base64 from fastapi import FastAPI, APIRouter, Body, Depends from fastapi.middleware.gzip import GZipMiddleware -from gufe.tokenization import GufeTokenizable, JSON_HANDLER +from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain +import zstandard as zstd from ..base.api import ( QueryGUFEHandler, @@ -328,8 +330,14 @@ def set_task_result( task_sk = ScopedKey.from_str(task_scoped_key) validate_scopes(task_sk.scope, token) - pdr = json.loads(protocoldagresult, cls=JSON_HANDLER.decoder) - pdr = GufeTokenizable.from_dict(pdr) + # decode b64 and decompress the zstd bytes back into json + protocoldagresult = 
base64.b64decode(protocoldagresult) + decompressor = zstd.ZstdDecompressor() + protocoldagresult = decompressor.decompress(protocoldagresult) + + pdr_keyed_chain_rep = json.loads(protocoldagresult, cls=JSON_HANDLER.decoder) + pdr_keyed_chain = KeyedChain.from_keyed_chain_rep(pdr_keyed_chain_rep) + pdr = pdr_keyed_chain.to_gufe() tf_sk, _ = n4js.get_task_transformation( task=task_scoped_key, diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 901a7516..b703459b 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -9,11 +9,14 @@ import json from urllib.parse import urljoin from functools import wraps +import base64 import requests from requests.auth import HTTPBasicAuth -from gufe.tokenization import GufeTokenizable, JSON_HANDLER +import zstandard as zstd + +from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain from gufe import Transformation from gufe.protocols import ProtocolDAGResult @@ -128,10 +131,18 @@ def set_task_result( protocoldagresult: ProtocolDAGResult, compute_service_id=Optional[ComputeServiceID], ) -> ScopedKey: + + keyed_chain_rep = KeyedChain.from_gufe(protocoldagresult).to_keyed_chain_rep() + json_rep = json.dumps(keyed_chain_rep, cls=JSON_HANDLER.encoder) + json_bytes = json_rep.encode("utf-8") + + compressor = zstd.ZstdCompressor() + compressed = compressor.compress(json_bytes) + + base64_encoded = base64.b64encode(compressed).decode("utf-8") + data = dict( - protocoldagresult=json.dumps( - protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder - ), + protocoldagresult=base64_encoded, compute_service_id=str(compute_service_id), ) diff --git a/devtools/conda-envs/alchemiscale-client.yml b/devtools/conda-envs/alchemiscale-client.yml index 6f2ae9be..81cfd63f 100644 --- a/devtools/conda-envs/alchemiscale-client.yml +++ b/devtools/conda-envs/alchemiscale-client.yml @@ -15,6 +15,7 @@ dependencies: - httpx - pydantic<2.0 - async-lru + - zstandard ## user client - rich diff 
--git a/devtools/conda-envs/alchemiscale-compute.yml b/devtools/conda-envs/alchemiscale-compute.yml index f93cd1f3..cd39cce2 100644 --- a/devtools/conda-envs/alchemiscale-compute.yml +++ b/devtools/conda-envs/alchemiscale-compute.yml @@ -15,6 +15,7 @@ dependencies: - httpx - pydantic<2.0 - async-lru + - zstandard # openmm protocols - feflow=0.1.0 diff --git a/devtools/conda-envs/alchemiscale-server.yml b/devtools/conda-envs/alchemiscale-server.yml index ae871cca..00102ab5 100644 --- a/devtools/conda-envs/alchemiscale-server.yml +++ b/devtools/conda-envs/alchemiscale-server.yml @@ -10,6 +10,7 @@ dependencies: # alchemiscale dependencies - gufe=1.1.0 - openfe=1.2.0 + - zstandard - requests - click diff --git a/devtools/conda-envs/test.yml b/devtools/conda-envs/test.yml index fc447422..d55ac617 100644 --- a/devtools/conda-envs/test.yml +++ b/devtools/conda-envs/test.yml @@ -11,6 +11,7 @@ dependencies: - openfe>=1.2.0 - pydantic<2.0 - async-lru + - zstandard ## state store - neo4j-python-driver From 41ad1744b8a973e509777f7849e0286572b00120 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Mon, 2 Dec 2024 14:12:45 -0700 Subject: [PATCH 02/20] Use request and manually process payload --- alchemiscale/compute/api.py | 13 +++++++++---- alchemiscale/compute/client.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index a75854a2..c394bb1c 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -11,7 +11,7 @@ import random import base64 -from fastapi import FastAPI, APIRouter, Body, Depends +from fastapi import FastAPI, APIRouter, Body, Depends, Request from fastapi.middleware.gzip import GZipMiddleware from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain import zstandard as zstd @@ -318,15 +318,20 @@ def retrieve_task_transformation( # TODO: support compression performed client-side 
@router.post("/tasks/{task_scoped_key}/results", response_model=ScopedKey) -def set_task_result( +async def set_task_result( task_scoped_key, *, - protocoldagresult: str = Body(embed=True), - compute_service_id: Optional[str] = Body(embed=True), + request: Request, n4js: Neo4jStore = Depends(get_n4js_depends), s3os: S3ObjectStore = Depends(get_s3os_depends), token: TokenData = Depends(get_token_data_depends), ): + body = await request.body() + body_ = json.loads(body.decode("utf-8"), cls=JSON_HANDLER.decoder) + + protocoldagresult = body_['protocoldagresult'] + compute_service_id = body_['compute_service_id'] + task_sk = ScopedKey.from_str(task_scoped_key) validate_scopes(task_sk.scope, token) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index b703459b..eb022759 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -129,7 +129,7 @@ def set_task_result( self, task: ScopedKey, protocoldagresult: ProtocolDAGResult, - compute_service_id=Optional[ComputeServiceID], + compute_service_id: Optional[ComputeServiceID] = None, ) -> ScopedKey: keyed_chain_rep = KeyedChain.from_gufe(protocoldagresult).to_keyed_chain_rep() From 54cd184879d5852714bb5ec8c66738d7d9048f86 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Mon, 2 Dec 2024 15:03:01 -0700 Subject: [PATCH 03/20] Add compression module and send bytes from API path as latin-1 Use more bytes Move compression and decompression functions to new module Use latin-1 decoded bytes --- alchemiscale/compression.py | 60 +++++++++++++++++++ alchemiscale/compute/api.py | 26 +++----- alchemiscale/compute/client.py | 20 +++---- alchemiscale/interface/api.py | 8 +-- alchemiscale/interface/client.py | 8 +-- alchemiscale/security/auth.py | 1 - alchemiscale/storage/objectstore.py | 46 +++++++------- .../interface/client/test_client.py | 3 +- .../integration/storage/test_objectstore.py | 11 ++-- 9 files changed, 112 insertions(+), 71 deletions(-)
create mode 100644 alchemiscale/compression.py diff --git a/alchemiscale/compression.py b/alchemiscale/compression.py new file mode 100644 index 00000000..2004095a --- /dev/null +++ b/alchemiscale/compression.py @@ -0,0 +1,60 @@ +from gufe.tokenization import GufeTokenizable, JSON_HANDLER +import json +import zstandard as zstd + + +def compress_gufe_zstd(gufe_object: GufeTokenizable) -> bytes: + """Compress a GufeTokenizable using zstandard compression. + + After the GufeTokenizable is converted to a KeyedChain, it's + serialized into JSON using the gufe provided + JSON_HANDLER.encoder. The resulting string is utf-8 encoded and + compressed with the zstandard compressor. These bytes are returned + by the function. + + Parameters + ---------- + gufe_object: GufeTokenizable + The GufeTokenizable to compress. + + Returns + ------- + bytes + Compressed byte form of the the GufeTokenizable. + """ + keyed_chain_rep = gufe_object.to_keyed_chain() + json_rep = json.dumps(keyed_chain_rep, cls=JSON_HANDLER.encoder) + json_bytes = json_rep.encode("utf-8") + + compressor = zstd.ZstdCompressor() + compressed_gufe = compressor.compress(json_bytes) + + return compressed_gufe + + +def decompress_gufe_zstd(compressed_bytes: bytes) -> GufeTokenizable: + """Decompress a zstandard compressed GufeTokenizable. + + The bytes encoding a zstandard compressed GufeTokenizable are + decompressed and decoded using the gufe provided + JSON_HANDLER.decoder. It is assumed that the decompressed bytes + are utf-8 encoded. + + Parameters + ---------- + compressed_bytes: bytes + The compressed byte form of a GufeTokenizable. + + Returns + ------- + GufeTokenizable + The decompressed GufeTokenizable. 
+ """ + decompressor = zstd.ZstdDecompressor() + decompressed_gufe: bytes = decompressor.decompress(compressed_bytes) + + keyed_chain_rep = json.loads( + decompressed_gufe.decode("utf-8"), cls=JSON_HANDLER.decoder + ) + gufe_object = GufeTokenizable.from_keyed_chain(keyed_chain_rep) + return gufe_object diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index c394bb1c..869a5281 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -9,7 +9,6 @@ import json from datetime import datetime, timedelta import random -import base64 from fastapi import FastAPI, APIRouter, Body, Depends, Request from fastapi.middleware.gzip import GZipMiddleware @@ -31,6 +30,7 @@ gufe_to_json, GzipRoute, ) +from ..compression import decompress_gufe_zstd from ..settings import ( get_base_api_settings, get_compute_api_settings, @@ -298,18 +298,17 @@ def retrieve_task_transformation( # we keep this as a string to avoid useless deserialization/reserialization here try: - pdr: str = s3os.pull_protocoldagresult( - pdr_sk, transformation_sk, return_as="json", ok=True + pdr_bytes: bytes = s3os.pull_protocoldagresult( + pdr_sk, transformation_sk, ok=True ) except: # if we fail to get the object with the above, fall back to # location-based retrieval - pdr: str = s3os.pull_protocoldagresult( + pdr_bytes: bytes = s3os.pull_protocoldagresult( location=protocoldagresultref.location, - return_as="json", ok=True, ) - + pdr = pdr_bytes.decode("latin-1") else: pdr = None @@ -329,20 +328,13 @@ async def set_task_result( body = await request.body() body_ = json.loads(body.decode("utf-8"), cls=JSON_HANDLER.decoder) - protocoldagresult = body_['protocoldagresult'] - compute_service_id = body_['compute_service_id'] + protocoldagresult_ = body_["protocoldagresult"] + compute_service_id = body_["compute_service_id"] task_sk = ScopedKey.from_str(task_scoped_key) validate_scopes(task_sk.scope, token) - # decode b64 and decompress the zstd bytes back into json - 
protocoldagresult = base64.b64decode(protocoldagresult) - decompressor = zstd.ZstdDecompressor() - protocoldagresult = decompressor.decompress(protocoldagresult) - - pdr_keyed_chain_rep = json.loads(protocoldagresult, cls=JSON_HANDLER.decoder) - pdr_keyed_chain = KeyedChain.from_keyed_chain_rep(pdr_keyed_chain_rep) - pdr = pdr_keyed_chain.to_gufe() + pdr = decompress_gufe_zstd(protocoldagresult_) tf_sk, _ = n4js.get_task_transformation( task=task_scoped_key, @@ -351,7 +343,7 @@ async def set_task_result( # push the ProtocolDAGResult to the object store protocoldagresultref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - pdr, transformation=tf_sk, creator=compute_service_id + protocoldagresult_, transformation=tf_sk, creator=compute_service_id ) # push the reference to the state store diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index eb022759..7a7c498d 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -9,7 +9,6 @@ import json from urllib.parse import urljoin from functools import wraps -import base64 import requests from requests.auth import HTTPBasicAuth @@ -25,6 +24,7 @@ AlchemiscaleBaseClientError, json_to_gufe, ) +from ..compression import compress_gufe_zstd, decompress_gufe_zstd from ..models import Scope, ScopedKey from ..storage.models import TaskHub, Task, ComputeServiceID, TaskStatusEnum @@ -120,9 +120,14 @@ def retrieve_task_transformation( f"/tasks/{task}/transformation/gufe" ) + if protocoldagresult is not None: + protocoldagresult = decompress_gufe_zstd( + protocoldagresult.encode("latin-1") + ) + return ( json_to_gufe(transformation), - json_to_gufe(protocoldagresult) if protocoldagresult is not None else None, + protocoldagresult, ) def set_task_result( @@ -132,17 +137,8 @@ def set_task_result( compute_service_id: Optional[ComputeServiceID] = None, ) -> ScopedKey: - keyed_chain_rep = KeyedChain.from_gufe(protocoldagresult).to_keyed_chain_rep() - json_rep = 
json.dumps(keyed_chain_rep, cls=JSON_HANDLER.encoder) - json_bytes = json_rep.encode("utf-8") - - compressor = zstd.ZstdCompressor() - compressed = compressor.compress(json_bytes) - - base64_encoded = base64.b64encode(compressed).decode("utf-8") - data = dict( - protocoldagresult=base64_encoded, + protocoldagresult=compress_gufe_zstd(protocoldagresult), compute_service_id=str(compute_service_id), ) diff --git a/alchemiscale/interface/api.py b/alchemiscale/interface/api.py index 5b6aeb1e..fd6db0c7 100644 --- a/alchemiscale/interface/api.py +++ b/alchemiscale/interface/api.py @@ -1046,17 +1046,15 @@ def get_protocoldagresult( # we leave each ProtocolDAGResult in string form to avoid # deserializing/reserializing here; just passing through to client try: - pdr: str = s3os.pull_protocoldagresult( - pdr_sk, transformation_sk, return_as="json", ok=ok - ) + pdr_bytes: str = s3os.pull_protocoldagresult(pdr_sk, transformation_sk, ok=ok) except Exception: # if we fail to get the object with the above, fall back to # location-based retrieval - pdr: str = s3os.pull_protocoldagresult( + pdr_bytes: str = s3os.pull_protocoldagresult( location=protocoldagresultref.location, - return_as="json", ok=ok, ) + pdr = pdr_bytes.decode("latin-1") return [pdr] diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 7bd1311f..151c1a35 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -22,6 +22,7 @@ AlchemiscaleBaseClientError, use_session, ) +from ..compression import decompress_gufe_zstd from ..models import Scope, ScopedKey from ..storage.models import ( TaskStatusEnum, @@ -1352,14 +1353,11 @@ def get_tasks_priority( async def _async_get_protocoldagresult( self, protocoldagresultref, transformation, route, compress ): - pdr_json = await self._get_resource_async( + pdr_compressed_latin1 = await self._get_resource_async( f"/transformations/{transformation}/{route}/{protocoldagresultref}", compress=compress, ) - - pdr = 
GufeTokenizable.from_dict( - json.loads(pdr_json[0], cls=JSON_HANDLER.decoder) - ) + pdr = decompress_gufe_zstd(pdr_compressed_latin1[0].encode("latin-1")) return pdr diff --git a/alchemiscale/security/auth.py b/alchemiscale/security/auth.py index 78c4ac0e..f5c2cc92 100644 --- a/alchemiscale/security/auth.py +++ b/alchemiscale/security/auth.py @@ -5,7 +5,6 @@ """ import secrets -import base64 import hashlib from datetime import datetime, timedelta from typing import Optional, Union diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 1ac76bbe..c1e007f2 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -12,8 +12,10 @@ from boto3.session import Session from functools import lru_cache +import zstandard as zstd + from gufe.protocols import ProtocolDAGResult -from gufe.tokenization import JSON_HANDLER, GufeTokenizable +from gufe.tokenization import JSON_HANDLER, GufeTokenizable, KeyedChain from ..models import ScopedKey, Scope from .models import ProtocolDAGResultRef @@ -193,7 +195,7 @@ def _get_filename(self, location): def push_protocoldagresult( self, - protocoldagresult: ProtocolDAGResult, + protocoldagresult: bytes, transformation: ScopedKey, creator: Optional[str] = None, ) -> ProtocolDAGResultRef: @@ -213,7 +215,16 @@ def push_protocoldagresult( Reference to the serialized `ProtocolDAGResult` in the object store. 
""" - ok = protocoldagresult.ok() + + decompressor = zstd.ZstdDecompressor() + decompressed_pdr = decompressor.decompress(protocoldagresult) + + pdr_keyed_chain_rep = json.loads( + decompressed_pdr.decode("utf-8"), cls=JSON_HANDLER.decoder + ) + pdr_keyed_chain = KeyedChain.from_keyed_chain_rep(pdr_keyed_chain_rep) + pdr = pdr_keyed_chain.to_gufe() + ok = pdr.ok() route = "results" if ok else "failures" # build `location` based on gufe key @@ -222,19 +233,15 @@ def push_protocoldagresult( *transformation.scope.to_tuple(), transformation.gufe_key, route, - protocoldagresult.key, + pdr.key, "obj.json", ) - # TODO: add support for compute client-side compressed protocoldagresults - pdr_jb = json.dumps( - protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder - ).encode("utf-8") - response = self._store_bytes(location, pdr_jb) + response = self._store_bytes(location, protocoldagresult) return ProtocolDAGResultRef( location=location, - obj_key=protocoldagresult.key, + obj_key=pdr.key, scope=transformation.scope, ok=ok, datetime_created=datetime.utcnow(), @@ -246,9 +253,8 @@ def pull_protocoldagresult( protocoldagresult: Optional[ScopedKey] = None, transformation: Optional[ScopedKey] = None, location: Optional[str] = None, - return_as="gufe", ok=True, - ) -> Union[ProtocolDAGResult, dict, str]: + ) -> bytes: """Pull the `ProtocolDAGResult` corresponding to the given `ProtocolDAGResultRef`. Parameters @@ -263,9 +269,6 @@ def pull_protocoldagresult( location The full path in the object store to the ProtocolDAGResult. If provided, this will be used to retrieve it. - return_as : ['gufe', 'dict', 'json'] - Form in which to return result; this is provided to avoid - unnecessary deserializations where desired. 
Returns ------- @@ -297,15 +300,6 @@ def pull_protocoldagresult( ## TODO: want organization alongside `obj.json` of `ProtocolUnit` gufe_keys ## for any file objects stored in the same space + pdr_bytes = self._get_bytes(location) - pdr_j = self._get_bytes(location).decode("utf-8") - - # TODO: add support for interface client-side decompression - if return_as == "gufe": - pdr = GufeTokenizable.from_dict(json.loads(pdr_j, cls=JSON_HANDLER.decoder)) - elif return_as == "dict": - pdr = json.loads(pdr_j, cls=JSON_HANDLER.decoder) - elif return_as == "json": - pdr = pdr_j - - return pdr + return pdr_bytes diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 7146c50f..e665fb20 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -8,6 +8,7 @@ from gufe.protocols.protocoldag import execute_DAG import networkx as nx +from alchemiscale.compression import compress_gufe_zstd from alchemiscale.models import ScopedKey, Scope from alchemiscale.storage.models import TaskStatusEnum, NetworkStateEnum from alchemiscale.storage.cypher import cypher_list_from_scoped_keys @@ -1850,7 +1851,7 @@ def _execute_tasks(tasks, n4js, s3os_server): protocoldagresults.append(protocoldagresult) protocoldagresultref = s3os_server.push_protocoldagresult( - protocoldagresult, transformation=transformation_sk + compress_gufe_zstd(protocoldagresult), transformation=transformation_sk ) n4js.set_task_result( diff --git a/alchemiscale/tests/integration/storage/test_objectstore.py b/alchemiscale/tests/integration/storage/test_objectstore.py index f8c4fcd2..bb7a1177 100644 --- a/alchemiscale/tests/integration/storage/test_objectstore.py +++ b/alchemiscale/tests/integration/storage/test_objectstore.py @@ -3,6 +3,7 @@ import pytest +from alchemiscale.compression import compress_gufe_zstd, decompress_gufe_zstd from 
alchemiscale.models import ScopedKey from alchemiscale.storage.objectstore import S3ObjectStore from alchemiscale.storage.models import ProtocolDAGResultRef @@ -21,7 +22,7 @@ def test_push_protocolresult( # try to push the result objstoreref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - protocoldagresults[0], transformation=transformation_sk + compress_gufe_zstd(protocoldagresults[0]), transformation=transformation_sk ) assert objstoreref.obj_key == protocoldagresults[0].key @@ -38,7 +39,7 @@ def test_pull_protocolresult( transformation_sk = ScopedKey(gufe_key=transformation.key, **scope_test.dict()) objstoreref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - protocoldagresults[0], transformation=transformation_sk + compress_gufe_zstd(protocoldagresults[0]), transformation=transformation_sk ) # round trip it @@ -46,13 +47,15 @@ def test_pull_protocolresult( tf_sk = ScopedKey( gufe_key=protocoldagresults[0].transformation_key, **scope_test.dict() ) - pdr = s3os.pull_protocoldagresult(sk, tf_sk) + pdr = decompress_gufe_zstd(s3os.pull_protocoldagresult(sk, tf_sk)) assert pdr.key == protocoldagresults[0].key assert pdr.protocol_unit_results == pdr.protocol_unit_results # test location-based pull - pdr = s3os.pull_protocoldagresult(location=objstoreref.location) + pdr = decompress_gufe_zstd( + s3os.pull_protocoldagresult(location=objstoreref.location) + ) assert pdr.key == protocoldagresults[0].key assert pdr.protocol_unit_results == pdr.protocol_unit_results From f2da1fc3bb71b51e9ba9b56cff9f64c3e2b084c1 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Wed, 11 Dec 2024 10:25:39 -0700 Subject: [PATCH 04/20] Use compression module in objectstore --- alchemiscale/compute/api.py | 2 +- alchemiscale/compute/client.py | 2 +- alchemiscale/storage/objectstore.py | 12 +++--------- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 869a5281..16848065 100644 --- 
a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -12,7 +12,7 @@ from fastapi import FastAPI, APIRouter, Body, Depends, Request from fastapi.middleware.gzip import GZipMiddleware -from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain +from gufe.tokenization import GufeTokenizable, JSON_HANDLER import zstandard as zstd from ..base.api import ( diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 7a7c498d..805657b9 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -15,7 +15,7 @@ import zstandard as zstd -from gufe.tokenization import GufeTokenizable, JSON_HANDLER, KeyedChain +from gufe.tokenization import GufeTokenizable, JSON_HANDLER from gufe import Transformation from gufe.protocols import ProtocolDAGResult diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index c1e007f2..101aff95 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -15,8 +15,9 @@ import zstandard as zstd from gufe.protocols import ProtocolDAGResult -from gufe.tokenization import JSON_HANDLER, GufeTokenizable, KeyedChain +from gufe.tokenization import JSON_HANDLER, GufeTokenizable +from ..compression import decompress_gufe_zstd from ..models import ScopedKey, Scope from .models import ProtocolDAGResultRef from ..settings import S3ObjectStoreSettings, get_s3objectstore_settings @@ -216,14 +217,7 @@ def push_protocoldagresult( """ - decompressor = zstd.ZstdDecompressor() - decompressed_pdr = decompressor.decompress(protocoldagresult) - - pdr_keyed_chain_rep = json.loads( - decompressed_pdr.decode("utf-8"), cls=JSON_HANDLER.decoder - ) - pdr_keyed_chain = KeyedChain.from_keyed_chain_rep(pdr_keyed_chain_rep) - pdr = pdr_keyed_chain.to_gufe() + pdr = decompress_gufe_zstd(protocoldagresult) ok = pdr.ok() route = "results" if ok else "failures" From 18131f16d29f0d8c6ae371b57e2e61152df020f9 Mon Sep 17 00:00:00 2001 From: Ian Kenney 
<ianmichaelkenney@gmail.com> Date: Fri, 13 Dec 2024 15:26:46 -0700 Subject: [PATCH 05/20] Attempt to decompress object store objects in a try block If a decompression error is raised, assume that the original data was never compressed. --- alchemiscale/compute/client.py | 21 +++--- alchemiscale/interface/client.py | 13 +++- alchemiscale/storage/objectstore.py | 4 +- .../interface/client/test_client.py | 64 ++++++++++++++++++- 4 files changed, 89 insertions(+), 13 deletions(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 805657b9..b2bfe747 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -115,20 +115,25 @@ def get_task_transformation(self, task: ScopedKey) -> ScopedKey: def retrieve_task_transformation( self, task: ScopedKey - ) -> Tuple[Transformation, Optional[ProtocolDAGResult]]: + ) -> tuple[Transformation, ProtocolDAGResult | None]: transformation, protocoldagresult = self._get_resource( f"/tasks/{task}/transformation/gufe" ) if protocoldagresult is not None: - protocoldagresult = decompress_gufe_zstd( - protocoldagresult.encode("latin-1") - ) - return ( - json_to_gufe(transformation), - protocoldagresult, - ) + protocoldagresult_bytes = protocoldagresult.encode("latin-1") + + try: + # Attempt to decompress the ProtocolDAGResult object + protocoldagresult = decompress_gufe_zstd( + protocoldagresult_bytes + ) + except zstd.ZstdError: + # If decompression fails, assume it's a UTF-8 encoded JSON string + protocoldagresult = json_to_gufe(protocoldagresult_bytes.decode('utf-8')) + + return json_to_gufe(transformation), protocoldagresult def set_task_result( self, diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 151c1a35..1bd7b14c 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -15,11 +15,13 @@ from gufe import AlchemicalNetwork, Transformation, ChemicalSystem from gufe.tokenization import GufeTokenizable, 
JSON_HANDLER, KeyedChain from gufe.protocols import ProtocolResult, ProtocolDAGResult +import zstandard as zstd from ..base.client import ( AlchemiscaleBaseClient, AlchemiscaleBaseClientError, + json_to_gufe, use_session, ) from ..compression import decompress_gufe_zstd @@ -1353,11 +1355,18 @@ def get_tasks_priority( async def _async_get_protocoldagresult( self, protocoldagresultref, transformation, route, compress ): - pdr_compressed_latin1 = await self._get_resource_async( + pdr_latin1_decoded = await self._get_resource_async( f"/transformations/{transformation}/{route}/{protocoldagresultref}", compress=compress, ) - pdr = decompress_gufe_zstd(pdr_compressed_latin1[0].encode("latin-1")) + + try: + # Attempt to decompress the ProtocolDAGResult object + pdr_bytes = pdr_latin1_decoded[0].encode("latin-1") + pdr = decompress_gufe_zstd(pdr_bytes) + except zstd.ZstdError: + # If decompress fails, assume it's a UTF-8 encoded JSON string + pdr = json_to_gufe(pdr_bytes.decode('utf-8')) return pdr diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 101aff95..0480bf8f 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -228,7 +228,7 @@ def push_protocoldagresult( transformation.gufe_key, route, pdr.key, - "obj.json", + "obj", ) response = self._store_bytes(location, protocoldagresult) @@ -289,7 +289,7 @@ def pull_protocoldagresult( transformation.gufe_key, route, protocoldagresult.gufe_key, - "obj.json", + "obj", ) ## TODO: want organization alongside `obj.json` of `ProtocolUnit` gufe_keys diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index e665fb20..3ad66971 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1,10 +1,12 @@ import pytest from time import sleep +import os from pathlib import Path from 
itertools import chain +import json from gufe import AlchemicalNetwork -from gufe.tokenization import TOKENIZABLE_REGISTRY, GufeKey +from gufe.tokenization import TOKENIZABLE_REGISTRY, GufeKey, JSON_HANDLER from gufe.protocols.protocoldag import execute_DAG import networkx as nx @@ -1860,6 +1862,66 @@ def _execute_tasks(tasks, n4js, s3os_server): return protocoldagresults + def test_get_transformation_and_network_results_json( + self, + scope_test, + n4js_preloaded, + s3os_server, + user_client: client.AlchemiscaleClient, + network_tyk2, + tmpdir, + ): + n4js = n4js_preloaded + + # select the transformation we want to compute + an = network_tyk2 + transformation = list(t for t in an.edges if "_solvent" in t.name)[0] + + network_sk = user_client.get_scoped_key(an, scope_test) + transformation_sk = user_client.get_scoped_key(transformation, scope_test) + + # user client : create three independent tasks for the transformation + user_client.create_tasks(transformation_sk, count=3) + + # user client : action the tasks for execution + all_tasks = user_client.get_transformation_tasks(transformation_sk) + actioned_tasks = user_client.action_tasks(all_tasks, network_sk) + + # execute the actioned tasks and push results directly using statestore and object store + with tmpdir.as_cwd(): + protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) + # overwrite what's in the object store + for protocoldagresult in protocoldagresults: + pdr_jb = json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ).encode("utf-8") + + location = os.path.join( + "protocoldagresult", + *transformation_sk.scope.to_tuple(), + transformation_sk.gufe_key, + "results", + protocoldagresult.key, + "obj", + ) + + s3os_server._store_bytes(location, pdr_jb) + + # clear local gufe registry of pdr objects + # not critical, but ensures we see the objects that are deserialized + # instead of our instances already in memory post-pull + for pdr in protocoldagresults: + 
TOKENIZABLE_REGISTRY.pop(pdr.key, None) + + # get back protocoldagresults instead + protocoldagresults_r = user_client.get_transformation_results( + transformation_sk, return_protocoldagresults=True + ) + + assert set(protocoldagresults_r) == set(protocoldagresults) + + pass + def test_get_transformation_and_network_results( self, scope_test, From 8ee534941be708749b78d2a06470513ea5cc23f2 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Wed, 18 Dec 2024 14:55:54 -0700 Subject: [PATCH 06/20] Use fixed filename for compressed object --- alchemiscale/compute/client.py | 8 ++++---- alchemiscale/interface/client.py | 2 +- alchemiscale/storage/objectstore.py | 7 +++++-- .../integration/interface/client/test_client.py | 14 +++++++------- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index b2bfe747..65581043 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -126,12 +126,12 @@ def retrieve_task_transformation( try: # Attempt to decompress the ProtocolDAGResult object - protocoldagresult = decompress_gufe_zstd( - protocoldagresult_bytes - ) + protocoldagresult = decompress_gufe_zstd(protocoldagresult_bytes) except zstd.ZstdError: # If decompression fails, assume it's a UTF-8 encoded JSON string - protocoldagresult = json_to_gufe(protocoldagresult_bytes.decode('utf-8')) + protocoldagresult = json_to_gufe( + protocoldagresult_bytes.decode("utf-8") + ) return json_to_gufe(transformation), protocoldagresult diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 1bd7b14c..b14f640a 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -1366,7 +1366,7 @@ async def _async_get_protocoldagresult( pdr = decompress_gufe_zstd(pdr_bytes) except zstd.ZstdError: # If decompress fails, assume it's a UTF-8 encoded JSON string - pdr = json_to_gufe(pdr_bytes.decode('utf-8')) + pdr = 
json_to_gufe(pdr_bytes.decode("utf-8")) return pdr diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 0480bf8f..089d25a4 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -22,6 +22,9 @@ from .models import ProtocolDAGResultRef from ..settings import S3ObjectStoreSettings, get_s3objectstore_settings +# default filename for object store files +OBJECT_FILENAME = "obj.json.zst" + @lru_cache() def get_s3os(settings: S3ObjectStoreSettings, endpoint_url=None) -> "S3ObjectStore": @@ -228,7 +231,7 @@ def push_protocoldagresult( transformation.gufe_key, route, pdr.key, - "obj", + OBJECT_FILENAME, ) response = self._store_bytes(location, protocoldagresult) @@ -289,7 +292,7 @@ def pull_protocoldagresult( transformation.gufe_key, route, protocoldagresult.gufe_key, - "obj", + OBJECT_FILENAME, ) ## TODO: want organization alongside `obj.json` of `ProtocolUnit` gufe_keys diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 3ad66971..2e9af8e5 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1863,13 +1863,13 @@ def _execute_tasks(tasks, n4js, s3os_server): return protocoldagresults def test_get_transformation_and_network_results_json( - self, - scope_test, - n4js_preloaded, - s3os_server, - user_client: client.AlchemiscaleClient, - network_tyk2, - tmpdir, + self, + scope_test, + n4js_preloaded, + s3os_server, + user_client: client.AlchemiscaleClient, + network_tyk2, + tmpdir, ): n4js = n4js_preloaded From b32f62df85281583b86bac9169c31fafc51240e9 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Thu, 26 Dec 2024 11:50:19 -0700 Subject: [PATCH 07/20] Implement test_set_task_result_legacy Test getting extends ProtocolDAGResults as if they were stored through the old pdr.to_dict() -> 
json -> utf-8 encoded format. The new test can be removed in the next major release that drops the old format. --- alchemiscale/storage/objectstore.py | 2 +- .../compute/client/test_compute_client.py | 121 +++++++++++++++++- .../interface/client/test_client.py | 2 +- 3 files changed, 119 insertions(+), 6 deletions(-) diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 089d25a4..d0e7ec3d 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -234,7 +234,7 @@ def push_protocoldagresult( OBJECT_FILENAME, ) - response = self._store_bytes(location, protocoldagresult) + self._store_bytes(location, protocoldagresult) return ProtocolDAGResultRef( location=location, diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index 99777c41..ab23e8ef 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -1,12 +1,18 @@ import pytest +import json +import os +from datetime import datetime from time import sleep -from gufe.tokenization import GufeTokenizable +from gufe.tokenization import GufeTokenizable, JSON_HANDLER -from alchemiscale.models import ScopedKey from alchemiscale.compute import client -from alchemiscale.storage.models import TaskStatusEnum, ComputeServiceID - +from alchemiscale.models import ScopedKey +from alchemiscale.storage.models import ( + TaskStatusEnum, + ComputeServiceID, + ProtocolDAGResultRef, +) from alchemiscale.tests.integration.compute.utils import get_compute_settings_override @@ -299,3 +305,110 @@ def test_set_task_result( assert transformation2 == transformation_ assert extends_protocoldagresult2 == protocoldagresults[0] + + def test_set_task_result_legacy( + self, + scope_test, + n4js_preloaded, + compute_client: client.AlchemiscaleComputeClient, + compute_service_id, + 
network_tyk2, + transformation, + protocoldagresults, + uvicorn_server, + s3os_server, + ): + # register compute service id + compute_client.register(compute_service_id) + + an_sk = ScopedKey(gufe_key=network_tyk2.key, **scope_test.dict()) + tf_sk = ScopedKey(gufe_key=transformation.key, **scope_test.dict()) + taskhub_sk = n4js_preloaded.get_taskhub(an_sk) + + # claim our first task + task_sks = compute_client.claim_taskhub_tasks( + taskhub_sk, compute_service_id=compute_service_id + ) + + # get the transformation corresponding to this task + ( + transformation_, + extends_protocoldagresult, + ) = compute_client.retrieve_task_transformation(task_sks[0]) + + assert transformation_ == transformation + assert extends_protocoldagresult is None + + # push a result for the task + # pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0]) + + protocoldagresult = protocoldagresults[0] + task_sk = task_sks[0] + + # we need to replicate the behavior of set_task_result: + # + # pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0]) + # + # This involves pushing the protocoldagresult in the legacy + # to_dict() -> json -> utf-8 encode form, set the task result + # in the statestore, set the task to complete in the + # statestore + # + # + # step 1: Push the protocoldagresult. This needs to be done + # manually since the old behavior was overwritten. 
+ + pdr_bytes_push = json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ).encode("utf-8") + route = "results" if protocoldagresult.ok() else "failures" + + location = os.path.join( + "protocoldagresult", + *tf_sk.scope.to_tuple(), + tf_sk.gufe_key, + route, + protocoldagresult.key, + "obj.json", + ) + + s3os_server._store_bytes(location, pdr_bytes_push) + + pdrr = ProtocolDAGResultRef( + location=location, + obj_key=protocoldagresult.key, + scope=tf_sk.scope, + ok=protocoldagresult.ok(), + datetime_created=datetime.utcnow(), + creator=None, + ) + + # step 2: set the task result in the statestore to reflect the + # protocoldagresult in the objectstore + + result_sk = n4js_preloaded.set_task_result( + task=task_sk, protocoldagresultref=pdrr + ) + + # step 3: set the task to complete in the statestore + + if pdrr.ok: + n4js_preloaded.set_task_complete(tasks=[task_sk]) + else: + n4js_preloaded.set_task_error(tasks=[task_sk]) + + # continue normally and show the protocoldagresult stored in + # the legacy format is properly fetched and decoded + + # create a task that extends the one we just "performed" + task_sk2 = n4js_preloaded.create_task(tf_sk, extends=task_sks[0]) + + # get the transformation and the protocoldagresult for the task this extends + # no need to claim to actually do this + ( + transformation2, + extends_protocoldagresult2, + ) = compute_client.retrieve_task_transformation(task_sk2) + + assert transformation2 == transformation_ + assert extends_protocoldagresult2 == protocoldagresults[0] diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 2e9af8e5..2c39551c 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1902,7 +1902,7 @@ def test_get_transformation_and_network_results_json( transformation_sk.gufe_key, "results", protocoldagresult.key, - 
"obj", + "obj.json", ) s3os_server._store_bytes(location, pdr_jb) From 6f3ce3c3122a5cf9712071a9de55733f16fcf670 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Mon, 30 Dec 2024 16:04:25 -0700 Subject: [PATCH 08/20] Separate executing tasks and pushing results in TestClient To allow for better and clearer testing of result pushing and pulling, the act of executing a task and pushing its results were separated. --- .../interface/client/test_client.py | 196 ++++++++++++------ 1 file changed, 134 insertions(+), 62 deletions(-) diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 2c39551c..2333ee68 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1,4 +1,5 @@ import pytest +from datetime import datetime from time import sleep import os from pathlib import Path @@ -12,7 +13,11 @@ from alchemiscale.compression import compress_gufe_zstd from alchemiscale.models import ScopedKey, Scope -from alchemiscale.storage.models import TaskStatusEnum, NetworkStateEnum +from alchemiscale.storage.models import ( + ProtocolDAGResultRef, + NetworkStateEnum, + TaskStatusEnum, +) from alchemiscale.storage.cypher import cypher_list_from_scoped_keys from alchemiscale.interface import client from alchemiscale.tests.integration.interface.utils import ( @@ -1798,6 +1803,56 @@ def test_set_tasks_priority_missing_tasks( ### results + @staticmethod + def _execute_task(task_scoped_key, n4js, shared_basedir=None, scratch_basedir=None): + + shared_basedir = shared_basedir or Path("shared").absolute() + shared_basedir.mkdir(exist_ok=True) + + scratch_basedir = scratch_basedir or Path("scratch").absolute() + scratch_basedir.mkdir(exist_ok=True) + + ( + transformation_sk, + extends_protocoldagresultref_sk, + ) = n4js.get_task_transformation(task=task_scoped_key, return_gufe=False) + + 
transformation = n4js.get_gufe(transformation_sk) + if extends_protocoldagresultref_sk: + extends_protocoldagresultref = n4js.get_gufe( + extends_protocoldagresultref_sk + ) + extends_protocoldagresult = s3os_server.pull_protocoldagresult( + extends_protocoldagresultref, transformation_sk + ) + else: + extends_protocoldagresult = None + + protocoldag = transformation.create( + extends=extends_protocoldagresult, + name=str(task_scoped_key), + ) + + shared = shared_basedir / str(protocoldag.key) + shared.mkdir() + + scratch = scratch_basedir / str(protocoldag.key) + scratch.mkdir() + + protocoldagresult = execute_DAG( + protocoldag, + shared_basedir=shared, + scratch_basedir=scratch, + raise_error=False, + ) + + assert protocoldagresult.transformation_key == transformation.key + + if extends_protocoldagresult: + assert protocoldagresult.extends_key == extends_protocoldagresult.key + + return protocoldagresult + @staticmethod def _execute_tasks(tasks, n4js, s3os_server): shared_basedir = Path("shared").absolute() @@ -1807,60 +1862,85 @@ def _execute_tasks(tasks, n4js, s3os_server): protocoldagresults = [] for task_sk in tasks: - if task_sk is None: - continue - # get the transformation and extending protocoldagresult as if we # were a compute service - ( - transformation_sk, - extends_protocoldagresultref_sk, - ) = n4js.get_task_transformation(task=task_sk, return_gufe=False) - - transformation = n4js.get_gufe(transformation_sk) - if extends_protocoldagresultref_sk: - extends_protocoldagresultref = n4js.get_gufe( - extends_protocoldagresultref_sk - ) - extends_protocoldagresult = s3os_server.pull_protocoldagresult( - extends_protocoldagresultref, transformation_sk - ) - else: - extends_protocoldagresult = None - - protocoldag = transformation.create( - extends=extends_protocoldagresult, - name=str(task_sk), + protocoldagresult = TestClient._execute_task( + task_sk, + n4js, + shared_basedir=shared_basedir, + scratch_basedir=scratch_basedir, ) + 
protocoldagresults.append(protocoldagresult) - shared = shared_basedir / str(protocoldag.key) - shared.mkdir() + return protocoldagresults - scratch = scratch_basedir / str(protocoldag.key) - scratch.mkdir() + @staticmethod + def _push_result(task_scoped_key, protocoldagresult, n4js, s3os_server): + transformation_sk, _ = n4js.get_task_transformation( + task_scoped_key, return_gufe=False + ) + protocoldagresultref = s3os_server.push_protocoldagresult( + compress_gufe_zstd(protocoldagresult), transformation=transformation_sk + ) + n4js.set_task_result( + task=task_scoped_key, protocoldagresultref=protocoldagresultref + ) + return protocoldagresultref - protocoldagresult = execute_DAG( - protocoldag, - shared_basedir=shared, - scratch_basedir=scratch, - raise_error=False, - ) + # TODO: remove in next major version when to_dict json is no longer supported + @staticmethod + def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): + transformation_scoped_key, _ = n4js.get_task_transformation( + task_scoped_key, return_gufe=False + ) + pdr_jb = json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ).encode("utf-8") + + ok = protocoldagresult.ok() + route = "results" if ok else "failures" + + location = os.path.join( + "protocoldagresult", + *transformation_scoped_key.scope.to_tuple(), + transformation_scoped_key.gufe_key, + route, + protocoldagresult.key, + "obj.json", + ) - assert protocoldagresult.transformation_key == transformation.key - if extends_protocoldagresult: - assert protocoldagresult.extends_key == extends_protocoldagresult.key + s3os_server._store_bytes(location, pdr_jb) - protocoldagresults.append(protocoldagresult) + protocoldagresultref = ProtocolDAGResultRef( + location=location, + obj_key=protocoldagresult.key, + scope=transformation_scoped_key.scope, + ok=ok, + datetime_created=datetime.utcnow(), + creator=None, + ) + n4js.set_task_result( + task=task_scoped_key, protocoldagresultref=protocoldagresultref + ) - 
protocoldagresultref = s3os_server.push_protocoldagresult( - compress_gufe_zstd(protocoldagresult), transformation=transformation_sk - ) + return protocoldagresultref - n4js.set_task_result( - task=task_sk, protocoldagresultref=protocoldagresultref + @staticmethod + def _push_results( + task_scoped_keys, protocoldagresults, n4js, s3os_server, legacy=True + ): + push_function = ( + TestClient._push_result_legacy if legacy else TestClient._push_result + ) + protocoldagresultrefs = [] + for task_scoped_key, protocoldagresult in zip( + task_scoped_keys, protocoldagresults + ): + protocoldagresultref = push_function( + task_scoped_key, protocoldagresult, n4js, s3os_server ) - - return protocoldagresults + protocoldagresultrefs.append(protocoldagresultref) + return protocoldagresultrefs def test_get_transformation_and_network_results_json( self, @@ -1890,22 +1970,10 @@ def test_get_transformation_and_network_results_json( # execute the actioned tasks and push results directly using statestore and object store with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) - # overwrite what's in the object store - for protocoldagresult in protocoldagresults: - pdr_jb = json.dumps( - protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder - ).encode("utf-8") - - location = os.path.join( - "protocoldagresult", - *transformation_sk.scope.to_tuple(), - transformation_sk.gufe_key, - "results", - protocoldagresult.key, - "obj.json", - ) - - s3os_server._store_bytes(location, pdr_jb) + protocoldagresultrefs = self._push_results( + actioned_tasks, protocoldagresults, n4js, s3os_server, legacy=True + ) + assert len(protocoldagresultrefs) == len(protocoldagresults) == 3 # clear local gufe registry of pdr objects # not critical, but ensures we see the objects that are deserialized @@ -1920,8 +1988,6 @@ def test_get_transformation_and_network_results_json( assert set(protocoldagresults_r) == set(protocoldagresults) - pass - def 
test_get_transformation_and_network_results( self, scope_test, @@ -1950,6 +2016,9 @@ def test_get_transformation_and_network_results( # execute the actioned tasks and push results directly using statestore and object store with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) + self._push_results( + actioned_tasks, protocoldagresults, n4js, s3os_server, legacy=True + ) # clear local gufe registry of pdr objects # not critical, but ensures we see the objects that are deserialized @@ -2049,6 +2118,7 @@ def test_get_transformation_and_network_failures( # execute the actioned tasks and push results directly using statestore and object store with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) + self._push_results(actioned_tasks, protocoldagresults, n4js, s3os_server) # clear local gufe registry of pdr objects # not critical, but ensures we see the objects that are deserialized @@ -2122,6 +2192,7 @@ def test_get_task_results( # execute the actioned tasks and push results directly using statestore and object store with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) + self._push_results(actioned_tasks, protocoldagresults, n4js, s3os_server) # clear local gufe registry of pdr objects # not critical, but ensures we see the objects that are deserialized @@ -2188,6 +2259,7 @@ def test_get_task_failures( # execute the actioned tasks and push results directly using statestore and object store with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) + self._push_results(actioned_tasks, protocoldagresults, n4js, s3os_server) # clear local gufe registry of pdr objects # not critical, but ensures we see the objects that are deserialized From 63f998234e209e05a0a0ea11841fb03c2cce4dfd Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Mon, 30 Dec 2024 20:18:27 -0700 Subject: [PATCH 
09/20] Clear leftover state before testing legacy PDR pull Code coverage was artificially low due to test run order. A reset and reinitialization of the s3os_server shows the correct results. --- .../tests/integration/compute/client/test_compute_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index ab23e8ef..09c9081e 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -306,6 +306,7 @@ def test_set_task_result( assert transformation2 == transformation_ assert extends_protocoldagresult2 == protocoldagresults[0] + # TODO: Remove in next major release where old to_dict protocoldagresults storage is removed def test_set_task_result_legacy( self, scope_test, n4js_preloaded, compute_client: client.AlchemiscaleComputeClient, compute_service_id, network_tyk2, transformation, protocoldagresults, uvicorn_server, - s3os_server, + s3os_server_fresh, ): + s3os_server = s3os_server_fresh # register compute service id compute_client.register(compute_service_id) From 6985ca7d1c8dfa7c9c20168036a4d3265541c36e Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Tue, 31 Dec 2024 11:04:20 -0700 Subject: [PATCH 10/20] Parameterize test_get_transformation_and_network_results It's more robust to parametrize the old tests to use the legacy kwarg for pushing results rather than writing a new test that covers less of
--- .../interface/client/test_client.py | 59 +++---------------- 1 file changed, 9 insertions(+), 50 deletions(-) diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 2333ee68..f53f7f5a 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1887,7 +1887,7 @@ def _push_result(task_scoped_key, protocoldagresult, n4js, s3os_server): ) return protocoldagresultref - # TODO: remove in next major version when to_dict json is no longer supported + # TODO: remove in next major version when to_dict json storage is no longer supported @staticmethod def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): transformation_scoped_key, _ = n4js.get_task_transformation( @@ -1925,9 +1925,10 @@ def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): return protocoldagresultref + # TODO: remove legacy kwarg when to_dict json storage is no longer supported @staticmethod def _push_results( - task_scoped_keys, protocoldagresults, n4js, s3os_server, legacy=True + task_scoped_keys, protocoldagresults, n4js, s3os_server, legacy=False ): push_function = ( TestClient._push_result_legacy if legacy else TestClient._push_result @@ -1942,62 +1943,20 @@ def _push_results( protocoldagresultrefs.append(protocoldagresultref) return protocoldagresultrefs - def test_get_transformation_and_network_results_json( - self, - scope_test, - n4js_preloaded, - s3os_server, - user_client: client.AlchemiscaleClient, - network_tyk2, - tmpdir, - ): - n4js = n4js_preloaded - - # select the transformation we want to compute - an = network_tyk2 - transformation = list(t for t in an.edges if "_solvent" in t.name)[0] - - network_sk = user_client.get_scoped_key(an, scope_test) - transformation_sk = user_client.get_scoped_key(transformation, scope_test) - - # user client : create three 
independent tasks for the transformation - user_client.create_tasks(transformation_sk, count=3) - - # user client : action the tasks for execution - all_tasks = user_client.get_transformation_tasks(transformation_sk) - actioned_tasks = user_client.action_tasks(all_tasks, network_sk) - - # execute the actioned tasks and push results directly using statestore and object store - with tmpdir.as_cwd(): - protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) - protocoldagresultrefs = self._push_results( - actioned_tasks, protocoldagresults, n4js, s3os_server, legacy=True - ) - assert len(protocoldagresultrefs) == len(protocoldagresults) == 3 - - # clear local gufe registry of pdr objects - # not critical, but ensures we see the objects that are deserialized - # instead of our instances already in memory post-pull - for pdr in protocoldagresults: - TOKENIZABLE_REGISTRY.pop(pdr.key, None) - - # get back protocoldagresults instead - protocoldagresults_r = user_client.get_transformation_results( - transformation_sk, return_protocoldagresults=True - ) - - assert set(protocoldagresults_r) == set(protocoldagresults) - + # TODO: remove mark and legacy parameter when to_dict json storage is no longer supported + @pytest.mark.parametrize("legacy", [True, False]) def test_get_transformation_and_network_results( self, scope_test, n4js_preloaded, - s3os_server, + s3os_server_fresh, user_client: client.AlchemiscaleClient, network_tyk2, tmpdir, + legacy, ): n4js = n4js_preloaded + s3os_server = s3os_server_fresh # select the transformation we want to compute an = network_tyk2 @@ -2017,7 +1976,7 @@ def test_get_transformation_and_network_results( with tmpdir.as_cwd(): protocoldagresults = self._execute_tasks(actioned_tasks, n4js, s3os_server) self._push_results( - actioned_tasks, protocoldagresults, n4js, s3os_server, legacy=True + actioned_tasks, protocoldagresults, n4js, s3os_server, legacy=legacy ) # clear local gufe registry of pdr objects From 
cda35416616c4c5411bf6e1ae9692acaedff00e7 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Wed, 22 Jan 2025 22:40:05 -0700 Subject: [PATCH 11/20] Small docstring adjustment --- alchemiscale/compression.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/alchemiscale/compression.py b/alchemiscale/compression.py index 2004095a..eb3a692e 100644 --- a/alchemiscale/compression.py +++ b/alchemiscale/compression.py @@ -20,7 +20,7 @@ def compress_gufe_zstd(gufe_object: GufeTokenizable) -> bytes: Returns ------- bytes - Compressed byte form of the the GufeTokenizable. + Compressed byte form of the GufeTokenizable. """ keyed_chain_rep = gufe_object.to_keyed_chain() json_rep = json.dumps(keyed_chain_rep, cls=JSON_HANDLER.encoder) @@ -40,6 +40,8 @@ def decompress_gufe_zstd(compressed_bytes: bytes) -> GufeTokenizable: JSON_HANDLER.decoder. It is assumed that the decompressed bytes are utf-8 encoded. + This is the inverse operation of `compress_gufe_zstd`. + Parameters ---------- compressed_bytes: bytes From 0a1513859e6ada35cec14a744df8d989b2af2bb5 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Wed, 22 Jan 2025 22:42:39 -0700 Subject: [PATCH 12/20] Merge fixes --- alchemiscale/compute/api.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 3ffb8891..54e5ca6a 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -13,11 +13,8 @@ from fastapi import FastAPI, APIRouter, Body, Depends, Request from fastapi.middleware.gzip import GZipMiddleware from gufe.tokenization import GufeTokenizable, JSON_HANDLER -<<<<<<< HEAD import zstandard as zstd -======= from gufe.protocols import ProtocolDAGResult ->>>>>>> main from ..base.api import ( QueryGUFEHandler, @@ -338,7 +335,7 @@ async def set_task_result( task_sk = ScopedKey.from_str(task_scoped_key) validate_scopes(task_sk.scope, token) - pdr = 
decompress_gufe_zstd(protocoldagresult_) + pdr: ProtocolDAGResult = decompress_gufe_zstd(protocoldagresult_) tf_sk, _ = n4js.get_task_transformation( task=task_scoped_key, From 8b65c72f5b19a1b84e9a9b9c51d85e4f4aa4319c Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Thu, 23 Jan 2025 21:08:59 -0700 Subject: [PATCH 13/20] Removed need to decompress protocoldagresult in S3ObjectStore.push_protocoldagresult --- alchemiscale/compute/api.py | 6 +++++- alchemiscale/storage/objectstore.py | 15 ++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 54e5ca6a..f81252f9 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -344,7 +344,11 @@ async def set_task_result( # push the ProtocolDAGResult to the object store protocoldagresultref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - protocoldagresult_, transformation=tf_sk, creator=compute_service_id + protocoldagresult=protocoldagresult_, + protocoldagresult_ok=pdr.ok(), + protocoldagresult_gufekey=pdr.key, + transformation=tf_sk, + creator=compute_service_id ) # push the reference to the state store diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index d0e7ec3d..ca3fceee 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -15,7 +15,7 @@ import zstandard as zstd from gufe.protocols import ProtocolDAGResult -from gufe.tokenization import JSON_HANDLER, GufeTokenizable +from gufe.tokenization import JSON_HANDLER, GufeTokenizable, GufeKey from ..compression import decompress_gufe_zstd from ..models import ScopedKey, Scope @@ -200,6 +200,8 @@ def _get_filename(self, location): def push_protocoldagresult( self, protocoldagresult: bytes, + protocoldagresult_ok: bool, + protocoldagresult_gufekey: GufeKey, transformation: ScopedKey, creator: Optional[str] = None, ) -> ProtocolDAGResultRef: @@ -209,6 +211,10 @@ def 
push_protocoldagresult( ---------- protocoldagresult ProtocolDAGResult to store. + protocoldagresult_ok + ``True`` if ProtocolDAGResult completed successfully; ``False`` if failed. + protocoldagresult_gufekey + The GufeKey of the ProtocolDAGResult. transformation The ScopedKey of the Transformation this ProtocolDAGResult corresponds to. @@ -220,8 +226,7 @@ def push_protocoldagresult( """ - pdr = decompress_gufe_zstd(protocoldagresult) - ok = pdr.ok() + ok = protocoldagresult_ok route = "results" if ok else "failures" # build `location` based on gufe key @@ -230,7 +235,7 @@ def push_protocoldagresult( *transformation.scope.to_tuple(), transformation.gufe_key, route, - pdr.key, + protocoldagresult_gufekey, OBJECT_FILENAME, ) @@ -238,7 +243,7 @@ def push_protocoldagresult( return ProtocolDAGResultRef( location=location, - obj_key=pdr.key, + obj_key=protocoldagresult_gufekey, scope=transformation.scope, ok=ok, datetime_created=datetime.utcnow(), From 11876660b8ad2fddae01f58a350e99d833096ba5 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Thu, 23 Jan 2025 22:03:26 -0700 Subject: [PATCH 14/20] Some clarity edits --- alchemiscale/compute/client.py | 6 +++--- alchemiscale/interface/client.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 65581043..ef170763 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -116,13 +116,13 @@ def get_task_transformation(self, task: ScopedKey) -> ScopedKey: def retrieve_task_transformation( self, task: ScopedKey ) -> tuple[Transformation, ProtocolDAGResult | None]: - transformation, protocoldagresult = self._get_resource( + transformation_json, protocoldagresult_latin1 = self._get_resource( f"/tasks/{task}/transformation/gufe" ) if protocoldagresult is not None: - protocoldagresult_bytes = protocoldagresult.encode("latin-1") + protocoldagresult_bytes = protocoldagresult_latin1.encode("latin-1") 
try: # Attempt to decompress the ProtocolDAGResult object @@ -133,7 +133,7 @@ def retrieve_task_transformation( protocoldagresult_bytes.decode("utf-8") ) - return json_to_gufe(transformation), protocoldagresult + return json_to_gufe(transformation_json), protocoldagresult def set_task_result( self, diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index b7841523..83c06ff8 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -1360,9 +1360,10 @@ async def _async_get_protocoldagresult( compress=compress, ) + pdr_bytes = pdr_latin1_decoded[0].encode("latin-1") + try: # Attempt to decompress the ProtocolDAGResult object - pdr_bytes = pdr_latin1_decoded[0].encode("latin-1") pdr = decompress_gufe_zstd(pdr_bytes) except zstd.ZstdError: # If decompress fails, assume it's a UTF-8 encoded JSON string From 7d3c86c72cc26a5c4123221d3a2f662dee00617a Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Thu, 23 Jan 2025 22:03:52 -0700 Subject: [PATCH 15/20] Black! 
--- alchemiscale/compute/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 9e0defcc..754ce12b 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -348,7 +348,7 @@ async def set_task_result( protocoldagresult_ok=pdr.ok(), protocoldagresult_gufekey=pdr.key, transformation=tf_sk, - creator=compute_service_id + creator=compute_service_id, ) # push the reference to the state store From e149d09860ebf7eb3446d02226b02a7686405851 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Thu, 23 Jan 2025 23:18:31 -0700 Subject: [PATCH 16/20] CI fixes, other edits from review --- alchemiscale/storage/objectstore.py | 4 ++-- .../compute/client/test_compute_client.py | 4 +--- .../interface/client/test_client.py | 22 ++++++++++++----- .../integration/storage/test_objectstore.py | 24 ++++++++++++------- 4 files changed, 35 insertions(+), 19 deletions(-) diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index ca3fceee..c56572b4 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -210,7 +210,7 @@ def push_protocoldagresult( Parameters ---------- protocoldagresult - ProtocolDAGResult to store. + ProtocolDAGResult to store, in some bytes representation. protocoldagresult_ok ``True`` if ProtocolDAGResult completed successfully; ``False`` if failed. protocoldagresult_gufekey @@ -275,7 +275,7 @@ def pull_protocoldagresult( Returns ------- ProtocolDAGResult - The ProtocolDAGResult corresponding to the given `ProtocolDAGResultRef`. + The ProtocolDAGResult corresponding to the given `ProtocolDAGResultRef`, in a bytes representation. 
""" route = "results" if ok else "failures" diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index 09c9081e..443df3be 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -342,8 +342,6 @@ def test_set_task_result_legacy( assert extends_protocoldagresult is None # push a result for the task - # pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0]) - protocoldagresult = protocoldagresults[0] task_sk = task_sks[0] @@ -352,7 +350,7 @@ def test_set_task_result_legacy( # pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0]) # # This involves pushing the protocoldagresult in the legacy - # to_dict() -> json -> utf-8 encode form, set the task result + # to_dict() -> json -> utf-8 encoded form, set the task result # in the statestore, set the task to complete in the # statestore # diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 50a4dd07..002340fb 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1804,7 +1804,13 @@ def test_set_tasks_priority_missing_tasks( ### results @staticmethod - def _execute_task(task_scoped_key, n4js, shared_basedir=None, scratch_basedir=None): + def _execute_task( + task_scoped_key, + n4js, + s3os_server, + shared_basedir=None, + scratch_basedir=None + ): shared_basedir = shared_basedir or Path("shared").absolute() shared_basedir.mkdir(exist_ok=True) @@ -1867,6 +1873,7 @@ def _execute_tasks(tasks, n4js, s3os_server): protocoldagresult = TestClient._execute_task( task_sk, n4js, + s3os_server, shared_basedir=shared_basedir, scratch_basedir=scratch_basedir, ) @@ -1880,7 +1887,10 @@ def 
_push_result(task_scoped_key, protocoldagresult, n4js, s3os_server): task_scoped_key, return_gufe=False ) protocoldagresultref = s3os_server.push_protocoldagresult( - compress_gufe_zstd(protocoldagresult), transformation=transformation_sk + compress_gufe_zstd(protocoldagresult), + protocoldagresult.ok(), + protocoldagresult.key, + transformation=transformation_sk ) n4js.set_task_result( task=task_scoped_key, protocoldagresultref=protocoldagresultref @@ -1890,7 +1900,7 @@ def _push_result(task_scoped_key, protocoldagresult, n4js, s3os_server): # TODO: remove in next major version when to_dict json storage is no longer supported @staticmethod def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): - transformation_scoped_key, _ = n4js.get_task_transformation( + transformation_sk, _ = n4js.get_task_transformation( task_scoped_key, return_gufe=False ) pdr_jb = json.dumps( @@ -1902,8 +1912,8 @@ def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): location = os.path.join( "protocoldagresult", - *transformation_scoped_key.scope.to_tuple(), - transformation_scoped_key.gufe_key, + *transformation_sk.scope.to_tuple(), + transformation_sk.gufe_key, route, protocoldagresult.key, "obj.json", @@ -1914,7 +1924,7 @@ def _push_result_legacy(task_scoped_key, protocoldagresult, n4js, s3os_server): protocoldagresultref = ProtocolDAGResultRef( location=location, obj_key=protocoldagresult.key, - scope=transformation_scoped_key.scope, + scope=transformation_sk.scope, ok=ok, datetime_created=datetime.utcnow(), creator=None, diff --git a/alchemiscale/tests/integration/storage/test_objectstore.py b/alchemiscale/tests/integration/storage/test_objectstore.py index bb7a1177..77f3e073 100644 --- a/alchemiscale/tests/integration/storage/test_objectstore.py +++ b/alchemiscale/tests/integration/storage/test_objectstore.py @@ -15,17 +15,21 @@ def test_delete(self, s3os: S3ObjectStore): s3os._store_bytes("_check_test", b"test_check") 
s3os._delete("_check_test") - def test_push_protocolresult( + def test_push_protocoldagresult( self, s3os: S3ObjectStore, protocoldagresults, transformation, scope_test ): transformation_sk = ScopedKey(gufe_key=transformation.key, **scope_test.dict()) + protocoldagresult = protocoldagresults[0] # try to push the result objstoreref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - compress_gufe_zstd(protocoldagresults[0]), transformation=transformation_sk + compress_gufe_zstd(protocoldagresult), + protocoldagresult.ok(), + protocoldagresult.key, + transformation=transformation_sk ) - assert objstoreref.obj_key == protocoldagresults[0].key + assert objstoreref.obj_key == protocoldagresult.key # examine object metadata objs = list(s3os.resource.Bucket(s3os.bucket).objects.all()) @@ -33,23 +37,27 @@ def test_push_protocolresult( assert len(objs) == 1 assert objs[0].key == os.path.join(s3os.prefix, objstoreref.location) - def test_pull_protocolresult( + def test_pull_protocoldagresult( self, s3os: S3ObjectStore, protocoldagresults, transformation, scope_test ): transformation_sk = ScopedKey(gufe_key=transformation.key, **scope_test.dict()) + protocoldagresult = protocoldagresults[0] objstoreref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - compress_gufe_zstd(protocoldagresults[0]), transformation=transformation_sk + compress_gufe_zstd(protocoldagresult), + protocoldagresult.ok(), + protocoldagresult.key, + transformation=transformation_sk ) # round trip it sk = ScopedKey(gufe_key=objstoreref.obj_key, **scope_test.dict()) tf_sk = ScopedKey( - gufe_key=protocoldagresults[0].transformation_key, **scope_test.dict() + gufe_key=protocoldagresult.transformation_key, **scope_test.dict() ) pdr = decompress_gufe_zstd(s3os.pull_protocoldagresult(sk, tf_sk)) - assert pdr.key == protocoldagresults[0].key + assert pdr.key == protocoldagresult.key assert pdr.protocol_unit_results == pdr.protocol_unit_results # test location-based pull @@ -57,5 +65,5 @@ def 
test_pull_protocolresult( s3os.pull_protocoldagresult(location=objstoreref.location) ) - assert pdr.key == protocoldagresults[0].key + assert pdr.key == protocoldagresult.key assert pdr.protocol_unit_results == pdr.protocol_unit_results From 847372943a47196ae51483495a74552dce2b2a46 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Thu, 23 Jan 2025 23:18:59 -0700 Subject: [PATCH 17/20] Black! --- .../tests/integration/interface/client/test_client.py | 10 +++------- .../tests/integration/storage/test_objectstore.py | 6 +++--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 002340fb..09d1353b 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -1805,12 +1805,8 @@ def test_set_tasks_priority_missing_tasks( @staticmethod def _execute_task( - task_scoped_key, - n4js, - s3os_server, - shared_basedir=None, - scratch_basedir=None - ): + task_scoped_key, n4js, s3os_server, shared_basedir=None, scratch_basedir=None + ): shared_basedir = shared_basedir or Path("shared").absolute() shared_basedir.mkdir(exist_ok=True) @@ -1890,7 +1886,7 @@ def _push_result(task_scoped_key, protocoldagresult, n4js, s3os_server): compress_gufe_zstd(protocoldagresult), protocoldagresult.ok(), protocoldagresult.key, - transformation=transformation_sk + transformation=transformation_sk, ) n4js.set_task_result( task=task_scoped_key, protocoldagresultref=protocoldagresultref diff --git a/alchemiscale/tests/integration/storage/test_objectstore.py b/alchemiscale/tests/integration/storage/test_objectstore.py index 77f3e073..e9c75107 100644 --- a/alchemiscale/tests/integration/storage/test_objectstore.py +++ b/alchemiscale/tests/integration/storage/test_objectstore.py @@ -23,10 +23,10 @@ def test_push_protocoldagresult( # try to push the result 
objstoreref: ProtocolDAGResultRef = s3os.push_protocoldagresult( - compress_gufe_zstd(protocoldagresult), + compress_gufe_zstd(protocoldagresult), protocoldagresult.ok(), protocoldagresult.key, - transformation=transformation_sk + transformation=transformation_sk, ) assert objstoreref.obj_key == protocoldagresult.key @@ -47,7 +47,7 @@ def test_pull_protocoldagresult( compress_gufe_zstd(protocoldagresult), protocoldagresult.ok(), protocoldagresult.key, - transformation=transformation_sk + transformation=transformation_sk, ) # round trip it From d61d133f25a325f7e74417f29d0cb21cd65ce965 Mon Sep 17 00:00:00 2001 From: Ian Kenney <ianmichaelkenney@gmail.com> Date: Fri, 24 Jan 2025 09:58:18 -0700 Subject: [PATCH 18/20] Assign value to protocoldagresult --- alchemiscale/compute/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index ef170763..e4d6715f 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -120,7 +120,7 @@ def retrieve_task_transformation( f"/tasks/{task}/transformation/gufe" ) - if protocoldagresult is not None: + if (protocoldagresult := protocoldagresult_latin1) is not None: protocoldagresult_bytes = protocoldagresult_latin1.encode("latin-1") From 443e193199fb58cc30f81d9160d647731f5f1fa0 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Fri, 24 Jan 2025 11:01:30 -0700 Subject: [PATCH 19/20] Simplification --- alchemiscale/compute/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index e4d6715f..f860ce53 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -120,7 +120,7 @@ def retrieve_task_transformation( f"/tasks/{task}/transformation/gufe" ) - if (protocoldagresult := protocoldagresult_latin1) is not None: + if protocoldagresult_latin1 is not None: protocoldagresult_bytes = 
protocoldagresult_latin1.encode("latin-1") From 321f5d724ea5850c7f645fbe76f8d78689de1e18 Mon Sep 17 00:00:00 2001 From: David Dotson <dotsdl@gmail.com> Date: Fri, 24 Jan 2025 11:10:38 -0700 Subject: [PATCH 20/20] Revert "simplification" --- alchemiscale/compute/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index f860ce53..e4d6715f 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -120,7 +120,7 @@ def retrieve_task_transformation( f"/tasks/{task}/transformation/gufe" ) - if protocoldagresult_latin1 is not None: + if (protocoldagresult := protocoldagresult_latin1) is not None: protocoldagresult_bytes = protocoldagresult_latin1.encode("latin-1")