Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions taskiq/formatters/json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
queue=message.queue,
message=model_dump_json(message).encode(),
labels=message.labels,
)
Expand Down
1 change: 1 addition & 0 deletions taskiq/formatters/proxy_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
queue=message.queue,
message=self.broker.serializer.dumpb(model_dump(message)),
labels=message.labels,
)
Expand Down
17 changes: 17 additions & 0 deletions taskiq/kicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from taskiq.exceptions import SendTaskError
from taskiq.labels import prepare_label
from taskiq.message import TaskiqMessage
from taskiq.queue import DEFAULT_QUEUE, Queue
from taskiq.scheduler.created_schedule import CreatedSchedule
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
from taskiq.task import AsyncTaskiqTask
Expand All @@ -47,10 +48,12 @@ def __init__(
task_name: str,
broker: "AsyncBroker",
labels: Dict[str, Any],
queue: Union["Queue", str] = DEFAULT_QUEUE,
return_type: Optional[Type[_ReturnType]] = None,
) -> None:
self.task_name = task_name
self.broker = broker
self.queue = Queue(queue)
self.labels = labels
self.custom_task_id: Optional[str] = None
self.custom_schedule_id: Optional[str] = None
Expand Down Expand Up @@ -111,6 +114,19 @@ def with_broker(
self.broker = broker
return self

def with_queue(
self,
queue: Union["Queue", str],
) -> "AsyncKicker[_FuncParams, _ReturnType]":
"""
Replace queue for the function.

:param queue: new queue instance or name.
:return: Kicker with new queue.
"""
self.queue = Queue(queue)
return self

@overload
async def kiq(
self: "AsyncKicker[_FuncParams, CoroutineType[Any, Any, _T]]",
Expand Down Expand Up @@ -296,6 +312,7 @@ def _prepare_message(
return TaskiqMessage(
task_id=task_id,
task_name=self.task_name,
queue=self.queue.name,
labels=labels,
labels_types=labels_types,
args=formatted_args,
Expand Down
2 changes: 2 additions & 0 deletions taskiq/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TaskiqMessage(BaseModel):

task_id: str
task_name: str
queue: str
labels: Dict[str, Any]
labels_types: Optional[Dict[str, int]] = None
args: List[Any]
Expand All @@ -40,5 +41,6 @@ class BrokerMessage(BaseModel):

task_id: str
task_name: str
queue: str
message: bytes
labels: Dict[str, Any]
26 changes: 26 additions & 0 deletions taskiq/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import dataclasses

DEFAULT_QUEUE = "taksiq"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oromogans typo



@dataclasses.dataclass(frozen=True, init=False, eq=True)
class Queue:
"""Represents an abstraction for dealing with queues in real brokers."""

name: str

def __init__(self, src: str | Queue) -> None:
if isinstance(src, Queue):
object.__setattr__(self, "name", src.name)
elif isinstance(src, str):
object.__setattr__(self, "name", src)
else:
raise TypeError(
"Queue.__init__ expect str or Queue, "
"{type(src).__name__!r} is recieved",
)

def __repr__(self) -> str:
return self.name
7 changes: 7 additions & 0 deletions tests/cli/worker/test_parameters_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_parse_params_no_signature() -> None:
task_id="",
task_name="",
labels={},
queue="taksiq",
args=[1, 2],
kwargs={"a": 1},
)
Expand All @@ -49,6 +50,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taksiq",
args=[{"field": "test_val"}],
kwargs={},
)
Expand All @@ -66,6 +68,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taksiq",
args=[],
kwargs={"param": {"field": "test_val"}},
)
Expand All @@ -91,6 +94,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taksiq",
args=[{"unknown": "unknown"}],
kwargs={},
)
Expand All @@ -107,6 +111,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taksiq",
args=[],
kwargs={"param": {"unknown": "unknown"}},
)
Expand All @@ -130,6 +135,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
msg_with_args = TaskiqMessage(
task_id="",
task_name="",
queue="taksiq",
labels={},
args=[None],
kwargs={},
Expand All @@ -142,6 +148,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
msg_with_kwargs = TaskiqMessage(
task_id="",
task_name="",
queue="taksiq",
labels={},
args=[],
kwargs={"param": None},
Expand Down
1 change: 1 addition & 0 deletions tests/depends/test_progress_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_message(
task_id=task_id or task.broker.id_generator(),
task_name=task.task_name,
labels=labels,
queue="taksiq",
args=list(args),
kwargs=kwargs,
)
Expand Down
3 changes: 3 additions & 0 deletions tests/formatters/test_json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ async def test_json_dumps() -> None:
msg = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taksiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
)
expected = BrokerMessage(
task_id="task-id",
task_name="task.name",
queue="taksiq",
message=(
b'{"task_id":"task-id","task_name":"task.name",'
b'"labels":{"label1":1,"label2":"text"},'
Expand Down Expand Up @@ -46,6 +48,7 @@ async def test_json_loads() -> None:
task_id="task-id",
task_name="task.name",
labels={"label1": 1, "label2": "text"},
queue="taksiq",
args=[1, "a"],
kwargs={"p1": "v1"},
)
Expand Down
3 changes: 3 additions & 0 deletions tests/formatters/test_proxy_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ async def test_proxy_dumps() -> None:
msg = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taksiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
)
expected = BrokerMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
message=(
b'{"task_id": "task-id", "task_name": "task.name", '
b'"labels": {"label1": 1, "label2": "text"}, '
Expand All @@ -41,6 +43,7 @@ async def test_proxy_loads() -> None:
expected = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
Expand Down
3 changes: 3 additions & 0 deletions tests/middlewares/test_simple_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def test_successful_retry(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={
"retry_on_error": "True",
},
Expand All @@ -47,6 +48,7 @@ async def test_no_retry(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={},
args=[],
kwargs={},
Expand All @@ -65,6 +67,7 @@ async def test_max_retries(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={
"retry_on_error": "True",
"_retries": "2",
Expand Down
12 changes: 12 additions & 0 deletions tests/receiver/test_params_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=["1", "2"],
kwargs={},
)
Expand All @@ -47,6 +48,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "20"}],
kwargs={},
)
Expand All @@ -68,6 +70,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "20"}],
kwargs={},
)
Expand All @@ -85,6 +88,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=["f3", "2"],
kwargs={},
)
Expand All @@ -108,6 +112,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "f3"}],
kwargs={},
)
Expand All @@ -130,6 +135,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "f3"}],
kwargs={},
)
Expand All @@ -149,6 +155,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[1],
kwargs={"b": "2"},
)
Expand All @@ -171,6 +178,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "20"}},
)
Expand All @@ -192,6 +200,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "20"}},
)
Expand All @@ -209,6 +218,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": "1", "b": "f3"},
)
Expand All @@ -232,6 +242,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "f3"}},
)
Expand All @@ -254,6 +265,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "f3"}},
)
Expand Down
Loading
Loading