-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for cancelling tasks in HighThroughputExecutor #606
Conversation
…ancel returns a redundant result
* Replace worker when task is cancelled via terminating worker * Remove task from internal tracking * Minor refactoring
This pull request has been linked to Shortcut Story #9621: Executor support for cancel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a once-through read through of this. I haven't thought at all about how race conditions affect what's going on, and I would like to spend some more time on that because I think there is interesting stuff there.
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/funcx_manager.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/funcx_manager.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py
Outdated
Show resolved
Hide resolved
* Fixed bad doc string from TaskCancelled exception * Updated signal handler in funcx_worker to exit with log message on receiving SIGTERM * Updated FuncXFuture to use cancel from parent class.
A task_id is put on the task_cancel_pending_trap when it not found on a manager. It is possible that the task is already complete, in-flight, or waiting dispatch in the interchange's queues.
Codecov Report
@@ Coverage Diff @@
## main #606 +/- ##
==========================================
- Coverage 42.22% 41.52% -0.70%
==========================================
Files 33 33
Lines 3313 3453 +140
==========================================
+ Hits 1399 1434 +35
- Misses 1914 2019 +105
Continue to review full report at Codecov.
|
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/funcx_worker.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/interchange.py
Outdated
Show resolved
Hide resolved
|
||
if found_task is False: | ||
logger.debug(f"Task:{task_id} not found on managers, moving task_cancel message onto trap") | ||
self.task_cancel_pending_trap[task_id] = task_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this task is not on a manager, why does the handling of pending_trap include sending a message to a manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question, this sent me on a wild goose chase to figure out to piece together logic from a month ago. The interchange uses the _ready_manager_queue
to track tasks in the logic for scheduling tasks, dispatching, and handling results. It's far easier to have the task make its way to the funcx_manager and then get canceled there with an exception in the result package that works it's way through the result path. This makes sure that the task gets wiped from the interchange cleanly. If you just capture the task and discard it, the scheduling gets messed up and no further tasks get scheduled as the managers appear to be busy with tasks.
* Both task cancel and task completion can call remove_task on the manager, this is now protected by a lock. * Remove_task now updates self.task_done_counter * Removed unnecessary logging
@benclifford I believe the issues that you noticed are all fixed now. Could you give this a try again, please? |
this has passed 46 iterations of |
right now I'm trying to understand what effect switching |
The suppress_failure check that remains after PR #637 was removed in the Parsl fork of HighThroughputExecutor by Parsl/parsl#1671 which removes the I think it would make sense to remove the suppress_failure option entirely. |
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
funcx_endpoint/funcx_endpoint/executors/high_throughput/executor.py
Outdated
Show resolved
Hide resolved
""" | ||
|
||
ret_value = future._cancel() | ||
logger.debug("Sending cancel of task_id:{future.task_id} to interchange") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this ^ log isn't true if ret_value is false
|
||
for task in tasks: | ||
task_id = task["task_id"] | ||
logger.debug(f"[MAIN] Task {task_id} is now WAITING_FOR_LAUNCH") | ||
self.task_status_deltas[task_id] = TaskStatusCode.WAITING_FOR_LAUNCH | ||
if self.task_cancel_pending_trap and task_id in self.task_cancel_pending_trap: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this if is the same as just if task_id in self.task_cancel_pending_trap
right? the first clause isn't catching anything else?
@@ -658,7 +676,8 @@ def submit(self, func, *args, container_id: str = 'RAW', task_id: str = None, ** | |||
ser_code + ser_params) | |||
|
|||
self.submit_raw(payload.pack()) | |||
self.tasks[task_id] = Future() | |||
self.tasks[task_id] = HTEXFuture(self) | |||
self.tasks[task_id].task_id = task_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a very slight race condition here I think (but I've seen this sort of thing trigger in parsl in the DFK when tasks complete fast). I think the task can complete any time including-and-after line 678, before self.tasks[task_id] is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(this is also true before this PR #606)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crossref Parsl/parsl#2033
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok this is confusing me. This codepath isn't reached when using the funcx endpoint as far as I can see. Is def submit()
just here for if you're using the htex component on your local machine without the rest of funcx?
self.worker_procs.update(worker_proc) | ||
self.task_worker_map.pop(task_id) | ||
self.remove_task(task_id) | ||
self.task_cancel_lock.release() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm much more comfortable with locks being used in a with
block rather than explicitly released on only some of the code paths - eg when any of the above lines raise an exception, the lock will not be released. At the moment, I think such an exception would kill off the main Manager thread anyway, so this is a bit irrelevant right now - but if the manager thread ever gets made resilient to exceptions happening inside itself, this is probably a deadlock that will happen.
except Exception as e: | ||
logger.exception(f"Raise exception, handling: {e}") | ||
result_package = {'task_id': task_id, | ||
'container_id': worker_type, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm very confused in reading funcx code in general about what is the difference between container_id and a worker_type. are they the same thing? (what docker calls an image id?)
@@ -417,17 +486,18 @@ def poll_funcx_task_socket(self, test=False): | |||
self.worker_map.register_worker(w_id, reg_info['worker_type']) | |||
|
|||
elif m_type == b'TASK_RET': | |||
# We lock because the following steps are also shared by task_cancel | |||
self.task_cancel_lock.acquire() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is more a lock to do with "result processing" than with cancellation, then?
try: | ||
self.remove_task(task_id) | ||
except KeyError: | ||
logger.exception(f"Task:{task_id} missing in task structure") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are the circumstances that this would be missing in the task structure? It shouldn't have been removed by cancellation code because that is locked, right?
|
||
Please note that a return value of True does not guarantee that your function will not | ||
execute at all, but it does guarantee that the future will be in a cancelled state. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe useful to note (here or elsewhere?) there that best_effort_cancel
does not wait until a task has been cancelled before putting the future in a cancelled state and returning: that is, a user can observe that the future says it is cancelled on the submit side, while the corresponding user code is actually still running.
ok, I feel like I'm getting bogged down in the general chaos of the htex code rather than the specific cancel change this PR Introduces. I'm marking this approved with the proviso that at the very least, I haven't paid any attention to behaviour when this cancel code moves towards being exposed via normal funcx interfaces. |
* Fix linting on funcx_sdk/funcx/tests/ * Simplify instructions for installing endpoint secrets to cluster * Fix CI and the way dependencies are specified test and dev are extras for installing tools, no multiple requirements files scattered across the repo. CI workflows should not be installing multiple distinct applications into a single venv and then trying to test. This muddies the waters and makes the purpose and requirements of each CI step unclear. Therefore, in addition to putting the requirement data where it really belongs, this fixes the workflows to specify a better isolated test configuration. Workflow jobs are used more thoughtfully to produce isolated VMs for distinct build and testing processes. New daily tests run Monday-through-Friday in the AM for US timezones, doing `safety check` and little else. Safety checking on dependencies is done separately for funcx-sdk and funcx-endpoint. The CI workflow for testing PRs and branches also does `safety check`, but importantly the hourly test does not. Notifications from the daily and hourly workflows are done in a separate dedicated job. This means that new jobs can be added upstream without needing distinct or rewritten notification logic. The absence of any pytest run for the funcx-sdk package is more obvious and identified properly as a problem. It is paired with the "import test" so that when enabled, the import test removal will be localized to the same area of the workflow file. Linting is a dedicated job which runs as part of the CI workflow and uses pre-commit on the full repo. No additional lint steps (e.g. one-off flake8 runs) are used or should be needed. Testing documentation is updated minimally to refer to the installation of extras, rather than requirements files. The doc site requirements for some (undocumented) reason are installing funcx-sdk test requirements. This has been removed. Any documentation requirements can continue to be specified in the doc directory, or under a new `[docs]` extra if necessary. It should not "inherit" the requirements of the testing process. The codecov integration does not deliver value in the absence of good unit testing and a test matrix of size greater than 1. Therefore, until such a time as codecov is useful and a worthwhile configuration can be identified, the integration has been removed. The funcx-endpoint setup.py file was autoformatted with `black` to simplify the formatting of the requirements data. No workflow steps are allowed to use `actions/checkout@master`. This is unnecessary and dangerous usage. `checkout@v2` is used -- as the GitHub Action's own documentation instructs users to do -- instead. * Port parsl BadRegistration error message clarification to funcx fork This clarification was introduced as part of Parsl/parsl#980 I have not examined the rest of that PR for relevance to this fork. * Remove unused BadRegistration exception class This looks like it was excessive copy-paste from Parsl source code which has had an unused BadRegistration class since parsl PR Parsl/parsl#1671 * Broaden htex interchange->executor error message phrasing Previously the error message claimed that the fatal error was due to version mismatch. This is not true: this same fatal error may occur in the case of a BadRegistration. This PR rephrases the error to be less misleading. I have encountered this repeatedly during review of #909, and at least I think Yadu was getting it while hacking on this code. * Expand daily build to all days * Remove internal RedisQueue object and dependents RedisQueue is now provided by funcx-common if needed. 1. Remove `funcx_endpoint.queues` 2. Remove `funcx_endpoint/tests/integration/test_redis.py` (this just tests the removed subpackage) 3. Remove `funcx_endpoint.mock_broker`, which relies on RedisQueue (3) is the most questionable of these three. However, `mock_broker` does not appear to be used anywhere. Furthermore, even if a mocked Forwarder object is needed for some reason, there doesn't appear to be any reason that it should be provided by the `funcx-endpoint` package. * Update smoke test config with version, dev env version=0.3.5 add a "dev" block to the config for ease of use. black and isort on the file * Make smoke tests check min version, not exact Check against the API response with a loose version comparison, not an equality check. This means that the smoke tests set a "floor" for what versions are allowed, but do not fail when a release is done. * Add a Polaris config example * Flake8 * Removing comment # edtb-02 * Update linting config and fix Remove flake8 ignores for various rules: only ignore the rules which must be ignored to be black-compatible. Remove exclusions from pre-commit config. * Fix subject line in hourly smoke test * Remove async testing until it can be done safely * Adding a minimal s3 smoke test * Adding funcx_endpoint to install list * Marking test for large args to skip * Adding fixme note to gh workflow, and minor fixes to tests. * Remove all use of async code from smoke tests Do not use the executor interface until it can be repaired. Use a simple polling loop to wait for results. Turn off throttling on the client so that we don't spuriously fail a job. * Begin login config rewrite Structure logging config more tightly, removing odd naming and highly dynamic, imperative configuration of loggers. The new structure is that there is a single method which can be used to reconfigure logging on an as-needed basis (e.g. to turn off logging to stdio). Logging is configured at each entrypoint, and in one special case when a daemon is being launched. All loggers are initialized in module scope as log = logging.getLogger(__name__) Any logger creation not matching this exact pattern is considered incorrect and is fixed to match. * Update logging in Interchange - Use setup_logging() to replace global logger object configured via set_file_logger() - Cleanup CLI argument passing, as it relates to logging arguments - Remove logging attributes from the Interchange object This should, for the most part, not change the semantics of any code. However, it is worth noting that it reduces the maxbytes on the logfile from 256MB to 100MB. This could be adjusted back up by allowing `setup_logging()` to take a parameter for maxbytes, but for now this is not being done. * Fix logging configuration in worker Also cleanup use of loggers in various modules where impact is low. * Fix various issues in logging refactor log_max_bytes is no longer supported and needs to be removed. logging_level similarly needs to be removed. console logging needs to be disabled within the daemon. * Remove logging helpers; fix "asyncio" logger These were originally going to be left untouched, but the fact that they were configuring logging for "asyncio" makes it hard to see a way to salvage them. The logger for "asyncio" _does not_ belong to funcx. We absolutely should not be using that logger internally for funcx-related activities. Fix the references to the "asyncio" logger to use `__name__` as is normal, and remove the imperative logging configuration helpers. * Install funcx_endpoint for smoke tests * Patch default logfile in tests to handle CI In GitHub actions, the logfile handler fails to configure. The exact cause is unclear, but it may be that there's an issue with access to the default logfile or the default log directory. Put the default log path into a tmpdir to fix this. * Raise result size limit to 10MB And fix `sys.getsizeof` on a bytestring to use `len` instead. * WSPollingTask now has a close method and handle_incoming return False on remote disconnect. The executor closes and reconnects WS on remote disconnect. * Adding a long duration test. * Relax version match constraints (#637) * Relax version requirements * Adding test scripts that launch endpoint and workers of mismatched python version for tests. * Removing the `relax_manager_version_match` option. The interchange will note an info line if there's some version mismatch and continue. This can later be problematic if the funcx versions do not match. * Updating configs and README with note to update paths to user conda. * Module import ordering fixes for isort * Black auto-fixes * Be explicit about how to create conda environments * Make update_all fail when a component fails * Adding a test that raises an exception on the worker side * use unixy exit codes * abort tests on failure rather than continuing fairly quietly - this makes a test failure more obvious * Add another test to kill worker * Minor format fix * Remove unnecessary f-string * logger -> log Co-authored-by: Ben Clifford <[email protected]> * Fix broken link to parsl docs * (Re)Apply all autofixers black, isort, and pyupgrade are all (re)converged on the files modified under the task-cancel branch. * Apply changes from 'main' to task-cancel This is the result of - `git checkout --patch main` - manual edits to make each file agree with flake8 and other linting Co-authored-by: Ben Galewsky <[email protected]> Co-authored-by: Ben Clifford <[email protected]> Co-authored-by: Yadu Nand Babuji <[email protected]> Co-authored-by: Ryan Chard <[email protected]> Co-authored-by: Wes Brewer <[email protected]> Co-authored-by: Daniel S. Katz <[email protected]>
There's a bunch of comments from Ben that still ought to be addressed. I'll do that in a separate PR. |
Description
This PR extends the behavior of
concurrent.Futures
to support cancellation even after the task is running. Previously,HighThroughputExecutor
returned futures that did not support thefuture.cancel()
method at all. Please note that cancel is a best-effort mechanism, and while a canceled future will never return the result, it does not imply that the underlying task will never run or be interrupted.Changes include:
cancel()
executor->interchange->funcx_manager->funcx_worker
Type of change
Choose which options apply, and delete the ones which do not apply.