diff --git a/.github/workflows/tutorial.yaml b/.github/workflows/hourly.yaml similarity index 72% rename from .github/workflows/tutorial.yaml rename to .github/workflows/hourly.yaml index f00089a12..4abfb9d7c 100644 --- a/.github/workflows/tutorial.yaml +++ b/.github/workflows/hourly.yaml @@ -1,4 +1,4 @@ -name: tutorial_test +name: hourly on: schedule: @@ -8,44 +8,30 @@ on: tags: required: false description: "manual test" - push: - branches: - - "*" - tags: - - "*" jobs: - test: - strategy: - matrix: - python-version: [3.7] + tutorial_test: runs-on: ubuntu-latest - steps: - uses: actions/checkout@v2 with: ref: main - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 + - uses: actions/setup-python@v1 with: - python-version: ${{ matrix.python-version }} - + python-version: 3.7 - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel + python -m pip install ./funcx_sdk python -m pip install -r funcx_sdk/test-requirements.txt - pip list - name: Check for vulnerabilities in libraries run: | pip install safety - pip freeze | safety check - - name: Test sdk by just importing - run: | - python -m pip install funcx - python -c "from funcx.sdk.client import FuncXClient" + safety check - name: Run smoke tests to check liveness of hosted services run: | pytest -v funcx_endpoint/tests/smoke_tests --api-client-id ${{ secrets.API_CLIENT_ID }} --api-client-secret ${{ secrets.API_CLIENT_SECRET }} + # FIXME: make this send to a listhost or Slack - name: Send mail if: ${{ failure() }} uses: dawidd6/action-send-mail@v3 @@ -57,4 +43,4 @@ jobs: subject: ${{ github.repository }} - Tutorial test ${{ job.status }} to: ryan.chard@gmail.com,rchard@anl.gov,chard@uchicago.edu,yadudoc1729@gmail.com,josh@globus.org,bengal1@illinois.edu,benc@hawaga.org.uk,sirosen@globus.org,uriel@globus.org from: funcX Tests # - body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! \ No newline at end of file + body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! diff --git a/.github/workflows/smoke_test.yaml b/.github/workflows/smoke_test.yaml index 9e0c8d2e6..6aa569e9a 100644 --- a/.github/workflows/smoke_test.yaml +++ b/.github/workflows/smoke_test.yaml @@ -1,60 +1,34 @@ name: smoke_test on: - schedule: - - cron: "0 * * * *" - workflow_dispatch: - inputs: - tags: - required: false - description: "manual test" push: + paths: + - 'funcx_endpoint/tests/smoke_tests/**' branches: - "*" tags: - "*" + pull_request: + paths: + - 'funcx_endpoint/tests/smoke_tests/**' jobs: test: - strategy: - matrix: - python-version: [3.7] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - uses: actions/setup-python@v1 with: - ref: main - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 - with: - python-version: ${{ matrix.python-version }} - + python-version: 3.7 - name: Install dependencies for funcx-sdk and test requirements run: | python -m pip install --upgrade pip setuptools wheel + python -m pip install ./funcx_sdk python -m pip install -r funcx_sdk/test-requirements.txt - pip list - - name: Check for vulnerabilities in libraries - run: | - pip install safety - pip freeze | safety check - name: Test sdk by just importing run: | - python -m pip install funcx python -c "from funcx.sdk.client import FuncXClient" - name: Run smoke tests to check liveness of hosted services run: | pytest -v funcx_endpoint/tests/smoke_tests --api-client-id ${{ secrets.API_CLIENT_ID }} --api-client-secret ${{ secrets.API_CLIENT_SECRET }} - - name: Send mail - if: ${{ failure() }} - uses: dawidd6/action-send-mail@v3 - with: - server_address: smtp.gmail.com - server_port: 465 - username: ${{secrets.MAIL_USERNAME}} - password: ${{secrets.MAIL_PASSWORD}} - subject: ${{ github.repository }} - Tutorial test ${{ job.status }} - to: ryan.chard@gmail.com,rchard@anl.gov,chard@uchicago.edu,yadudoc1729@gmail.com,josh@globus.org,bengal1@illinois.edu,benc@hawaga.org.uk,sirosen@globus.org,uriel@globus.org - from: funcX Tests # - body: The ${{ github.repository }} test ${{ github.workflow }} exited with status - ${{ job.status }}! \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index ada5e4193..b1b7f29e0 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,7 +40,7 @@ results. funcX securely communicates with remote endpoints, waits for resources to become available, and can even retry execution upon failure. funcX stores results (or errors) in the cloud-hosted service until they are retrieved by the user. -.. image:: img/funcx-model.png +.. image:: img/funcX-model.png Using funcX diff --git a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py index 5643625cb..e222445e0 100644 --- a/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py +++ b/funcx_endpoint/funcx_endpoint/endpoint/results_ack.py @@ -3,7 +3,9 @@ import os import pickle -logger = logging.getLogger(__name__) +# The logger path needs to start with endpoint. while the current path +# start with funcx_endpoint.endpoint. +logger = logging.getLogger("endpoint.results_ack") class ResultsAckHandler(): @@ -81,6 +83,9 @@ def persist(self): def load(self): """ Load unacked results from disk """ - if os.path.exists(self.data_path): - with open(self.data_path, 'rb') as fp: - self.unacked_results = pickle.load(fp) + try: + if os.path.exists(self.data_path): + with open(self.data_path, 'rb') as fp: + self.unacked_results = pickle.load(fp) + except pickle.UnpicklingError: + logger.warning(f"Cached results {self.data_path} appear to be corrupt. Proceeding without loading cached results") diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py index 47517f501..f4baad479 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py @@ -190,6 +190,10 @@ class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin): and SlurmProvider interfaces to request resources from the Slurm batch scheduler. Default: LocalProvider + funcx_service_address: str + Override funcx_service_address used by the FuncXClient. If no address is specified, + the FuncXClient's default funcx_service_address is used. + Default: None """ def __init__(self, diff --git a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py index d4a8e9ad6..5083a926a 100644 --- a/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py +++ b/funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py @@ -186,6 +186,11 @@ def __init__(self, suppress_failure : Bool When set to True, the interchange will attempt to suppress failures. Default: False + + funcx_service_address: str + Override funcx_service_address used by the FuncXClient. If no address is specified, + the FuncXClient's default funcx_service_address is used. + Default: None """ self.logdir = logdir diff --git a/funcx_endpoint/tests/smoke_tests/test_async.py b/funcx_endpoint/tests/smoke_tests/test_async.py index 2182b920d..4a9eb5afe 100644 --- a/funcx_endpoint/tests/smoke_tests/test_async.py +++ b/funcx_endpoint/tests/smoke_tests/test_async.py @@ -10,7 +10,7 @@ async def simple_task(fxc, endpoint): squared_function = fxc.register_function(squared) x = random.randint(0, 100) task = fxc.run(x, endpoint_id=endpoint, function_id=squared_function) - result = await asyncio.wait_for(task, 20) + result = await asyncio.wait_for(task, 60) assert result == squared(x), "Got wrong answer" diff --git a/funcx_endpoint/tests/smoke_tests/test_executor.py b/funcx_endpoint/tests/smoke_tests/test_executor.py index 54997c3e5..9795ebe2b 100644 --- a/funcx_endpoint/tests/smoke_tests/test_executor.py +++ b/funcx_endpoint/tests/smoke_tests/test_executor.py @@ -11,4 +11,4 @@ def test_executor_basic(fx, endpoint): x = random.randint(0, 100) fut = fx.submit(double, x, endpoint_id=endpoint) - assert fut.result(timeout=10) == x * 2, "Got wrong answer" + assert fut.result(timeout=60) == x * 2, "Got wrong answer" diff --git a/funcx_endpoint/tests/smoke_tests/test_running_functions.py b/funcx_endpoint/tests/smoke_tests/test_running_functions.py index 4714d6c97..6e78bd3f8 100644 --- a/funcx_endpoint/tests/smoke_tests/test_running_functions.py +++ b/funcx_endpoint/tests/smoke_tests/test_running_functions.py @@ -5,7 +5,7 @@ def test_run_pre_registered_function(fxc, endpoint, tutorial_funcion_id): """This test confirms that we are connected to the default production DB""" fn_id = fxc.run(endpoint_id=endpoint, function_id=tutorial_funcion_id) - time.sleep(10) + time.sleep(30) result = fxc.get_result(fn_id) assert result == "Hello World!", f"Expected result: Hello World!, got {result}"