From 29f960f264c2bb959f65c07faafb07d20402ad5a Mon Sep 17 00:00:00 2001 From: matthewc2003 Date: Thu, 17 Oct 2024 13:29:49 -0700 Subject: [PATCH] Add resource specification fields to HTEX (#3638) Adds the parameter 'priority' as a valid entry in the resource spec dict. Necessary for changing the pending_task_queue to a different structure than queue.Queue. Also passes resource spec to the interchange. --- parsl/executors/high_throughput/executor.py | 20 ++++++++++--------- .../test_resource_spec_validation.py | 7 +++++++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index fb38c0121e..ab1498efc4 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -346,15 +346,17 @@ def worker_logdir(self): return self.logdir def validate_resource_spec(self, resource_specification: dict): - """HTEX does not support *any* resource_specification options and - will raise InvalidResourceSpecification is any are passed to it""" + """HTEX supports the following *Optional* resource specifications: + priority: lower value is higher priority""" if resource_specification: - raise InvalidResourceSpecification( - set(resource_specification.keys()), - ("HTEX does not support the supplied resource_specifications. " - "For MPI applications consider using the MPIExecutor. " - "For specifications for core count/memory/walltime, consider using WorkQueueExecutor.") - ) + acceptable_fields = {'priority'} + keys = set(resource_specification.keys()) + invalid_keys = keys - acceptable_fields + if invalid_keys: + message = "Task resource specification only accepts these types of resources: {}".format( + ', '.join(acceptable_fields)) + logger.error(message) + raise InvalidResourceSpecification(set(invalid_keys), message) return def initialize_scaling(self): @@ -662,7 +664,7 @@ def submit(self, func, resource_specification, *args, **kwargs): except TypeError: raise SerializationError(func.__name__) - msg = {"task_id": task_id, "buffer": fn_buf} + msg = {"task_id": task_id, "resource_spec": resource_specification, "buffer": fn_buf} # Post task to the outgoing queue self.outgoing_q.put(msg) diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py index 5587891650..2a6d577416 100644 --- a/parsl/tests/test_htex/test_resource_spec_validation.py +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -30,6 +30,13 @@ def test_resource_spec_validation(): assert ret_val is None +@pytest.mark.local +def test_resource_spec_validation_one_key(): + htex = HighThroughputExecutor() + ret_val = htex.validate_resource_spec({"priority": 2}) + assert ret_val is None + + @pytest.mark.local def test_resource_spec_validation_bad_keys(): htex = HighThroughputExecutor()