Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply linear backoff strategy to Client smoke tests #1746

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions smoke_tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import os
import time
import typing as t

import pytest
from globus_compute_sdk import Client
Expand Down Expand Up @@ -228,10 +229,39 @@ def tutorial_function_id(compute_test_config):


@pytest.fixture
def submit_function_and_get_result(compute_client):
def submit_fn(
endpoint_id, func=None, func_args=None, func_kwargs=None, initial_sleep=0
):
def timeout_s():
return 60


class LinearBackoff:
def __init__(self, timeout_s: int, base_delay_s: int):
self.timeout_s = timeout_s
self.delay_s = base_delay_s
self.start_time: float | None = None

def backoff(self):
if self.start_time is None:
self.start_time = time.monotonic()

elapsed_s = time.monotonic() - self.start_time
assert elapsed_s < self.timeout_s, f"Hit {self.timeout_s} second test timeout"

remaining_s = max(self.timeout_s - elapsed_s, 0)
self.delay_s = round(min(self.delay_s, remaining_s))
time.sleep(self.delay_s)

self.delay_s += 1
return True


@pytest.fixture(scope="function")
def linear_backoff(timeout_s: int):
return LinearBackoff(timeout_s, 1).backoff


@pytest.fixture
def submit_function_and_get_result(compute_client: Client, linear_backoff: t.Callable):
def submit_fn(endpoint_id, func=None, func_args=None, func_kwargs=None):
if callable(func):
func_id = compute_client.register_function(func)
else:
Expand All @@ -246,17 +276,11 @@ def submit_fn(
*func_args, endpoint_id=endpoint_id, function_id=func_id, **func_kwargs
)

if initial_sleep:
time.sleep(initial_sleep)

result = None
response = None
for attempt in range(10):
while linear_backoff():
response = compute_client.get_task(task_id)
if response.get("pending") is False:
result = response.get("result")
else:
time.sleep(attempt)
break

return FuncResult(func_id, task_id, result, response)

Expand Down
28 changes: 11 additions & 17 deletions smoke_tests/tests/test_running_functions.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import concurrent.futures
import time
import typing as t

import globus_compute_sdk as gc
import pytest
from globus_compute_sdk import Executor
from globus_compute_sdk import Client, Executor
from packaging.version import Version

try:
from globus_compute_sdk.errors import TaskPending
except ImportError:
from globus_compute_sdk.utils.errors import TaskPending


sdk_version = Version(gc.version.__version__)


Expand All @@ -35,7 +34,7 @@ def ohai():


@pytest.mark.skipif(sdk_version.release < (2, 2, 5), reason="batch.add iface updated")
def test_batch(compute_client, endpoint):
def test_batch(compute_client: Client, endpoint: str, linear_backoff: t.Callable):
"""Test batch submission and get_batch_result"""

double_fn_id = compute_client.register_function(double)
Expand All @@ -49,9 +48,7 @@ def test_batch(compute_client, endpoint):
batch_res = compute_client.batch_run(endpoint, batch)
tasks = [t for tl in batch_res["tasks"].values() for t in tl]

total = 0
for _i in range(12):
time.sleep(5)
while linear_backoff():
results = compute_client.get_batch_result(tasks)
try:
total = sum(results[tid]["result"] for tid in results)
Expand All @@ -62,33 +59,30 @@ def test_batch(compute_client, endpoint):
assert total == 2 * (sum(inputs)), "Batch run results do not add up"


def test_wait_on_new_hello_world_func(compute_client, endpoint):
def test_wait_on_new_hello_world_func(
compute_client: Client, endpoint, linear_backoff: t.Callable
):
func_id = compute_client.register_function(ohai)
task_id = compute_client.run(endpoint_id=endpoint, function_id=func_id)

got_result = False
for _ in range(30):
while linear_backoff():
try:
result = compute_client.get_result(task_id)
got_result = True
break
except TaskPending:
time.sleep(1)
pass

assert got_result
assert result == "ohai"


def test_executor(compute_client, endpoint, tutorial_function_id):
def test_executor(compute_client, endpoint, tutorial_function_id, timeout_s: int):
"""Test using Executor to retrieve results."""
res = compute_client.web_client.get_version()
assert res.http_status == 200, f"Received {res.http_status} instead!"

num_tasks = 10
submit_count = 2 # we've had at least one bug that prevented executor re-use

# 30s sometimes fails, trying with 60 to see if that makes a difference
timeout_s = 60

# use client on newer versions and funcx_client on older
try:
gce = Executor(endpoint_id=endpoint, client=compute_client)
Expand Down
Loading