Skip to content

Commit

Permalink
Disable MPI mode for GlobusComputeEngine
Browse files Browse the repository at this point in the history
Users are instead directed to use GlobusMPIEngine. Also raises an error
when GlobusComputeEngine receives a resource_specification, since that
only applies to MPI apps; related tests have been moved to the MPI
engine test file.
  • Loading branch information
chris-janidlo committed Jul 29, 2024
1 parent 85b6151 commit d32306b
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 94 deletions.
12 changes: 12 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ def __init__(
if this option is enabled. Default: False
""" # noqa: E501

if kwargs.get("enable_mpi_mode") or kwargs.get("mpi_launcher"):
raise ValueError(
"MPI mode is not supported on GlobusComputeEngine."
" Use GlobusMPIEngine instead."
)

self.run_dir = os.getcwd()
self.label = label
self._status_report_thread = ReportingThread(target=self.report_status, args=[])
Expand Down Expand Up @@ -256,6 +263,11 @@ def _submit(
*args: t.Any,
**kwargs: t.Any,
) -> Future:
if resource_specification:
raise ValueError(
"resource_specification is not supported on GlobusComputeEngine."
" For MPI apps, use GlobusMPIEngine."
)
return self.executor.submit(func, resource_specification, *args, **kwargs)

@property
Expand Down
11 changes: 11 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/globus_mpi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import typing as t
from concurrent.futures import Future

from globus_compute_endpoint.engines.globus_compute import (
VALID_CONTAINER_TYPES,
Expand Down Expand Up @@ -100,3 +101,13 @@ def __init__(
working_dir=working_dir,
run_in_sandbox=run_in_sandbox,
)

def _submit(
self,
func: t.Callable,
resource_specification: t.Dict,
*args: t.Any,
**kwargs: t.Any,
) -> Future:
# override submit since super rejects resource_specification
return self.executor.submit(func, resource_specification, *args, **kwargs)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import concurrent.futures
import random
import uuid

import pytest
from globus_compute_common import messagepack
from globus_compute_endpoint.engines import GlobusMPIEngine
from globus_compute_sdk.serialize import ComputeSerializer
from parsl.executors.high_throughput.mpi_prefix_composer import (
InvalidResourceSpecification,
)
from tests.utils import ez_pack_function, get_env_vars

temporary_skip = False
Expand Down Expand Up @@ -70,3 +74,42 @@ def test_env_vars(engine_runner, nodeslist, tmp_path):

assert inner_result["PARSL_NUM_NODES"] == str(num_nodes)
assert inner_result["PARSL_MPI_PREFIX"].startswith("mpiexec")


def test_mpi_res_spec(engine_runner, nodeslist):
"""Test passing valid MPI Resource spec to mpi enabled engine"""
engine = engine_runner(GlobusMPIEngine)

for i in range(1, len(nodeslist) + 1):
res_spec = {"num_nodes": i, "num_ranks": random.randint(1, 8)}

future = engine._submit(get_env_vars, resource_specification=res_spec)
assert isinstance(future, concurrent.futures.Future)
env_vars = future.result()
provisioned_nodes = env_vars["PARSL_MPI_NODELIST"].strip().split(",")

assert len(provisioned_nodes) == res_spec["num_nodes"]
for prov_node in provisioned_nodes:
assert prov_node in nodeslist
assert env_vars["PARSL_NUM_NODES"] == str(res_spec["num_nodes"])
assert env_vars["PARSL_NUM_RANKS"] == str(res_spec["num_ranks"])
assert env_vars["PARSL_MPI_PREFIX"].startswith("mpiexec")


def test_no_mpi_res_spec(engine_runner):
"""Test passing valid MPI Resource spec to mpi enabled engine"""
engine = engine_runner(GlobusMPIEngine)

res_spec = None
with pytest.raises(AttributeError):
engine._submit(get_env_vars, resource_specification=res_spec)


def test_bad_mpi_res_spec(engine_runner):
"""Test passing valid MPI Resource spec to mpi enabled engine"""
engine = engine_runner(GlobusMPIEngine)

res_spec = {"FOO": "BAR"}

with pytest.raises(InvalidResourceSpecification):
engine._submit(get_env_vars, resource_specification=res_spec)
21 changes: 21 additions & 0 deletions compute_endpoint/tests/unit/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,24 @@ def test_gcengine_exception_report_from_bad_state():
assert result.task_id == task_id
assert result.error_details.code == "RemoteExecutionError"
assert "ZeroDivisionError" in result.data


def test_gcengine_rejects_mpi_mode(randomstring):
with pytest.raises(ValueError) as pyt_exc_1:
GlobusComputeEngine(enable_mpi_mode=True)

assert "is not supported" in str(pyt_exc_1)

with pytest.raises(ValueError) as pyt_exc_2:
GlobusComputeEngine(mpi_launcher=randomstring())

assert "is not supported" in str(pyt_exc_2)


def test_gcengine_rejects_resource_specification():
with pytest.raises(ValueError) as pyt_exc:
GlobusComputeEngine().submit(
"task_id", packed_task=b"packed_task", resource_specification={"foo": "bar"}
).result()

assert "is not supported" in str(pyt_exc)

0 comments on commit d32306b

Please sign in to comment.