
async_start example hangs/crashes on sigint #333

mattzque opened this issue Mar 24, 2025 · 0 comments

I only slightly modified the example code (see "Run your first worker" in the docs):

import asyncio
from hatchet_sdk import Context, Hatchet, workflow, step

@workflow(on_events=["user:create"])
class Workflow:
    @step()
    async def step1(self, context: Context):
        pass

async def main():
    hatchet = Hatchet()
    worker = hatchet.worker("first-worker", max_runs=4)
    worker.register_workflow(Workflow())
    await worker.async_start()

if __name__ == "__main__":
    asyncio.run(main())

If this worker is sent a SIGINT (Ctrl-C) it won't exit; it just hangs. A second SIGINT crashes inside the signal handler and leaves behind orphaned multiprocessing subprocesses:

[INFO]  🪓 -- 2025-03-24 20:51:15,842 - ------------------------------------------
[INFO]  🪓 -- 2025-03-24 20:51:15,842 - STARTING HATCHET...
[INFO]  🪓 -- 2025-03-24 20:51:15,844 - starting runner...
^C[INFO]        🪓 -- 2025-03-24 20:51:18,556 - received signal SIGINT...
[INFO]  🪓 -- 2025-03-24 20:51:18,848 - gracefully exiting runner...
^C[INFO]        🪓 -- 2025-03-24 20:51:26,030 - received signal SIGINT...
Exception ignored in atexit callback: <function _exit_function at 0x774defeaad40>
Traceback (most recent call last):
  File "/.../multiprocessing/util.py", line 360, in _exit_function
    p.join()
  File "/.../multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../hatchet_sdk/worker/worker.py", line 322, in _handle_exit_signal
    self.loop.create_task(self.exit_gracefully())
  File "/.../asyncio/base_events.py", line 455, in create_task
    self._check_closed()
  File "/.../asyncio/base_events.py", line 540, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'Worker.exit_gracefully' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

This seems to happen because async_start returns before the exit_gracefully task has completed, so the loop is closed and the subprocesses are never stopped. It works as expected if I wait for the pending tasks manually:

import asyncio
from hatchet_sdk import Context, Hatchet, workflow, step

@workflow(on_events=["user:create"])
class Workflow:
    @step()
    async def step1(self, context: Context):
        pass

async def main():
    hatchet = Hatchet()
    worker = hatchet.worker("first-worker", max_runs=4)
    worker.register_workflow(Workflow())
    await worker.async_start()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # wait for the exit_gracefully task scheduled by the SDK's signal
        # handler to finish before closing the loop
        pending = asyncio.all_tasks(loop)
        loop.run_until_complete(asyncio.gather(*pending))
        loop.close()

Perhaps async_start could handle this itself, or the docs could at least mention it.
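For what it's worth, the pattern I would have expected is for async_start itself to only return once graceful shutdown has finished. A minimal, self-contained sketch of that pattern (plain asyncio, not the actual SDK internals; the runner task and the shutdown helper are stand-ins):

import asyncio
import signal

async def fake_runner() -> None:
    # stand-in for the worker's long-running runner loop
    while True:
        await asyncio.sleep(1)

async def shutdown(task: asyncio.Task) -> None:
    # stand-in for Worker.exit_gracefully: stop work, reap subprocesses, etc.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # Unix-only: have SIGINT/SIGTERM simply set an event instead of
    # scheduling a task that may never be awaited
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    runner = asyncio.create_task(fake_runner())
    await stop.wait()        # returns once a signal arrives
    await shutdown(runner)   # shutdown completes before main() returns and the loop closes

if __name__ == "__main__":
    asyncio.run(main())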

As an aside, I noticed that the synchronous variant (start) never returns, which is a bit awkward because it makes it complicated to create and clean up shared resources for the worker. Perhaps you could offer something similar to Celery signals or FastAPI's lifespan events.
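For reference, the kind of API I have in mind, modeled on FastAPI's asynccontextmanager-based lifespan; the lifespan= parameter and the resource helpers below are purely hypothetical, just to sketch the shape:

from contextlib import asynccontextmanager

from hatchet_sdk import Hatchet

hatchet = Hatchet()

@asynccontextmanager
async def lifespan():
    pool = await open_connection_pool()  # hypothetical shared resource setup
    try:
        yield {"pool": pool}             # available to steps while the worker runs
    finally:
        await pool.close()               # guaranteed cleanup on worker shutdown

# hypothetical: a lifespan= parameter that the worker enters on start
# and exits on graceful shutdown
worker = hatchet.worker("first-worker", max_runs=4, lifespan=lifespan)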
