Skip to content

Commit

Permalink
add integration test for gpu scheduling/avoidance (ray-project#18729)
Browse files: browse the repository at this point in the history
  • Loading branch information
sasha-s authored Sep 18, 2021
1 parent 64c2598 commit 57edc0c
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions python/ray/tests/test_scheduling.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -519,6 +519,64 @@ def f(s, ref):
wait_for_condition(lambda: not fetches_queued())


def build_cluster(num_cpu_nodes, num_gpu_nodes):
    """Create a test cluster made of GPU nodes and CPU-only nodes.

    GPU nodes are added first (2 CPUs and 1 GPU each), followed by the
    CPU-only nodes (1 CPU each). The function blocks on
    ``cluster.wait_for_nodes()`` before returning.

    Args:
        num_cpu_nodes: Number of CPU-only nodes to add.
        num_gpu_nodes: Number of GPU nodes to add.

    Returns:
        A ``(cluster, cpu_ids, gpu_ids)`` tuple, where the two lists hold
        the ``unique_id`` of every CPU-only node and every GPU node,
        in the order the nodes were added.
    """
    cluster = ray.cluster_utils.Cluster()

    # Keep the original creation order: all GPU nodes before any CPU node.
    gpu_ids = []
    for _ in range(num_gpu_nodes):
        gpu_node = cluster.add_node(num_cpus=2, num_gpus=1)
        gpu_ids.append(gpu_node.unique_id)

    cpu_ids = []
    for _ in range(num_cpu_nodes):
        cpu_node = cluster.add_node(num_cpus=1)
        cpu_ids.append(cpu_node.unique_id)

    cluster.wait_for_nodes()
    return cluster, cpu_ids, gpu_ids


def test_gpu(monkeypatch):
    """Check that CPU-only work stays off GPU nodes when avoidance is on.

    The test sets ``RAY_scheduler_avoid_gpu_nodes=1``, builds a cluster with
    ``n`` GPU nodes and ``n`` CPU-only nodes, and submits a ``launcher`` task
    that requests a fraction of a GPU. The launcher in turn creates one
    CPU-only actor and ``n`` CPU-only tasks and collects the node id each of
    them ran on.

    Asserts:
        * the launcher itself ran on one of the GPU nodes;
        * every task / actor call it spawned ran on a CPU-only node.

    Args:
        monkeypatch: pytest fixture used to set the scheduler environment
            variable for the duration of the test.
    """
    monkeypatch.setenv("RAY_scheduler_avoid_gpu_nodes", "1")
    n = 5

    cluster, cpu_node_ids, gpu_node_ids = build_cluster(n, n)
    try:
        ray.init(address=cluster.address)

        @ray.remote(num_cpus=1)
        class Actor1:
            def __init__(self):
                pass

            def get_location(self):
                return ray.worker.global_worker.node.unique_id

        # The CPU request must be passed to the decorator. Declaring
        # ``num_cpus=0.5`` as a function parameter (as this test used to)
        # only creates an unused argument and leaves the task with the
        # default resource request.
        @ray.remote(num_cpus=0.5)
        def task_cpu():
            # NOTE(review): the sleep presumably keeps all tasks in flight
            # at once so they have to spread over several nodes - confirm.
            time.sleep(10)
            return ray.worker.global_worker.node.unique_id

        @ray.remote(num_returns=2, num_gpus=0.5)
        def launcher():
            a = Actor1.remote()
            task_results = [task_cpu.remote() for _ in range(n)]
            actor_results = [a.get_location.remote() for _ in range(n)]
            # First return value: node ids of all spawned work.
            # Second return value: node id the launcher itself ran on.
            locations = ray.get(task_results + actor_results)
            return locations, ray.worker.global_worker.node.unique_id

        r = launcher.remote()

        ids, launcher_id = ray.get(r)

        assert launcher_id in gpu_node_ids, (
            "expected launcher task to be scheduled on GPU nodes")

        for node_id in ids:
            # The original message concatenated two literals without a
            # separating space ("...scheduled onnon-GPU nodes.").
            assert node_id in cpu_node_ids, (
                "expected non-GPU tasks/actors to be scheduled on "
                "non-GPU nodes.")
    finally:
        # Always tear down the driver connection and the cluster, even when
        # an assertion above fails.
        ray.shutdown()
        cluster.shutdown()


# Allow running this test file directly: hand it to pytest in verbose mode
# and propagate pytest's exit status to the shell.
if __name__ == "__main__":
    import pytest

    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)

0 comments on commit 57edc0c

Please sign in to comment.