
Allow jobs to time out #1005

Open
1 task done
HomerusJa opened this issue Dec 21, 2024 · 7 comments

Comments

@HomerusJa

HomerusJa commented Dec 21, 2024

Things to check first

  • I have searched the existing issues and didn't find my feature already requested there

Feature description

This is mentioned in the Roadmap (#465). I am opening this issue as a place to discuss the feature and how to implement it, since I am currently trying to program something like this.

In the Roadmap, the item I am referring to is 'Timeouts for jobs'. Below I describe how I would try to implement it. If you have any other wishes, ideas or suggestions on how this feature should be implemented, that's exactly why I created this issue (-:

I would add a timeout parameter to the Job structure and handle the timeout in each of the job executors. For example, this is how it could work with the AsyncJobExecutor:

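from datetime import timedelta
from inspect import isawaitable
from typing import Any, Callable

import anyio

# Job and JobExecutor come from APScheduler; JobTimedOutError is the new
# exception this proposal would add.

class AsyncJobExecutor(JobExecutor):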
    """
    Executes functions directly on the event loop thread.

    If the function returns a coroutine object (or another kind of awaitable), that is
    awaited on and its return value is used as the job's return value.
    """

    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        # Convert timeout to seconds if it's a timedelta
        timeout_seconds = job.timeout.total_seconds() if isinstance(job.timeout, timedelta) else job.timeout

        async def wrapper():
            retval = func(*job.args, **job.kwargs)
            if isawaitable(retval):
                retval = await retval
            return retval

        try:
            # fail_after() is a regular (synchronous) context manager, not an async one
            with anyio.fail_after(timeout_seconds):
                return await wrapper()
        except TimeoutError:
            raise JobTimedOutError from None

JobOutcome should get a new state, timeout, which is set when JobTimedOutError is raised (alternatively, we could use the TimeoutError that anyio raises directly). The scheduler needs to catch that and handle it appropriately.
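A rough sketch of how the scheduler side could map an executor's exception to an outcome. The JobOutcome enum below is a trimmed-down, hypothetical stand-in (APScheduler's real enum has more members), and outcome_for is an illustrative helper, not an existing function.

```python
from __future__ import annotations

from enum import Enum


class JobTimedOutError(Exception):
    """Hypothetical exception raised by an executor when a job times out."""


class JobOutcome(Enum):
    # Hypothetical, trimmed-down stand-in; only the members needed here
    success = "success"
    error = "error"
    timeout = "timeout"  # the new state proposed in this issue


def outcome_for(exc: BaseException | None) -> JobOutcome:
    """Map the exception (if any) raised while running a job to an outcome."""
    if exc is None:
        return JobOutcome.success
    if isinstance(exc, JobTimedOutError):
        return JobOutcome.timeout
    # Anything else, including a TimeoutError raised by the job itself
    return JobOutcome.error
```

This version deliberately maps only JobTimedOutError to the new state: TimeoutError is a built-in exception, so catching it directly would also classify a job's own TimeoutError (for example, from a network call) as a scheduler timeout. That is one argument for the dedicated exception.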

Use case

As this is in the roadmap, I think the use case is clear.

@HomerusJa
Author

Currently working on it in this branch on my fork.

@agronholm
Owner

If I am to add timeout support, it needs to be implemented across all the built-in executors, or at least where a job can be reasonably terminated by the executor.

@HomerusJa
Author

I did it using anyio.fail_after in each executor. If you are interested, here are my implementations (not yet tested):

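from concurrent.futures import Future
from datetime import timedelta
from functools import partial
from inspect import isawaitable
from typing import Any, Callable, TypeVar

import anyio
from anyio import fail_after, to_process, to_thread

# Job and JobExecutor come from APScheduler; JobTimedOutError is the new
# exception proposed in this issue.
T_Retval = TypeVar("T_Retval")

class QtJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., T_Retval], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )

        future: Future[T_Retval] = Future()
        event = anyio.Event()
        self._signals.run_job.emit((func, job, future, event))
        try:
            # On timeout we only stop waiting; the job keeps running on the Qt thread
            with anyio.fail_after(timeout_seconds):
                await event.wait()
        except TimeoutError:
            raise JobTimedOutError from None
        return future.result(0)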

class AsyncJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        # Convert timeout to seconds if it's a timedelta
        timeout_seconds = (
            job.timeout.total_seconds()
            if isinstance(job.timeout, timedelta)
            else job.timeout
        )

        async def wrapper():
            retval = func(*job.args, **job.kwargs)
            if isawaitable(retval):
                retval = await retval
            return retval

        try:
            with anyio.fail_after(timeout_seconds):
                return await wrapper()
        except TimeoutError:
            raise JobTimedOutError from None

class ProcessPoolJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )

        wrapped = partial(func, *job.args, **job.kwargs)
        try:
            with fail_after(timeout_seconds):
                return await to_process.run_sync(
                    wrapped, cancellable=True, limiter=self._limiter
                )
        except TimeoutError:
            raise JobTimedOutError from None

class ThreadPoolJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )
        wrapped = partial(func, *job.args, **job.kwargs)

        try:
            with fail_after(timeout_seconds):
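                # Without abandon_on_cancel=True (AnyIO >= 4.1), AnyIO ignores the
                # cancellation until the thread finishes, so the timeout could not fire
                return await to_thread.run_sync(
                    wrapped, abandon_on_cancel=True, limiter=self._limiter
                )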
        except TimeoutError:
            raise JobTimedOutError from None

As I was eager to contribute to this project, I looked at the Roadmap, found this open item and thought I might as well start working on it. If you would rather get 4.0 out as soon as possible without adding many more features, I would be completely fine with that. In that case, I'd love to hear which features I should work on instead.

As I said above, these implementations are not tested at all, but I am fairly sure that all of them except the Qt one will work, since the other code also just uses anyio. For the QtJobExecutor, I just don't have any clue; I followed the same pattern as in the other examples.

I should mention that the underlying tasks are not cancelled or otherwise stopped here when they time out. That still needs to be added in the future.

@HomerusJa
Author

Currently, the existing tests pass in my branch. Before adding any new tests, the timeout attribute needs to be added to the appropriate APIs.
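The AsyncJobExecutor snippet accepts either a timedelta or a plain number, while the other executors call job.timeout.total_seconds() directly. One way to reconcile this when adding the attribute to the APIs is to normalize the value once, where the job is defined. A minimal sketch follows; JobSpec, as_timedelta and the func_ref field are hypothetical names, not APScheduler's API.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta


def as_timedelta(value: float | timedelta | None) -> timedelta | None:
    """Normalize a timeout given in seconds or as a timedelta; None means no timeout."""
    if value is None:
        return None
    delta = value if isinstance(value, timedelta) else timedelta(seconds=value)
    if delta <= timedelta(0):
        raise ValueError(f"timeout must be positive, got {value!r}")
    return delta


@dataclass(frozen=True)
class JobSpec:
    # Hypothetical stand-in for whatever structure ends up carrying the timeout
    func_ref: str
    timeout: float | timedelta | None = None

    def __post_init__(self) -> None:
        # The dataclass is frozen, so bypass __setattr__ to store the normalized value
        object.__setattr__(self, "timeout", as_timedelta(self.timeout))

    @property
    def timeout_seconds(self) -> float | None:
        """The value an executor would pass to anyio.fail_after()."""
        return self.timeout.total_seconds() if self.timeout is not None else None
```

With the conversion done at the API boundary, every executor can rely on job.timeout being either None or a timedelta.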

@agronholm
Owner


I was planning to defer job timeouts to v4.1. That doesn't decrease the value of your contribution, it just changes the timeline of its merging.

@HomerusJa
Author

HomerusJa commented Dec 23, 2024

I just realized how angry that sounded, sorry…
So which features should I work on if I want to help the most with getting 4.0 out the door?

@agronholm
Owner

I will be able to focus more on APScheduler after I get the next major release of the Asphalt framework out, which should happen within the week. I don't think any major features are missing from the v4.0 beta, it's just a matter of fixing critical bugs. To that end, I may need help with testing and accurate bug reporting.
