Skip to content

Commit

Permalink
WIP: Broker and Handler template added.
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Bellino committed Nov 9, 2023
1 parent 4d28c73 commit acb1d0d
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 7 deletions.
2 changes: 1 addition & 1 deletion faststream/sqs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from faststream.broker.test import TestApp
from faststream.sqs.annotations import SQSBroker, SQSMessage, SQSProducer
from faststream.sqs.annotations import SQSBroker, SQSMessage
from faststream.sqs.router import SQSRouter
from faststream.sqs.shared.router import SQSRoute
from faststream.sqs.shared.schemas import (
Expand Down
6 changes: 6 additions & 0 deletions faststream/sqs/annotations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from aiobotocore.client import AioBaseClient

from faststream._compat import Annotated
from faststream.annotations import ContextRepo, Logger, NoCast
from faststream.sqs.broker import SQSBroker as SB # NOQA
Expand All @@ -12,8 +14,12 @@
"SQSBroker",
"SQSMessage",
"SQSProducer",
"client",
"queue_url",
)

SQSBroker = Annotated[SB, Context("broker")]
SQSMessage = Annotated[SM, Context("message")]
SQSProducer = Annotated[SQSFastProducer, Context("broker._producer")]
client = Annotated[AioBaseClient, Context("client")]
queue_url = Annotated[str, Context("queue_url")]
4 changes: 2 additions & 2 deletions faststream/sqs/asyncapi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
class Publisher:
class Handler:
# TODO
pass


class Handler:
class Publisher:
# TODO
pass
98 changes: 95 additions & 3 deletions faststream/sqs/broker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,95 @@
class SQSBroker:
# TODO
pass
from types import TracebackType
from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Type, Union

from aiobotocore.client import AioBaseClient
from fast_depends.dependencies import Depends

from faststream import BaseMiddleware
from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter
from faststream.broker.message import StreamMessage
from faststream.broker.publisher import BasePublisher
from faststream.broker.push_back_watcher import BaseWatcher
from faststream.broker.types import (
CustomDecoder,
CustomParser,
Filter,
MsgType,
P_HandlerParams,
T_HandlerReturn,
WrappedReturn,
)
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.sqs.asyncapi import Handler, Publisher
from faststream.sqs.producer import SQSFastProducer
from faststream.sqs.shared.logging import SQSLoggingMixin
from faststream.types import AnyDict, SendableMessage


class SQSBroker(
SQSLoggingMixin,
BrokerAsyncUsecase[AnyDict, AioBaseClient],
):
handlers: Dict[str, Handler] # type: ignore[assignment]
_publishers: Dict[str, Publisher] # type: ignore[assignment]
_producer: Optional[SQSFastProducer]

async def start(self) -> None:
pass

async def _connect(self, **kwargs: Any) -> AioBaseClient:
pass

async def _close(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exec_tb: Optional[TracebackType] = None,
) -> None:
pass

def _process_message(
self,
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
watcher: BaseWatcher,
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
pass

async def publish(
self,
message: SendableMessage,
*args: Any,
reply_to: str = "",
rpc: bool = False,
rpc_timeout: Optional[float] = None,
raise_timeout: bool = False,
**kwargs: Any,
) -> Optional[SendableMessage]:
pass

def subscriber(
self,
*broker_args: Any,
retry: Union[bool, int] = False,
dependencies: Sequence[Depends] = (),
decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None,
filter: Filter[StreamMessage[MsgType]] = default_filter,
_raw: bool = False,
_get_dependant: Optional[Any] = None,
**broker_kwargs: Any,
) -> Callable[
[
Union[
Callable[P_HandlerParams, T_HandlerReturn],
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
]
],
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
]:
pass

def publisher(
self, key: Any, publisher: BasePublisher[MsgType]
) -> BasePublisher[MsgType]:
pass
61 changes: 60 additions & 1 deletion faststream/sqs/handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,69 @@
import asyncio
import logging
from typing import Any, NoReturn, Optional

import anyio
from aiobotocore.client import AioBaseClient
from typing_extensions import TypeAlias

from faststream.broker.handler import AsyncHandler
from faststream.sqs.shared.schemas import SQSQueue
from faststream.types import AnyDict
from faststream.utils.context import context

QueueUrl: TypeAlias = str


class LogicSQSHandler(AsyncHandler[AnyDict]):
queue: SQSQueue
consumer_params: AnyDict
task: Optional["asyncio.Task[Any]"] = None

async def _consume(self, queue_url: str) -> NoReturn:
c = self._get_log_context(None, self.queue.name)

connected = True
with context.scope("queue_url", queue_url):
while True:
try:
if connected is False:
await self.create_queue(self.queue)

r = await self._connection.receive_message(
QueueUrl=queue_url,
**self.consumer_params,
)

except Exception as e:
if connected is True:
self._log(e, logging.WARNING, c, exc_info=e)
self._queues.pop(self.queue.name)
connected = False

await anyio.sleep(5)

else:
if connected is False:
self._log("Connection established", logging.INFO, c)
connected = True

messages = r.get("Messages", [])
for msg in messages:
try:
await self.callback(msg, True)
except Exception:
has_trash_messages = True
else:
has_trash_messages = False

if has_trash_messages is True:
await anyio.sleep(
self.consumer_params.get("WaitTimeSeconds", 1.0)
)

async def start(self, client: AioBaseClient) -> None:
# TODO check "start" method on broker
url = await self.create_queue(self.queue)
self.task = asyncio.create_task(self._consume(url))

async def close(self) -> None:
pass

0 comments on commit acb1d0d

Please sign in to comment.