
Hitting RuntimeWarning: coroutine 'Queue.get' was never awaited #646


Open
tanmayv25 opened this issue Jan 2, 2025 · 6 comments · May be fixed by #687
Assignees
Labels
accepted (The defect or proposal has been accepted) · bug (Confirmed reproducible bug) · defect (Suspected defect such as a bug or regression)

Comments

@tanmayv25

Observed behavior

In our application we launch multiple asyncio tasks, each calling fetch on a specific subject.
For example, take two subscriptions: we launch two tasks, one calling fetch on each subject.
Whichever fetch returns first, we acknowledge those requests and cancel the fetch on the other subject.

        tasks = [
            asyncio.create_task(
                subscription.fetch(batch=number_requests, timeout=timeout)
            )
            for subscription in [subscription_1, subscription_2]
        ]

        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        for task in pending:
            task.cancel()
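
Cancellation is only a request, so a natural follow-up (a sketch, not necessarily exactly what our application does) is to await the cancelled tasks so the fetch() calls can unwind:

        # Awaiting the cancelled tasks lets fetch() run its cleanup before we
        # continue; return_exceptions=True swallows the CancelledError.
        await asyncio.gather(*pending, return_exceptions=True)

Even so, the warning below can still appear, since (as discussed further down in this thread) the orphaned coroutine is created inside subscription.py itself.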

When we run this snippet, occasionally we run into the following runtime warning:

/usr/lib/python3.10/asyncio/base_events.py:1910: RuntimeWarning: coroutine 'Queue.get' was never awaited
  handle = None  # Needed to break cycles when an exception occurs.
Object allocated at (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/nats/aio/subscription.py", lineno 186
	asyncio.wait_for(self._pending_queue.get(), timeout)

I was able to get a partial fix by propagating the cancellation to the nested task in subscription.py.

 diff -u20 /home/tmp/subscription_old.py /home/tmp/subscription.py
--- /home/tmp/subscription_old.py	2024-12-30 23:18:55.901624643 +0000
+++ /home/tmp/subscription.py	2024-12-30 23:17:47.930214654 +0000
@@ -185,40 +185,42 @@
         try:
             future = asyncio.create_task(asyncio.wait_for(self._pending_queue.get(), timeout))
             self._pending_next_msgs_calls[task_name] = future
             msg = await future
         except asyncio.TimeoutError:
             if self._conn.is_closed:
                 raise errors.ConnectionClosedError
             raise errors.TimeoutError
         except asyncio.CancelledError:
             if self._conn.is_closed:
                 raise errors.ConnectionClosedError
             raise
         else:
             self._pending_size -= len(msg.data)
             # For sync subscriptions we will consider a message
             # to be done once it has been consumed by the client
             # regardless of whether it has been processed.
             self._pending_queue.task_done()
             return msg
         finally:
+            future.cancel()
+            await future
             self._pending_next_msgs_calls.pop(task_name, None)

I would like to make sure that we are performing a clean cancellation of the fetch calls. These warnings seem like a code smell.

Expected behavior

No runtime warnings when triggering a manual cancellation; the cancellation signal should be properly propagated to the nested task.

Server and client version

Nats Server

[206] 2025/01/02 20:23:54.324754 [INF] Starting nats-server
[206] 2025/01/02 20:23:54.325072 [INF]   Version:  2.10.18
[206] 2025/01/02 20:23:54.325082 [INF]   Git:      [57d23ac]
[206] 2025/01/02 20:23:54.325088 [INF]   Name:     ND7V5NY3GONPHSP4DRERQWRBHIXWCLF6AOAYYOKGD7BTH2XUVGK73CNR
[206] 2025/01/02 20:23:54.325100 [INF]   Node:     uL9gFp6h
[206] 2025/01/02 20:23:54.325106 [INF]   ID:       ND7V5NY3GONPHSP4DRERQWRBHIXWCLF6AOAYYOKGD7BTH2XUVGK73CNR

Nats Client

Latest version on PyPI (2.9.0).

Host environment

No response

Steps to reproduce

No response

@tanmayv25 tanmayv25 added the defect (Suspected defect such as a bug or regression) label Jan 2, 2025
@caspervonb caspervonb self-assigned this Feb 11, 2025
@caspervonb caspervonb added the bug (Confirmed reproducible bug) and accepted (The defect or proposal has been accepted) labels Feb 11, 2025
@ff137
Contributor

ff137 commented Feb 13, 2025

We are encountering this issue too

@caspervonb
Collaborator

I've been looking into it. I'm having a hard time reproducing it, but I do see it in CI runs.

@ff137
Contributor

ff137 commented Feb 19, 2025

I believe it's most likely to appear with many concurrent subscriptions.

@knutroy

knutroy commented Apr 22, 2025

This issue manifests when an asyncio task running nats.client.Subscription.next_msg() is cancelled immediately after having created the asyncio.wait_for(self._pending_queue.get(), timeout) task, but before the latter task has had a chance to run.
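
The same failure mode can be shown with just the standard library (a minimal sketch, no NATS involved):

    import asyncio

    async def main() -> None:
        q: asyncio.Queue = asyncio.Queue()

        # The q.get() coroutine object is created here, in the current task,
        # and handed to wait_for() inside a brand-new task.
        task = asyncio.create_task(asyncio.wait_for(q.get(), 1.0))

        # Cancel before that task has had its first step: wait_for() never runs,
        # so the inner q.get() coroutine is dropped without ever being awaited
        # and its destructor emits the RuntimeWarning.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(main())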

I am able to reliably reproduce the issue using the following new test function:

    @async_test
    async def test_fetch_being_cancelled_early(self):
        nc = NATS()
        await nc.connect()

        js = nc.jetstream()
        await js.add_stream(name="test", subjects=["test.a"])
        sub = await js.pull_subscribe("test.a", "test", stream="test")

        task = asyncio.create_task(sub.fetch())

        # Allow "task" to reach the point where it has just finished calling
        # asyncio.create_task() inside nats.client.Subscription.next_msg().
        await asyncio.sleep(0)

        # Cancel the call to sub.fetch() to provoke issue #646.
        task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            await task

        await nc.close()

I have looked at two options for fixing the issue:

Option 1:

--- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -181,9 +181,10 @@ class Subscription:
             )
 
         task_name = str(uuid4())
+        pending_get = self._pending_queue.get()
         try:
             future = asyncio.create_task(
-                asyncio.wait_for(self._pending_queue.get(), timeout)
+                asyncio.wait_for(pending_get, timeout)
             )
             self._pending_next_msgs_calls[task_name] = future
             msg = await future
@@ -203,6 +204,7 @@ class Subscription:
             self._pending_queue.task_done()
             return msg
         finally:
+            pending_get.close()
             self._pending_next_msgs_calls.pop(task_name, None)
 
     def _start(self, error_cb):

Option 2:

--- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -172,6 +172,9 @@ class Subscription:
             msg = await sub.next_msg(timeout=1)
 
         """
+        async def timed_get() -> Msg:
+            return await asyncio.wait_for(self._pending_queue.get(), timeout)
+
         if self._conn.is_closed:
             raise errors.ConnectionClosedError
 
@@ -182,9 +185,7 @@ class Subscription:
 
         task_name = str(uuid4())
         try:
-            future = asyncio.create_task(
-                asyncio.wait_for(self._pending_queue.get(), timeout)
-            )
+            future = asyncio.create_task(timed_get())
             self._pending_next_msgs_calls[task_name] = future
             msg = await future
         except asyncio.TimeoutError:

@ff137
Contributor

ff137 commented Apr 22, 2025

Thanks @knutroy for looking into this.

Have you tried the suggestion in @tanmayv25's OP? It seems to make the most sense to me, because it ensures the future is awaited, even if cancelled.

Calling .close(), as in Option 1, will skip running the coroutine. Not sure which is best.


Note: I consulted GPT to understand the distinction between the different approaches, and it believes that the approach in OP is correct, and just requires suppressing an expected CancelledError:

        try:
            ...
        finally:
            future.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await future
            self._pending_next_msgs_calls.pop(task_name, None)

But of course, that's just GPT. I'll try to run my own tests in due time.

@knutroy

knutroy commented Apr 22, 2025

@tanmayv25's patch:

--- a/nats/aio/subscription.py
+++ b/nats/aio/subscription.py
@@ -203,6 +203,8 @@ class Subscription:
             self._pending_queue.task_done()
             return msg
         finally:
+            future.cancel()
+            await future
             self._pending_next_msgs_calls.pop(task_name, None)
 
     def _start(self, error_cb):

has no effect on the runtime warning when I test it using test_fetch_being_cancelled_early() from my previous post.

My understanding is that the core of the issue is that one task calls self._pending_queue.get() and then passes the returned coroutine object to another task, which gets cancelled before ever getting to the point of awaiting that coroutine object.

Option 1 in my previous post ensures the coroutine object gets closed by the original task in case the other task that was supposed to await it never got around to doing so due to cancellation. If already closed (the common case), the extra call to close has no effect.
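
A minimal standard-library sketch of why the extra close() in Option 1 is both safe and sufficient:

    import asyncio

    async def demo() -> None:
        q: asyncio.Queue = asyncio.Queue()

        orphaned = q.get()   # coroutine object, never awaited, never closed
        closed = q.get()     # coroutine object, never awaited, but closed

        closed.close()       # marks the coroutine as finished, so its destructor
                             # stays silent; closing an already-finished coroutine
                             # is a no-op
        del closed           # no warning
        del orphaned         # RuntimeWarning: coroutine 'Queue.get' was never awaited

    asyncio.run(demo())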

Option 2 in my previous post introduces a new coroutine function causing the call to self._pending_queue.get() to be performed by the new task, so that the returned coroutine object is guaranteed to be awaited by that same task. This last option is maybe the cleanest one?
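
And a corresponding sketch for Option 2: because the new task itself makes the get() call (here just q.get()), a cancellation before its first step means no coroutine object is ever created, so there is nothing to orphan:

    import asyncio

    async def demo() -> None:
        q: asyncio.Queue = asyncio.Queue()

        async def timed_get():
            # q.get() is only called once this coroutine actually runs, so a
            # task cancelled before its first step never creates (and never
            # orphans) a Queue.get coroutine object.
            return await asyncio.wait_for(q.get(), 1.0)

        task = asyncio.create_task(timed_get())
        task.cancel()  # cancelled before it ever runs: no RuntimeWarning
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(demo())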

knutroy added a commit to knutroy/nats.py that referenced this issue Apr 22, 2025
…ing.

This could happen when the coroutine object returned from
asyncio.Queue.get() was handed over to a newly created task which was then
immediately cancelled so that the coroutine object was never awaited.

Instead, let the newly created task itself make the call to
asyncio.Queue.get() before immediately awaiting it.

Resolves nats-io#646.

Signed-off-by: Knut Aksel Røysland <knutroy@ifi.uio.no>