-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Python API update taskhandler and worker (#463)
- Loading branch information
Showing
5 changed files
with
291 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,9 @@ | ||
from .worker import ArmoniKWorker | ||
from .taskhandler import TaskHandler | ||
from .seqlogger import ClefLogger | ||
|
||
__all__ = [ | ||
'ArmoniKWorker', | ||
'TaskHandler', | ||
'ClefLogger', | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import datetime | ||
import logging | ||
import warnings | ||
|
||
from .conftest import all_rpc_called, rpc_called, get_client, data_folder | ||
from armonik.common import TaskDefinition, TaskOptions | ||
from armonik.worker import TaskHandler | ||
from armonik.protogen.worker.agent_service_pb2_grpc import AgentStub | ||
from armonik.protogen.common.worker_common_pb2 import ProcessRequest | ||
from armonik.protogen.common.objects_pb2 import Configuration | ||
|
||
|
||
logging.basicConfig() | ||
logging.getLogger().setLevel(logging.INFO) | ||
|
||
|
||
class TestTaskHandler: | ||
|
||
request =ProcessRequest( | ||
communication_token="token", | ||
session_id="session-id", | ||
task_id="task-id", | ||
expected_output_keys=["result-id"], | ||
payload_id="payload-id", | ||
data_dependencies=["dd-id"], | ||
data_folder=data_folder, | ||
configuration=Configuration(data_chunk_max_size=8000), | ||
task_options=TaskOptions( | ||
max_duration=datetime.timedelta(seconds=1), | ||
priority=1, | ||
max_retries=1 | ||
).to_message() | ||
) | ||
|
||
def test_taskhandler_init(self): | ||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
|
||
assert task_handler.session_id == "session-id" | ||
assert task_handler.task_id == "task-id" | ||
assert task_handler.task_options == TaskOptions( | ||
max_duration=datetime.timedelta(seconds=1), | ||
priority=1, | ||
max_retries=1, | ||
partition_id='', | ||
application_name='', | ||
application_version='', | ||
application_namespace='', | ||
application_service='', | ||
engine_type='', | ||
options={} | ||
) | ||
assert task_handler.token == "token" | ||
assert task_handler.expected_results == ["result-id"] | ||
assert task_handler.configuration == Configuration(data_chunk_max_size=8000) | ||
assert task_handler.payload_id == "payload-id" | ||
assert task_handler.data_folder == data_folder | ||
assert task_handler.payload == "payload".encode() | ||
assert task_handler.data_dependencies == {"dd-id": "dd".encode()} | ||
|
||
def test_create_task(self): | ||
with warnings.catch_warnings(record=True) as w: | ||
# Cause all warnings to always be triggered. | ||
warnings.simplefilter("always") | ||
|
||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
tasks, errors = task_handler.create_tasks([TaskDefinition( | ||
payload=b"payload", | ||
expected_output_ids=["result-id"], | ||
data_dependencies=[])]) | ||
|
||
assert issubclass(w[-1].category, DeprecationWarning) | ||
assert rpc_called("Agent", "CreateTask") | ||
assert tasks == [] | ||
assert errors == [] | ||
|
||
def test_submit_tasks(self): | ||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
tasks = task_handler.submit_tasks([TaskDefinition(payload_id="payload-id", | ||
expected_output_ids=["result-id"], | ||
data_dependencies=[])] | ||
) | ||
|
||
assert rpc_called("Agent", "SubmitTasks") | ||
assert tasks is None | ||
|
||
def test_send_results(self): | ||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
resuls = task_handler.send_results({"result-id": b"result data"}) | ||
assert rpc_called("Agent", "NotifyResultData") | ||
assert resuls is None | ||
|
||
def test_create_result_metadata(self): | ||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
results = task_handler.create_results_metadata(["result-name"]) | ||
|
||
assert rpc_called("Agent", "CreateResultsMetaData") | ||
# TODO: Mock must be updated to return something and so that changes the following assertions | ||
assert results == {} | ||
|
||
def test_create_results(self): | ||
task_handler = TaskHandler(self.request, get_client("Agent")) | ||
results = task_handler.create_results({"result-name": b"test data"}) | ||
|
||
assert rpc_called("Agent", "CreateResults") | ||
assert results == {} | ||
|
||
def test_service_fully_implemented(self): | ||
assert all_rpc_called("Agent", missings=["GetCommonData", "GetDirectData", "GetResourceData"]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import datetime | ||
import grpc | ||
import logging | ||
import os | ||
import pytest | ||
|
||
from .conftest import data_folder, grpc_endpoint | ||
from armonik.worker import ArmoniKWorker, TaskHandler, ClefLogger | ||
from armonik.common import Output, TaskOptions | ||
from armonik.protogen.common.objects_pb2 import Empty, Configuration | ||
from armonik.protogen.common.worker_common_pb2 import ProcessRequest | ||
|
||
|
||
def do_nothing(_: TaskHandler) -> Output: | ||
return Output() | ||
|
||
|
||
def throw_error(_: TaskHandler) -> Output: | ||
raise ValueError("TestError") | ||
|
||
|
||
def return_error(_: TaskHandler) -> Output: | ||
return Output("TestError") | ||
|
||
|
||
def return_and_send(th: TaskHandler) -> Output: | ||
th.send_results({th.expected_results[0]: b"result"}) | ||
return Output() | ||
|
||
|
||
class TestWorker: | ||
|
||
request = ProcessRequest( | ||
communication_token="token", | ||
session_id="session-id", | ||
task_id="task-id", | ||
expected_output_keys=["result-id"], | ||
payload_id="payload-id", | ||
data_dependencies=["dd-id"], | ||
data_folder=data_folder, | ||
configuration=Configuration(data_chunk_max_size=8000), | ||
task_options=TaskOptions( | ||
max_duration=datetime.timedelta(seconds=1), | ||
priority=1, | ||
max_retries=1 | ||
).to_message() | ||
) | ||
|
||
def test_do_nothing(self): | ||
with grpc.insecure_channel(grpc_endpoint) as agent_channel: | ||
worker = ArmoniKWorker(agent_channel, do_nothing, logger=ClefLogger("TestLogger", level=logging.CRITICAL)) | ||
reply = worker.Process(self.request, None) | ||
assert Output(reply.output.error.details if reply.output.WhichOneof("type") == "error" else None).success | ||
worker.HealthCheck(Empty(), None) | ||
|
||
def test_should_return_none(self): | ||
with grpc.insecure_channel(grpc_endpoint) as agent_channel: | ||
worker = ArmoniKWorker(agent_channel, throw_error, logger=ClefLogger("TestLogger", level=logging.CRITICAL)) | ||
reply = worker.Process(self.request, None) | ||
assert reply is None | ||
|
||
def test_should_error(self): | ||
with grpc.insecure_channel(grpc_endpoint) as agent_channel: | ||
worker = ArmoniKWorker(agent_channel, return_error, logger=ClefLogger("TestLogger", level=logging.CRITICAL)) | ||
reply = worker.Process(self.request, None) | ||
output = Output(reply.output.error.details if reply.output.WhichOneof("type") == "error" else None) | ||
assert not output.success | ||
assert output.error == "TestError" | ||
|
||
def test_should_write_result(self): | ||
with grpc.insecure_channel(grpc_endpoint) as agent_channel: | ||
worker = ArmoniKWorker(agent_channel, return_and_send, logger=ClefLogger("TestLogger", level=logging.DEBUG)) | ||
reply = worker.Process(self.request, None) | ||
assert reply is not None | ||
output = Output(reply.output.error.details if reply.output.WhichOneof("type") == "error" else None) | ||
assert output.success | ||
assert os.path.exists(os.path.join(data_folder, self.request.expected_output_keys[0])) | ||
with open(os.path.join(data_folder, self.request.expected_output_keys[0]), "rb") as f: | ||
value = f.read() | ||
assert len(value) > 0 |