-
Notifications
You must be signed in to change notification settings - Fork 203
Hitting RuntimeWarning: coroutine 'Queue.get' was never awaited #646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
We are encountering this issue too |
Been looking into it, having a hard time reproducing but I do see it in CI runs. |
I believe it's most likely to appear with many concurrent subscriptions |
This issue manifests when an I am able to reliably reproduce the issue using the following new test function: @async_test
async def test_fetch_being_cancelled_early(self):
nc = NATS()
await nc.connect()
js = nc.jetstream()
await js.add_stream(name="test", subjects=["test.a"])
sub = await js.pull_subscribe("test.a", "test", stream="test")
task = asyncio.create_task(sub.fetch())
# Allow "task" to reach the point where it has just finished calling
# asyncio.create_task() inside nats.client.Subscription.next_msg().
await asyncio.sleep(0)
# Cancel the call to sub.fetch() to provoke issue #646.
task.cancel()
with self.assertRaises(asyncio.CancelledError):
await task
await nc.close() I have looked at two options for fixing the issue: Option 1: --- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -181,9 +181,10 @@ class Subscription:
)
task_name = str(uuid4())
+ pending_get = self._pending_queue.get()
try:
future = asyncio.create_task(
- asyncio.wait_for(self._pending_queue.get(), timeout)
+ asyncio.wait_for(pending_get, timeout)
)
self._pending_next_msgs_calls[task_name] = future
msg = await future
@@ -203,6 +204,7 @@ class Subscription:
self._pending_queue.task_done()
return msg
finally:
+ pending_get.close()
self._pending_next_msgs_calls.pop(task_name, None)
def _start(self, error_cb):
Option 2: --- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -172,6 +172,9 @@ class Subscription:
msg = await sub.next_msg(timeout=1)
"""
+ async def timed_get() -> Msg:
+ return await asyncio.wait_for(self._pending_queue.get(), timeout)
+
if self._conn.is_closed:
raise errors.ConnectionClosedError
@@ -182,9 +185,7 @@ class Subscription:
task_name = str(uuid4())
try:
- future = asyncio.create_task(
- asyncio.wait_for(self._pending_queue.get(), timeout)
- )
+ future = asyncio.create_task(timed_get())
self._pending_next_msgs_calls[task_name] = future
msg = await future
except asyncio.TimeoutError:
|
Thanks @knutroy for looking into this. Have you tried the suggestion in @tanmayv25's OP? It seems to make the most sense to me, because it ensures the future is awaited, even if cancelled. Calling .close(), as in Option 1, will skip running the coroutine. Not sure which is best. Note: I consulted GPT to understand the distinction between the different approaches, and it believes that the approach in OP is correct, and just requires suppressing an expected CancelledError: try: ...
finally:
future.cancel()
with contextlib.suppress(asyncio.CancelledError):
await future
self._pending_next_msgs_calls.pop(task_name, None) But of course, that's just GPT. I'll try run my own tests in due time |
@tanmayv25 patch: --- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -203,6 +203,8 @@ class Subscription:
self._pending_queue.task_done()
return msg
finally:
+ future.cancel()
+ await future
self._pending_next_msgs_calls.pop(task_name, None)
def _start(self, error_cb): has no effect on the runtime warning when I test it using My understanding is that the core of the issue is that one task is calling Option 1 in my previous post ensures the coroutine object gets closed by the original task in case the other task that was supposed to await it never got around to doing so due to cancellation. If already closed (the common case), the extra call to close has no effect. Option 2 in my previous post introduces a new coroutine function causing the call to |
…ing. This could happen when the coroutine object returned from asyncio.Queue.get() was handed over to a newly created task which was then immediately cancelled so that the coroutine object was never awaited. Instead, let the newly created task itself make the call to asyncio.Queue.get() before immediately awaiting it. Resolves nats-io#646. Signed-off-by: Knut Aksel Røysland <knutroy@ifi.uio.no>
Observed behavior
In our application we launch multiple asyncio tasks each calling into fetch to a specific subject.
For example, take two subscriptions. We launch two tasks calling fetch on two different subjects.
whichever fetch returns first, we acknowledge those requests and cancel the fetch from the other subject.
When we run this snippet, occasionally we run into the following runtime warning:
I was able to get to a partial fix by propagating cancellation to the nested task in
subscription.py
.I would like to make sure that we are performing a clean cancellations of the fetch calls. These warnings seems like code smells.
Expected behavior
No runtime warnings when triggering a manual cancellation. The signal is properly propagated to the nested cancellation.
Server and client version
Nats Server
Nats Client
Latest version on PyPI.
Host environment
No response
Steps to reproduce
No response
The text was updated successfully, but these errors were encountered: