diff --git a/distributed/worker.py b/distributed/worker.py index d29c46e296..6b3319545b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3124,54 +3124,6 @@ def apply_function_simple( return msg -async def apply_function_async( - function, - args, - kwargs, - time_delay, -): - """Run a function, collect information - - Returns - ------- - msg: dictionary with status, result/error, timings, etc.. - """ - ident = threading.get_ident() - try: - with context_meter.meter("thread-noncpu", func=time) as m: - result = await function(*args, **kwargs) - except (SystemExit, KeyboardInterrupt): - # Special-case these, just like asyncio does all over the place. They will pass - # through `fail_hard` and `_handle_stimulus_from_task`, and eventually be caught - # by special-case logic in asyncio: - # https://github.com/python/cpython/blob/v3.9.4/Lib/asyncio/events.py#L81-L82 - # Any other `BaseException` types would ultimately be ignored by asyncio if - # raised here, after messing up the worker state machine along their way. - raise - except BaseException as e: - # NOTE: this includes `CancelledError`! Since it's a user task, that's _not_ a - # reason to shut down the worker. - # Users _shouldn't_ use `BaseException`s, but if they do, we can assume they - # aren't a reason to shut down the whole system (since we allow the - # system-shutting-down `SystemExit` and `KeyboardInterrupt` to pass through) - msg = error_message(e) - msg["op"] = "task-erred" - msg["actual-exception"] = e - else: - msg = { - "op": "task-finished", - "status": "OK", - "result": result, - "nbytes": sizeof(result), - "type": type(result) if result is not None else None, - } - - msg["start"] = m.start + time_delay - msg["stop"] = m.stop + time_delay - msg["thread"] = ident - return msg - - def apply_function_actor( function, args, kwargs, execution_state, key, active_threads, active_threads_lock ):