Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Decouple create worker vs pop worker request. #47694

Merged
merged 14 commits into from
Oct 10, 2024
20 changes: 11 additions & 9 deletions python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@
# for asyncio
try_install_uvloop()

# If the worker setup function is configured, run it.
worker_process_setup_hook_key = os.getenv(
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR
)
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
ray._private.worker.global_worker.core_worker.drain_and_exit_worker(
"system", error
)

raylet_ip_address = args.raylet_ip_address
if raylet_ip_address is None:
raylet_ip_address = args.node_ip_address
Expand Down Expand Up @@ -276,15 +287,6 @@
module_names_to_import = args.worker_preload_modules.split(",")
ray._private.utils.try_import_each_module(module_names_to_import)

# If the worker setup function is configured, run it.
worker_process_setup_hook_key = os.getenv(
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR
)
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
worker.core_worker.drain_and_exit_worker("system", error)

if mode == ray.WORKER_MODE:
worker.main_loop()
elif mode in [ray.RESTORE_WORKER_MODE, ray.SPILL_WORKER_MODE]:
Expand Down
52 changes: 52 additions & 0 deletions python/ray/tests/test_node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,58 @@
from ray._private.utils import get_num_cpus
import time
import sys
import filelock

# The first worker with such env var starts up normally, while the second worker hangs.
# This is achieved with file locks. The first worker creates a file and acquires a lock
# and never releases (don't learn from this example). The second worker tries to acquire
# and hangs forever.
leaking_global_lock = None


def _hook():
global leaking_global_lock

if os.environ.get("HELLO") != "WORLD":
print(f"HELLO={os.environ.get('HELLO')}, skip")
return

# Creates a file in $PWD to indicate the hook is executed.
hook_file_path = "ray_hook_ok.lock"
leaking_global_lock = filelock.FileLock(hook_file_path)
print(f"acquiring lock for {hook_file_path}")
leaking_global_lock.acquire()
print(f"acquired lock for {hook_file_path}")


def test_can_reuse_released_workers(ray_start_cluster):
    """Verify a task is not pinned to the worker whose startup it triggered.

    The worker setup hook lets exactly one worker come up; every later worker
    hangs forever during runtime startup. Two tasks are submitted, and the
    second must still be schedulable on the worker released by the first --
    i.e. a task is not bound to the worker it requested to start.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=2)
    ray.init(
        address=cluster.address,
        runtime_env={"worker_process_setup_hook": "ray.tests.test_node_manager._hook"},
    )

    # The env var keeps prestarted workers (which lack it) out of the picture.
    @ray.remote(runtime_env={"env_vars": {"HELLO": "WORLD"}})
    def get_worker_pid():
        # Linger long enough that the second task also requests a worker.
        time.sleep(1)
        print(f"pid={os.getpid()}, env HELLO={os.environ.get('HELLO')}")
        return os.getpid()

    first_ref = get_worker_pid.remote()
    second_ref = get_worker_pid.remote()

    # Worker startup takes ~5s, so an 8s timeout only succeeds if the second
    # task reuses the already-started worker.
    first_pid, second_pid = ray.get([first_ref, second_ref], timeout=8)
    assert first_pid == second_pid


# This tests the queue transitions for infeasible tasks. This has been an issue
Expand Down
Loading