Allow jobs to time out #1005
Comments
Currently working on it in this branch on my fork.
If I am to add timeout support, it needs to be implemented across all the built-in executors, or at least where a job can be reasonably terminated by the executor.
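Whether an executor can "reasonably terminate" a job depends on where the job runs. As a rough illustration using only the standard library (asyncio stands in for anyio here, and the names blocking_job and main are made up for this sketch), a timeout around a thread-based job ends the waiting, but the worker thread itself keeps running to completion:

```python
import asyncio
import time
from typing import List, Tuple


def blocking_job(done: List[str]) -> None:
    # Hypothetical blocking job; a thread cannot be interrupted from outside.
    time.sleep(0.3)
    done.append("finished")


async def main() -> Tuple[bool, bool, bool]:
    done: List[str] = []
    try:
        # Give up waiting after 50 ms, well before the job finishes.
        await asyncio.wait_for(asyncio.to_thread(blocking_job, done), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    # Right after the timeout the job has not completed yet.
    unfinished_at_timeout = not done

    # Wait long enough for the abandoned thread to finish on its own.
    await asyncio.sleep(0.5)
    finished_later = bool(done)
    return timed_out, unfinished_at_timeout, finished_later


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the caller sees a timeout, yet the side effect of the job still happens afterwards, which is why thread-based executors need separate consideration.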
I did it using anyio.fail_after in each executor. If you are interested, here are my implementations (not tested currently):

class QtJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., T_Retval], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )
        future: Future[T_Retval] = Future()
        event = anyio.Event()
        self._signals.run_job.emit((func, job, future, event))
        try:
            with anyio.fail_after(timeout_seconds):
                await event.wait()
        except TimeoutError:
            raise JobTimedOutError from None

        return future.result(0)

class AsyncJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        # Convert timeout to seconds if it's a timedelta
        timeout_seconds = (
            job.timeout.total_seconds()
            if isinstance(job.timeout, timedelta)
            else job.timeout
        )

        async def wrapper():
            retval = func(*job.args, **job.kwargs)
            if isawaitable(retval):
                retval = await retval
            return retval

        try:
            with anyio.fail_after(timeout_seconds):
                return await wrapper()
        except TimeoutError:
            raise JobTimedOutError from None

class ProcessPoolJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )
        wrapped = partial(func, *job.args, **job.kwargs)
        try:
            with fail_after(timeout_seconds):
                return await to_process.run_sync(
                    wrapped, cancellable=True, limiter=self._limiter
                )
        except TimeoutError:
            raise JobTimedOutError from None

class ThreadPoolJobExecutor(JobExecutor):
    # ...
    async def run_job(self, func: Callable[..., Any], job: Job) -> Any:
        timeout_seconds = (
            job.timeout.total_seconds() if job.timeout is not None else None
        )
        wrapped = partial(func, *job.args, **job.kwargs)
        try:
            with fail_after(timeout_seconds):
                return await to_thread.run_sync(wrapped, limiter=self._limiter)
        except TimeoutError:
            raise JobTimedOutError from None

As I was eager to contribute to this project, I looked at the Roadmap and found this open item, so I thought I might as well start working on it. If you would now rather get 4.0 out as soon as possible and not add a lot more features, I would be completely fine with that. In that case, I'd just love to hear which features I should work on instead. As I said above, these implementations are not tested at all, but I am pretty sure every one of them except the QtScheduler one will work, as the other code also just uses anyio. For the QtScheduler, I just don't have any clue; I did the same as in the other examples. I should mention that the tasks here are not cancelled or anything. That needs to be added in the future.
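For anyone who wants to try the pattern outside of APScheduler, here is a minimal, self-contained sketch of the same idea. It is an assumption-laden stand-in, not the branch code: it uses the standard library's asyncio.wait_for in place of anyio.fail_after so it runs without dependencies, and JobTimedOutError and run_with_timeout are hypothetical names defined only for this example.

```python
import asyncio
from datetime import timedelta
from inspect import isawaitable
from typing import Any, Callable, Dict, Optional, Tuple


class JobTimedOutError(Exception):
    """Hypothetical error raised when a job exceeds its timeout."""


async def run_with_timeout(
    func: Callable[..., Any],
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    timeout: Optional[timedelta] = None,
) -> Any:
    # None means "no timeout", mirroring the executors shown in this thread.
    timeout_seconds = timeout.total_seconds() if timeout is not None else None

    async def call() -> Any:
        # Support both plain functions and coroutine functions.
        retval = func(*args, **(kwargs or {}))
        if isawaitable(retval):
            retval = await retval
        return retval

    try:
        return await asyncio.wait_for(call(), timeout_seconds)
    except asyncio.TimeoutError:
        # Caveat: a TimeoutError raised by the job itself is also translated here.
        raise JobTimedOutError from None


async def slow_job() -> str:
    await asyncio.sleep(1)
    return "late"


async def fast_job(x: int) -> int:
    return x * 2
```

A call such as `asyncio.run(run_with_timeout(slow_job, timeout=timedelta(milliseconds=50)))` should raise JobTimedOutError, while `fast_job` returns normally. Note that a plain synchronous function would block the event loop, so the timeout can only take effect at an await point.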
Currently in my branch, the existing tests are passing. Before adding any new tests, the timeout attribute needs to be added to the appropriate APIs.
I was planning to defer job timeouts to v4.1. That doesn't decrease the value of your contribution, it just changes the timeline of its merging.
I just realized how angry that sounded, sorry…
I will be able to focus more on APScheduler after I get the next major release of the Asphalt framework out, which should happen within the week. I don't think any major features are missing from the v4.0 beta, it's just a matter of fixing critical bugs. To that end, I may need help with testing and accurate bug reporting.
Things to check first
Feature description
This is mentioned in the Roadmap (#465). I am opening this issue as a place to discuss this feature and how to implement it, as I am currently trying to program something like this.
In the Roadmap, the item I am referring to is 'Timeouts for jobs'. Below I describe in more detail how I would implement it. If you have any other wishes, ideas, or suggestions on how this feature should be implemented: that's exactly the reason I created this issue (-:
I would add a timeout parameter to the Job structure and handle the timeout in the different job executors. For example, here is how this could work with the AsyncJobExecutor:
The JobOutcome should receive a new state called timeout, which should be set if the JobTimedOutError (or we could also use the TimeoutError that anyio uses directly) is raised. The scheduler needs to catch that and handle it appropriately.
Use case
As this is in the roadmap, I think the use case is clear.
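To make the JobOutcome part of the proposal more concrete, here is a rough sketch of how a scheduler could map a timed-out job to a dedicated outcome. Everything here is a simplified stand-in invented for illustration: this JobOutcome enum, JobTimedOutError, and record_outcome are not APScheduler's actual definitions.

```python
import asyncio
from enum import Enum, auto
from typing import Any, Awaitable, Callable, Tuple


class JobOutcome(Enum):
    # Simplified stand-in; the real enum may have different members.
    success = auto()
    error = auto()
    timeout = auto()  # the proposed new state


class JobTimedOutError(Exception):
    """Hypothetical error raised by an executor when a job times out."""


async def record_outcome(
    run_job: Callable[[], Awaitable[Any]],
) -> Tuple[JobOutcome, Any]:
    # The scheduler awaits the executor and classifies what happened.
    try:
        result = await run_job()
    except JobTimedOutError:
        return JobOutcome.timeout, None
    except Exception as exc:
        return JobOutcome.error, exc
    return JobOutcome.success, result
```

Catching JobTimedOutError before the generic Exception handler keeps timeouts distinguishable from ordinary job failures, which is the reason for a separate exception type rather than a bare TimeoutError.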