From 7f72302251ef96626d39e0629b32682ea154eeaa Mon Sep 17 00:00:00 2001 From: Stephen Rosen Date: Tue, 5 Oct 2021 20:26:13 +0000 Subject: [PATCH 01/11] Split 'smoke_test' and 'hourly' GH workflows 'hourly' runs on the hour and notifies email 'smoke_test' runs on push and PR and does not notify --- .../workflows/{tutorial.yaml => hourly.yaml} | 29 ++++----------- .github/workflows/smoke_test.yaml | 35 ++----------------- 2 files changed, 9 insertions(+), 55 deletions(-) rename .github/workflows/{tutorial.yaml => hourly.yaml} (72%) diff --git a/.github/workflows/tutorial.yaml b/.github/workflows/hourly.yaml similarity index 72% rename from .github/workflows/tutorial.yaml rename to .github/workflows/hourly.yaml index f00089a12..5e43d371a 100644 --- a/.github/workflows/tutorial.yaml +++ b/.github/workflows/hourly.yaml @@ -1,4 +1,4 @@ -name: tutorial_test +name: hourly on: schedule: @@ -8,44 +8,29 @@ on: tags: required: false description: "manual test" - push: - branches: - - "*" - tags: - - "*" jobs: - test: - strategy: - matrix: - python-version: [3.7] + tutorial_test: runs-on: ubuntu-latest - steps: - uses: actions/checkout@v2 with: ref: main - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 + - uses: actions/setup-python@v1 with: - python-version: ${{ matrix.python-version }} - + python-version: 3.7 - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel python -m pip install -r funcx_sdk/test-requirements.txt - pip list - name: Check for vulnerabilities in libraries run: | pip install safety - pip freeze | safety check - - name: Test sdk by just importing - run: | - python -m pip install funcx - python -c "from funcx.sdk.client import FuncXClient" + safety check - name: Run smoke tests to check liveness of hosted services run: | pytest -v funcx_endpoint/tests/smoke_tests --api-client-id ${{ secrets.API_CLIENT_ID }} --api-client-secret ${{ secrets.API_CLIENT_SECRET 
}} + # FIXME: make this send to a listhost or Slack - name: Send mail if: ${{ failure() }} uses: dawidd6/action-send-mail@v3 @@ -57,4 +42,4 @@ jobs: subject: ${{ github.repository }} - Tutorial test ${{ job.status }} to: ryan.chard@gmail.com,rchard@anl.gov,chard@uchicago.edu,yadudoc1729@gmail.com,josh@globus.org,bengal1@illinois.edu,benc@hawaga.org.uk,sirosen@globus.org,uriel@globus.org from: funcX Tests # - body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! \ No newline at end of file + body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! diff --git a/.github/workflows/smoke_test.yaml b/.github/workflows/smoke_test.yaml index 9e0c8d2e6..e894e5e39 100644 --- a/.github/workflows/smoke_test.yaml +++ b/.github/workflows/smoke_test.yaml @@ -1,13 +1,6 @@ name: smoke_test on: - schedule: - - cron: "0 * * * *" - workflow_dispatch: - inputs: - tags: - required: false - description: "manual test" push: branches: - "*" @@ -16,29 +9,17 @@ on: jobs: test: - strategy: - matrix: - python-version: [3.7] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - uses: actions/setup-python@v1 with: - ref: main - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 - with: - python-version: ${{ matrix.python-version }} - + python-version: 3.7 - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel python -m pip install -r funcx_sdk/test-requirements.txt - pip list - - name: Check for vulnerabilities in libraries - run: | - pip install safety - pip freeze | safety check - name: Test sdk by just importing run: | python -m pip install funcx @@ -46,15 +27,3 @@ jobs: - name: Run smoke tests to check liveness of hosted services run: | pytest -v funcx_endpoint/tests/smoke_tests --api-client-id ${{ secrets.API_CLIENT_ID }} --api-client-secret ${{ secrets.API_CLIENT_SECRET }} - - name: Send 
mail - if: ${{ failure() }} - uses: dawidd6/action-send-mail@v3 - with: - server_address: smtp.gmail.com - server_port: 465 - username: ${{secrets.MAIL_USERNAME}} - password: ${{secrets.MAIL_PASSWORD}} - subject: ${{ github.repository }} - Tutorial test ${{ job.status }} - to: ryan.chard@gmail.com,rchard@anl.gov,chard@uchicago.edu,yadudoc1729@gmail.com,josh@globus.org,bengal1@illinois.edu,benc@hawaga.org.uk,sirosen@globus.org,uriel@globus.org - from: funcX Tests # - body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! \ No newline at end of file From dd36156916da7a584bc6cc75908ae3da1f478785 Mon Sep 17 00:00:00 2001 From: Stephen Rosen Date: Wed, 6 Oct 2021 17:01:12 +0000 Subject: [PATCH 02/11] smoke_test workflow to only run on changes Using `paths`, limit this workflow to PRs and pushes which modify the smoke tests directory. --- .github/workflows/smoke_test.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/smoke_test.yaml b/.github/workflows/smoke_test.yaml index e894e5e39..7e117c286 100644 --- a/.github/workflows/smoke_test.yaml +++ b/.github/workflows/smoke_test.yaml @@ -2,10 +2,15 @@ name: smoke_test on: push: + paths: + - 'funcx_endpoint/tests/smoke_tests/**' branches: - "*" tags: - "*" + pull_request: + paths: + - 'funcx_endpoint/tests/smoke_tests/**' jobs: test: From 5b531d62196cd1a92ec0e807a22e26fe0a477aaa Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Tue, 19 Oct 2021 15:50:50 -0500 Subject: [PATCH 03/11] Adding exception handling to warn and proceed from bad results_ack files. 
--- funcx_endpoint/funcx_endpoint/endpoint/results_ack.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py index 5643625cb..367520a1a 100644 --- a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py +++ b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py @@ -81,6 +81,9 @@ def persist(self): def load(self): """ Load unacked results from disk """ - if os.path.exists(self.data_path): - with open(self.data_path, 'rb') as fp: - self.unacked_results = pickle.load(fp) + try: + if os.path.exists(self.data_path): + with open(self.data_path, 'rb') as fp: + self.unacked_results = pickle.load(fp) + except pickle.UnpicklingError: + logger.warning(f"[WARNING] Cached results {self.data_path} appear to be corrupt. Proceeding without loading cached results") From e5764b88600b364852f0ae93794c971d4331be68 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Thu, 21 Oct 2021 11:08:18 -0500 Subject: [PATCH 04/11] Updating logger path prefix, and removing extraneous [WARNING] label --- funcx_endpoint/funcx_endpoint/endpoint/results_ack.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py index 367520a1a..bf9d73184 100644 --- a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py +++ b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py @@ -3,7 +3,7 @@ import os import pickle -logger = logging.getLogger(__name__) +logger = logging.getLogger("endpoint." + __name__) class ResultsAckHandler(): @@ -86,4 +86,4 @@ def load(self): with open(self.data_path, 'rb') as fp: self.unacked_results = pickle.load(fp) except pickle.UnpicklingError: - logger.warning(f"[WARNING] Cached results {self.data_path} appear to be corrupt. 
Proceeding without loading cached results") + logger.warning(f"Cached results {self.data_path} appear to be corrupt. Proceeding without loading cached results") From d70c5fdaada15dc12dcae687bdfefea5bd5034cd Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Thu, 21 Oct 2021 11:18:08 -0500 Subject: [PATCH 05/11] Explicitly set the logger path --- funcx_endpoint/funcx_endpoint/endpoint/results_ack.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py index bf9d73184..e222445e0 100644 --- a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py +++ b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py @@ -3,7 +3,9 @@ import os import pickle -logger = logging.getLogger("endpoint." + __name__) +# The logger path needs to start with "endpoint.", while the current module path +# starts with "funcx_endpoint.endpoint.", so the name is set explicitly. +logger = logging.getLogger("endpoint.results_ack") class ResultsAckHandler(): From 7dcabb3deb2e61824c1d422be08076e9d7b8b4e4 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 22 Oct 2021 10:53:35 -0500 Subject: [PATCH 06/11] Extend smoke test timeouts --- funcx_endpoint/tests/smoke_tests/test_async.py | 2 +- funcx_endpoint/tests/smoke_tests/test_executor.py | 2 +- funcx_endpoint/tests/smoke_tests/test_running_functions.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/funcx_endpoint/tests/smoke_tests/test_async.py b/funcx_endpoint/tests/smoke_tests/test_async.py index 2182b920d..4a9eb5afe 100644 --- a/funcx_endpoint/tests/smoke_tests/test_async.py +++ b/funcx_endpoint/tests/smoke_tests/test_async.py @@ -10,7 +10,7 @@ async def simple_task(fxc, endpoint): squared_function = fxc.register_function(squared) x = random.randint(0, 100) task = fxc.run(x, endpoint_id=endpoint, function_id=squared_function) - result = await asyncio.wait_for(task, 20) + result = await asyncio.wait_for(task, 60) assert result == 
squared(x), "Got wrong answer" diff --git a/funcx_endpoint/tests/smoke_tests/test_executor.py b/funcx_endpoint/tests/smoke_tests/test_executor.py index 54997c3e5..9795ebe2b 100644 --- a/funcx_endpoint/tests/smoke_tests/test_executor.py +++ b/funcx_endpoint/tests/smoke_tests/test_executor.py @@ -11,4 +11,4 @@ def test_executor_basic(fx, endpoint): x = random.randint(0, 100) fut = fx.submit(double, x, endpoint_id=endpoint) - assert fut.result(timeout=10) == x * 2, "Got wrong answer" + assert fut.result(timeout=60) == x * 2, "Got wrong answer" diff --git a/funcx_endpoint/tests/smoke_tests/test_running_functions.py b/funcx_endpoint/tests/smoke_tests/test_running_functions.py index 4714d6c97..6e78bd3f8 100644 --- a/funcx_endpoint/tests/smoke_tests/test_running_functions.py +++ b/funcx_endpoint/tests/smoke_tests/test_running_functions.py @@ -5,7 +5,7 @@ def test_run_pre_registered_function(fxc, endpoint, tutorial_funcion_id): """This test confirms that we are connected to the default production DB""" fn_id = fxc.run(endpoint_id=endpoint, function_id=tutorial_funcion_id) - time.sleep(10) + time.sleep(30) result = fxc.get_result(fn_id) assert result == "Hello World!", f"Expected result: Hello World!, got {result}" From d58490f662c8a16797ac1780ca783c25b487e1b2 Mon Sep 17 00:00:00 2001 From: Stephen Rosen Date: Fri, 22 Oct 2021 17:38:26 +0000 Subject: [PATCH 07/11] Fix smoke-test CI to install funcx SDK Also, install from local repo, not pypi. 
--- .github/workflows/hourly.yaml | 1 + .github/workflows/smoke_test.yaml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/hourly.yaml b/.github/workflows/hourly.yaml index 5e43d371a..4abfb9d7c 100644 --- a/.github/workflows/hourly.yaml +++ b/.github/workflows/hourly.yaml @@ -22,6 +22,7 @@ jobs: - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel + python -m pip install ./funcx_sdk python -m pip install -r funcx_sdk/test-requirements.txt - name: Check for vulnerabilities in libraries run: | diff --git a/.github/workflows/smoke_test.yaml b/.github/workflows/smoke_test.yaml index 7e117c286..6aa569e9a 100644 --- a/.github/workflows/smoke_test.yaml +++ b/.github/workflows/smoke_test.yaml @@ -24,10 +24,10 @@ jobs: - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel + python -m pip install ./funcx_sdk python -m pip install -r funcx_sdk/test-requirements.txt - name: Test sdk by just importing run: | - python -m pip install funcx python -c "from funcx.sdk.client import FuncXClient" - name: Run smoke tests to check liveness of hosted services run: | From 294fca53855c12c95fbbc3ccbed5e6183658b674 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Mon, 25 Oct 2021 11:53:37 -0500 Subject: [PATCH 08/11] Allow funcx_service_address to be configurable from the executor If funcx_service_address is not specified the default address from the FuncXClient is used. 
--- .../executors/high_throughput/executor.py | 5 +++++ .../executors/high_throughput/interchange.py | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py index c9bd2672b..217b4468f 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py @@ -189,6 +189,10 @@ class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin): and SlurmProvider interfaces to request resources from the Slurm batch scheduler. Default: LocalProvider + funcx_service_address: str + Override funcx_service_address used by the FuncXClient. If no address is specified, + the FuncXClient's default funcx_service_address is used. + Default: None """ def __init__(self, @@ -230,6 +234,7 @@ def __init__(self, managed=True, interchange_local=True, passthrough=True, + funcx_service_address=None, task_status_queue=None): logger.debug("Initializing HighThroughputExecutor") diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py index cf41c07d9..4e3f9b581 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py @@ -185,6 +185,11 @@ def __init__(self, suppress_failure : Bool When set to True, the interchange will attempt to suppress failures. Default: False + + funcx_service_address: str + Override funcx_service_address used by the FuncXClient. If no address is specified, + the FuncXClient's default funcx_service_address is used. 
+ Default: None """ self.logdir = logdir @@ -259,7 +264,11 @@ def __init__(self, self.pending_task_queue = {} self.containers = {} self.total_pending_task_count = 0 - self.fxs = FuncXClient(funcx_service_address=funcx_service_address) + self.funcx_service_address = funcx_service_address + if self.funcx_service_address: + self.fxs = FuncXClient(funcx_service_address=funcx_service_address) + else: + self.fxs = FuncXClient() logger.info("Interchange address is {}".format(self.interchange_address)) self.worker_ports = worker_ports From c1a9e95516a74619b70bde06c11f5b96cd61576d Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Mon, 25 Oct 2021 12:24:56 -0500 Subject: [PATCH 09/11] Removing unnecessary variable --- .../funcx_endpoint/executors/high_throughput/interchange.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py index 4e3f9b581..0340a8651 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py @@ -264,8 +264,7 @@ def __init__(self, self.pending_task_queue = {} self.containers = {} self.total_pending_task_count = 0 - self.funcx_service_address = funcx_service_address - if self.funcx_service_address: + if funcx_service_address: self.fxs = FuncXClient(funcx_service_address=funcx_service_address) else: self.fxs = FuncXClient() From 72945c229d2fc497f9191b316d3661db7a7738a6 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Tue, 26 Oct 2021 10:51:42 -0500 Subject: [PATCH 10/11] Fixing the broken param setting in the executor --- .../funcx_endpoint/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py index 
217b4468f..46b7b0e50 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py @@ -283,7 +283,7 @@ def __init__(self, self.task_status_queue = task_status_queue # FuncX specific options - self.funcx_service_address = None + self.funcx_service_address = funcx_service_address self.container_image = container_image self.worker_mode = worker_mode self.last_response_time = time.time() From 264c854e17bf2379bc7251320afcc358b2aaf125 Mon Sep 17 00:00:00 2001 From: Michael McQuade Date: Wed, 27 Oct 2021 10:58:25 -0500 Subject: [PATCH 11/11] Fix embed of funcX model image --- docs/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.rst b/docs/index.rst index ada5e4193..b1b7f29e0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,7 +40,7 @@ results. funcX securely communicates with remote endpoints, waits for resources to become available, and can even retry execution upon failure. funcX stores results (or errors) in the cloud-hosted service until they are retrieved by the user. -.. image:: img/funcx-model.png +.. image:: img/funcX-model.png Using funcX