Skip to content

Commit

Permalink
Implement test_set_task_result_legacy
Browse files Browse the repository at this point in the history
Test getting extends ProtocolDAGResults as if they were stored through
the old pdr.to_dict() -> json -> utf-8 encoded format. The new test
can be removed in the next major release that drops the old format.
  • Loading branch information
ianmkenney committed Dec 30, 2024
1 parent 8ee5349 commit cce6e8d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 6 deletions.
2 changes: 1 addition & 1 deletion alchemiscale/storage/objectstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def push_protocoldagresult(
OBJECT_FILENAME,
)

response = self._store_bytes(location, protocoldagresult)
self._store_bytes(location, protocoldagresult)

return ProtocolDAGResultRef(
location=location,
Expand Down
115 changes: 111 additions & 4 deletions alchemiscale/tests/integration/compute/client/test_compute_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
import json
import os
from datetime import datetime
from time import sleep

from gufe.tokenization import GufeTokenizable
from gufe.tokenization import GufeTokenizable, JSON_HANDLER

from alchemiscale.models import ScopedKey
from alchemiscale.compute import client
from alchemiscale.storage.models import TaskStatusEnum, ComputeServiceID

from alchemiscale.models import ScopedKey
from alchemiscale.storage.models import TaskStatusEnum, ComputeServiceID, ProtocolDAGResultRef
from alchemiscale.tests.integration.compute.utils import get_compute_settings_override


Expand Down Expand Up @@ -299,3 +301,108 @@ def test_set_task_result(

assert transformation2 == transformation_
assert extends_protocoldagresult2 == protocoldagresults[0]

def test_set_task_result_legacy(
self,
scope_test,
n4js_preloaded,
compute_client: client.AlchemiscaleComputeClient,
compute_service_id,
network_tyk2,
transformation,
protocoldagresults,
uvicorn_server,
s3os_server,
):
# register compute service id
compute_client.register(compute_service_id)

an_sk = ScopedKey(gufe_key=network_tyk2.key, **scope_test.dict())
tf_sk = ScopedKey(gufe_key=transformation.key, **scope_test.dict())
taskhub_sk = n4js_preloaded.get_taskhub(an_sk)

# claim our first task
task_sks = compute_client.claim_taskhub_tasks(
taskhub_sk, compute_service_id=compute_service_id
)

# get the transformation corresponding to this task
(
transformation_,
extends_protocoldagresult,
) = compute_client.retrieve_task_transformation(task_sks[0])

assert transformation_ == transformation
assert extends_protocoldagresult is None

# push a result for the task
# pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0])

protocoldagresult = protocoldagresults[0]
task_sk = task_sks[0]

# we need to replicate the behavior of set_task_result:
#
# pdr_sk = compute_client.set_task_result(task_sks[0], protocoldagresults[0])
#
# This involves pushing the protocoldagresult in the legacy
# to_dict() -> json -> utf-8 encode form, set the task result
# in the statestore, set the task to complete in the
# statestore
#
#
# step 1: Push the protocoldagresult. This needs to be done
# manually since the old behavior was overwritten.

pdr_bytes_push = json.dumps(protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder).encode("utf-8")
route = "results" if protocoldagresult.ok() else "failures"

location = os.path.join(
"protocoldagresult",
*tf_sk.scope.to_tuple(),
tf_sk.gufe_key,
route,
protocoldagresult.key,
"obj.json",
)

s3os_server._store_bytes(location, pdr_bytes_push)

pdrr = ProtocolDAGResultRef(
location=location,
obj_key=protocoldagresult.key,
scope=tf_sk.scope,
ok=protocoldagresult.ok(),
datetime_created=datetime.utcnow(),
creator=None,
)

# step 2: set the task result in the statestore to reflect the
# protocoldagresult in the objectstore

result_sk = n4js_preloaded.set_task_result(
task=task_sk, protocoldagresultref=pdrr
)

# step 3: set the task to complete in the statestore

if pdrr.ok:
n4js_preloaded.set_task_complete(tasks=[task_sk])
else:
n4js_preloaded.set_task_error(tasks=[task_sk])

# continue normally and show the protocoldagresult stored in
# the legacy format is properly fetched and decoded

# create a task that extends the one we just "performed"
task_sk2 = n4js_preloaded.create_task(tf_sk, extends=task_sks[0])

# get the transformation and the protocoldagresult for the task this extends
# no need to claim to actually do this
(
transformation2,
extends_protocoldagresult2,
) = compute_client.retrieve_task_transformation(task_sk2)

assert transformation2 == transformation_
assert extends_protocoldagresult2 == protocoldagresults[0]
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ def test_get_transformation_and_network_results_json(
transformation_sk.gufe_key,
"results",
protocoldagresult.key,
"obj",
"obj.json",
)

s3os_server._store_bytes(location, pdr_jb)
Expand Down

0 comments on commit cce6e8d

Please sign in to comment.