Skip to content

Commit

Permalink
Apply linear backoff strategy to Client smoke tests
Browse files Browse the repository at this point in the history
Applying a uniform linear backoff strategy with a 60-second timeout
improves test stability and simplifies debugging.
  • Loading branch information
rjmello committed Dec 9, 2024
1 parent f65eaa1 commit b1a47f3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 29 deletions.
48 changes: 36 additions & 12 deletions smoke_tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import os
import time
import typing as t

import pytest
from globus_compute_sdk import Client
Expand Down Expand Up @@ -228,10 +229,39 @@ def tutorial_function_id(compute_test_config):


@pytest.fixture
def submit_function_and_get_result(compute_client):
def submit_fn(
endpoint_id, func=None, func_args=None, func_kwargs=None, initial_sleep=0
):
def timeout_s():
    """Return the time budget, in seconds, used by the smoke tests."""
    budget_s = 60
    return budget_s


class LinearBackoff:
    """Pace a polling loop with a linearly growing delay and an overall timeout.

    Every successful call to :meth:`backoff` sleeps and then returns ``True``,
    so the bound method can drive a ``while backoff():`` polling loop.  The
    delay starts at ``base_delay_s`` and grows by one second per call.  Once
    ``timeout_s`` seconds have passed since the first call, the next call
    fails with an ``AssertionError`` instead of sleeping.
    """

    def __init__(self, timeout_s: int, base_delay_s: int):
        """Store the time budget and the delay used by the first call.

        Args:
            timeout_s: Seconds allowed to pass, measured from the first
                ``backoff`` call, before ``backoff`` starts failing.
            base_delay_s: Seconds to sleep on the first ``backoff`` call.
        """
        self.timeout_s = timeout_s
        self.delay_s = base_delay_s
        # The clock starts lazily on the first backoff() call rather than at
        # construction time.
        self.start_time: float | None = None

    def backoff(self):
        """Sleep for the current delay, then lengthen the delay by one second.

        The sleep is clamped so that it never extends past the deadline.

        Returns:
            Always ``True``; the loop ends through the timeout assertion or
            through the caller's own ``break``.

        Raises:
            AssertionError: If ``timeout_s`` seconds or more have elapsed
                since the first call.
        """
        if self.start_time is None:
            self.start_time = time.monotonic()

        elapsed_s = time.monotonic() - self.start_time
        assert elapsed_s < self.timeout_s, f"Hit {self.timeout_s} second test timeout"

        # Sleep the exact clamped remainder.  Rounding it (as before) turned
        # any remainder below half a second into sleep(0), so the caller's
        # loop polled without pause until the deadline; it could also round
        # up and sleep past the deadline.
        remaining_s = max(self.timeout_s - elapsed_s, 0)
        time.sleep(min(self.delay_s, remaining_s))

        self.delay_s += 1
        return True


@pytest.fixture(scope="function")
def linear_backoff(timeout_s: int):
    """Provide the ``backoff`` method of a new ``LinearBackoff`` instance.

    The instance is built with the ``timeout_s`` value as its time budget and
    an initial delay of one second.
    """
    pacer = LinearBackoff(timeout_s, base_delay_s=1)
    return pacer.backoff


@pytest.fixture
def submit_function_and_get_result(compute_client: Client, linear_backoff: t.Callable):
def submit_fn(endpoint_id, func=None, func_args=None, func_kwargs=None):
if callable(func):
func_id = compute_client.register_function(func)
else:
Expand All @@ -246,17 +276,11 @@ def submit_fn(
*func_args, endpoint_id=endpoint_id, function_id=func_id, **func_kwargs
)

if initial_sleep:
time.sleep(initial_sleep)

result = None
response = None
for attempt in range(10):
while linear_backoff():
response = compute_client.get_task(task_id)
if response.get("pending") is False:
result = response.get("result")
else:
time.sleep(attempt)
break

return FuncResult(func_id, task_id, result, response)

Expand Down
28 changes: 11 additions & 17 deletions smoke_tests/tests/test_running_functions.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import concurrent.futures
import time
import typing as t

import globus_compute_sdk as gc
import pytest
from globus_compute_sdk import Executor
from globus_compute_sdk import Client, Executor
from packaging.version import Version

try:
from globus_compute_sdk.errors import TaskPending
except ImportError:
from globus_compute_sdk.utils.errors import TaskPending


sdk_version = Version(gc.version.__version__)


Expand All @@ -35,7 +34,7 @@ def ohai():


@pytest.mark.skipif(sdk_version.release < (2, 2, 5), reason="batch.add iface updated")
def test_batch(compute_client, endpoint):
def test_batch(compute_client: Client, endpoint: str, linear_backoff: t.Callable):
"""Test batch submission and get_batch_result"""

double_fn_id = compute_client.register_function(double)
Expand All @@ -49,9 +48,7 @@ def test_batch(compute_client, endpoint):
batch_res = compute_client.batch_run(endpoint, batch)
tasks = [t for tl in batch_res["tasks"].values() for t in tl]

total = 0
for _i in range(12):
time.sleep(5)
while linear_backoff():
results = compute_client.get_batch_result(tasks)
try:
total = sum(results[tid]["result"] for tid in results)
Expand All @@ -62,33 +59,30 @@ def test_batch(compute_client, endpoint):
assert total == 2 * (sum(inputs)), "Batch run results do not add up"


def test_wait_on_new_hello_world_func(compute_client, endpoint):
def test_wait_on_new_hello_world_func(
compute_client: Client, endpoint, linear_backoff: t.Callable
):
func_id = compute_client.register_function(ohai)
task_id = compute_client.run(endpoint_id=endpoint, function_id=func_id)

got_result = False
for _ in range(30):
while linear_backoff():
try:
result = compute_client.get_result(task_id)
got_result = True
break
except TaskPending:
time.sleep(1)
pass

assert got_result
assert result == "ohai"


def test_executor(compute_client, endpoint, tutorial_function_id):
def test_executor(compute_client, endpoint, tutorial_function_id, timeout_s: int):
"""Test using Executor to retrieve results."""
res = compute_client.web_client.get_version()
assert res.http_status == 200, f"Received {res.http_status} instead!"

num_tasks = 10
submit_count = 2 # we've had at least one bug that prevented executor re-use

# 30s sometimes fails, trying with 60 to see if that makes a difference
timeout_s = 60

# use client on newer versions and funcx_client on older
try:
gce = Executor(endpoint_id=endpoint, client=compute_client)
Expand Down

0 comments on commit b1a47f3

Please sign in to comment.