Skip to content

fix: error handling and logging #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 54 additions & 50 deletions fastapi_websocket_pubsub/event_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,23 @@ def __init__(
self._tasks = set()
self.listening_broadcast_channel = None

async def _connect_broadcast_channel(self):
"""Helper method to connect to broadcast channel with proper error handling"""
try:
if self.listening_broadcast_channel is None:
self.listening_broadcast_channel = self._broadcast_type(self._broadcast_url)
await self.listening_broadcast_channel.connect()
logger.info(f"Successfully connected to broadcast channel at {self._broadcast_url}")
except ConnectionError as e:
error_msg = f"Failed to connect to broadcast channel - Connection error: {str(e)}"
logger.error(error_msg)
raise ConnectionError(error_msg) from e
except Exception as e:
error_msg = f"Failed to connect to broadcast channel: {e.__class__.__name__}: {str(e)}"
logger.error(error_msg)
raise


async def __broadcast_notifications__(self, subscription: Subscription, data):
"""
Share incoming internal notifications with the entire broadcast channel
Expand Down Expand Up @@ -225,65 +242,41 @@ async def __aexit__(self, exc_type, exc, tb):
await self._context_manager.__aexit__(exc_type, exc, tb)

async def start_reader_task(self):
"""Spawn a task reading incoming broadcasts and posting them to the intreal notifier
Raises:
BroadcasterAlreadyStarted: if called more than once per context
Returns:
the spawned task
"""
# Make sure a task wasn't started already
"""Spawn a task reading incoming broadcasts with improved error handling"""
if self._subscription_task is not None:
# we already started a task for this worker process
logger.debug(
"No need for listen task, already started broadcast listen task for this notifier"
)
logger.debug("Broadcast listen task already started")
return

# Init new broadcast channel for reading
try:
if self.listening_broadcast_channel is None:
self.listening_broadcast_channel = self._broadcast_type(
self._broadcast_url
)
await self.listening_broadcast_channel.connect()
except Exception as e:
logger.error(
f"Failed to connect to broadcast channel for reading incoming events: {e}"
)
raise e
# Connect with proper error handling
await self._connect_broadcast_channel()

# Trigger the task
logger.debug("Spawning broadcast listen task")
self._subscription_task = asyncio.create_task(self.__read_notifications__())
return self._subscription_task

def get_reader_task(self):
return self._subscription_task



async def __read_notifications__(self):
"""
read incoming broadcasts and posting them to the intreal notifier
"""
"""Read incoming broadcasts with improved error handling"""
logger.debug("Starting broadcaster listener")

try:
# Subscribe to our channel
async with self.listening_broadcast_channel.subscribe(
channel=self._channel
) as subscriber:
if self.listening_broadcast_channel is None:
raise RuntimeError("Broadcast channel not initialized")

async with self.listening_broadcast_channel.subscribe(channel=self._channel) as subscriber:
async for event in subscriber:
try:
notification = BroadcastNotification.parse_raw(event.message)
# Avoid re-publishing our own broadcasts
if notification.notifier_id != self._id:
logger.debug(
"Handling incoming broadcast event: {}".format(
{
"topics": notification.topics,
"src": notification.notifier_id,
}
)
f"Handling incoming broadcast event: "
f"topics={notification.topics}, src={notification.notifier_id}"
)
# Notify subscribers of message received from broadcast

task = asyncio.create_task(
self._notifier.notify(
notification.topics,
Expand All @@ -293,17 +286,28 @@ async def __read_notifications__(self):
)

self._tasks.add(task)

def cleanup(task):
self._tasks.remove(task)

task.add_done_callback(cleanup)
except:
logger.exception("Failed handling incoming broadcast")
logger.info(
"No more events to read from subscriber (underlying connection closed)"
)
task.add_done_callback(lambda t: self._tasks.remove(t))

except Exception as e:
logger.error(f"Failed handling incoming broadcast: {str(e)}")
# Log full traceback for debugging
logger.exception("Full error traceback:")

except ConnectionError as e:
logger.error(f"Broadcast channel connection error: {str(e)}")
raise

except Exception as e:
logger.error(f"Error in broadcast listener: {e.__class__.__name__}: {str(e)}")
logger.exception("Full error traceback:")
raise

finally:
if self.listening_broadcast_channel is not None:
await self.listening_broadcast_channel.disconnect()
try:
await self.listening_broadcast_channel.disconnect()
except Exception as e:
logger.error(f"Error disconnecting broadcast channel: {str(e)}")
self.listening_broadcast_channel = None

logger.info("Broadcast listener stopped")
Loading