Skip to content

Commit

Permalink
Added the logger parameter to Datastore.start() and `EventBroker.…
Browse files Browse the repository at this point in the history
…start()`
  • Loading branch information
agronholm committed Nov 12, 2023
1 parent d6c5fc0 commit 4cb053e
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 37 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Added the ``task_id`` attribute to the ``ScheduleAdded``,
``ScheduleUpdated`` and ``ScheduleRemoved`` events
- **BREAKING** Added the ``finished`` attribute to the ``ScheduleRemoved`` event
- **BREAKING** Added the ``logger`` parameter to ``Datastore.start()`` and
``EventBroker.start()`` to make both use the scheduler's assigned logger
- Added the ``configure_task()`` and ``get_tasks()`` scheduler methods
- Fixed out of order delivery of events delivered using worker threads
- Fixed schedule processing not setting job start deadlines correctly
Expand Down
4 changes: 2 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None
self._services_initialized = True
exit_stack.callback(setattr, self, "_services_initialized", False)

await self.event_broker.start(exit_stack)
await self.data_store.start(exit_stack, self.event_broker)
await self.event_broker.start(exit_stack, self.logger)
await self.data_store.start(exit_stack, self.event_broker, self.logger)

def _check_initialized(self) -> None:
"""Raise RuntimeError if the services have not been initialized yet."""
Expand Down
7 changes: 5 additions & 2 deletions src/apscheduler/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from abc import ABCMeta, abstractmethod
from contextlib import AsyncExitStack
from datetime import datetime
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator
from uuid import UUID

Expand Down Expand Up @@ -108,12 +109,13 @@ class EventBroker(metaclass=ABCMeta):
"""

@abstractmethod
async def start(self, exit_stack: AsyncExitStack) -> None:
async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
"""
Start the event broker.
:param exit_stack: an asynchronous exit stack which will be processed when the
scheduler is shut down
:param logger: the logger object the event broker should use to log events
"""

@abstractmethod
Expand Down Expand Up @@ -157,7 +159,7 @@ class DataStore(metaclass=ABCMeta):

@abstractmethod
async def start(
self, exit_stack: AsyncExitStack, event_broker: EventBroker
self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger
) -> None:
"""
Start the event broker.
Expand All @@ -166,6 +168,7 @@ async def start(
scheduler is shut down
:param event_broker: the event broker shared between the scheduler, scheduler
(if any) and this data store
:param logger: the logger object the data store should use to log events
"""

@abstractmethod
Expand Down
8 changes: 3 additions & 5 deletions src/apscheduler/datastores/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from contextlib import AsyncExitStack
from logging import Logger, getLogger
from logging import Logger

import attrs

Expand All @@ -24,12 +24,10 @@ class BaseDataStore(DataStore):
_logger: Logger = attrs.field(init=False)

async def start(
self, exit_stack: AsyncExitStack, event_broker: EventBroker
self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger
) -> None:
self._event_broker = event_broker

def __attrs_post_init__(self):
self._logger = getLogger(self.__class__.__name__)
self._logger = logger


@attrs.define(kw_only=True)
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections.abc import Mapping
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger
from typing import Any, Callable, ClassVar, Iterable
from uuid import UUID

Expand Down Expand Up @@ -154,9 +155,9 @@ def _initialize(self) -> None:
self._jobs_results.create_index("expires_at", session=session)

async def start(
self, exit_stack: AsyncExitStack, event_broker: EventBroker
self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger
) -> None:
await super().start(exit_stack, event_broker)
await super().start(exit_stack, event_broker, logger)
server_info = await to_thread.run_sync(self.client.server_info)
if server_info["versionArray"] < [4, 0]:
raise RuntimeError(
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timedelta, timezone
from functools import partial
from logging import Logger
from typing import Any, Iterable
from uuid import UUID

Expand Down Expand Up @@ -329,9 +330,9 @@ def get_table_definitions(self) -> MetaData:
return metadata

async def start(
self, exit_stack: AsyncExitStack, event_broker: EventBroker
self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger
) -> None:
await super().start(exit_stack, event_broker)
await super().start(exit_stack, event_broker, logger)
asynclib = sniffio.current_async_library() or "(unknown)"
if asynclib != "asyncio":
raise RuntimeError(
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/eventbrokers/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Awaitable, Mapping
from contextlib import AsyncExitStack
from functools import partial
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, cast

import asyncpg
Expand Down Expand Up @@ -106,8 +107,8 @@ def from_async_sqla_engine(
def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]:
return OSError, InterfaceError

async def start(self, exit_stack: AsyncExitStack) -> None:
await super().start(exit_stack)
async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
self._send = cast(
MemoryObjectSendStream[str],
await self._task_group.start(self._listen_notifications),
Expand Down
8 changes: 3 additions & 5 deletions src/apscheduler/eventbrokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from base64 import b64decode, b64encode
from contextlib import AsyncExitStack
from inspect import iscoroutine
from logging import Logger, getLogger
from logging import Logger
from typing import Any, Callable, Iterable

import attrs
Expand Down Expand Up @@ -40,10 +40,8 @@ class BaseEventBroker(EventBroker):
_task_group: TaskGroup = attrs.field(init=False)
_thread_limiter: CapacityLimiter = attrs.field(init=False)

def __attrs_post_init__(self) -> None:
self._logger = getLogger(self.__class__.__module__)

async def start(self, exit_stack: AsyncExitStack) -> None:
async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
self._logger = logger
self._task_group = await exit_stack.enter_async_context(create_task_group())
self._thread_limiter = CapacityLimiter(1)

Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/eventbrokers/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from concurrent.futures import Future
from contextlib import AsyncExitStack
from logging import Logger
from typing import Any

import attrs
Expand Down Expand Up @@ -42,8 +43,8 @@ class MQTTEventBroker(BaseExternalEventBroker):
_portal: BlockingPortal = attrs.field(init=False)
_ready_future: Future[None] = attrs.field(init=False)

async def start(self, exit_stack: AsyncExitStack) -> None:
await super().start(exit_stack)
async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
self._portal = await exit_stack.enter_async_context(BlockingPortal())
self._ready_future = Future()
self.client.on_connect = self._on_connect
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/eventbrokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from asyncio import CancelledError
from contextlib import AsyncExitStack
from logging import Logger

import anyio
import attrs
Expand Down Expand Up @@ -68,8 +69,8 @@ def after_attempt(retry_state: tenacity.RetryCallState) -> None:
reraise=True,
)

async def start(self, exit_stack: AsyncExitStack) -> None:
await super().start(exit_stack)
async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
pubsub = self.client.pubsub()
await pubsub.subscribe(self.channel)

Expand Down
17 changes: 12 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import logging
import sys
from collections.abc import Generator
from contextlib import AsyncExitStack
from logging import Logger
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast

Expand Down Expand Up @@ -108,10 +110,10 @@ async def raw_event_broker(request: SubRequest) -> EventBroker:

@pytest.fixture
async def event_broker(
raw_event_broker: EventBroker,
raw_event_broker: EventBroker, logger: Logger
) -> AsyncGenerator[EventBroker, Any]:
async with AsyncExitStack() as exit_stack:
await raw_event_broker.start(exit_stack)
await raw_event_broker.start(exit_stack, logger)
yield raw_event_broker


Expand Down Expand Up @@ -312,11 +314,16 @@ async def raw_datastore(request: SubRequest) -> DataStore:
return cast(DataStore, request.param)


@pytest.fixture(scope="session")
def logger() -> Logger:
return logging.getLogger("apscheduler")


@pytest.fixture
async def datastore(
raw_datastore: DataStore, local_broker: EventBroker
raw_datastore: DataStore, local_broker: EventBroker, logger: Logger
) -> AsyncGenerator[DataStore, Any]:
async with AsyncExitStack() as exit_stack:
await local_broker.start(exit_stack)
await raw_datastore.start(exit_stack, local_broker)
await local_broker.start(exit_stack, logger)
await raw_datastore.start(exit_stack, local_broker, logger)
yield raw_datastore
11 changes: 7 additions & 4 deletions tests/test_datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from contextlib import AsyncExitStack, asynccontextmanager
from datetime import datetime, timedelta, timezone
from logging import Logger
from typing import AsyncGenerator

import anyio
Expand Down Expand Up @@ -503,18 +504,20 @@ async def test_add_get_task(datastore: DataStore) -> None:


async def test_cancel_start(
raw_datastore: DataStore, local_broker: EventBroker
raw_datastore: DataStore, local_broker: EventBroker, logger: Logger
) -> None:
with CancelScope() as scope:
scope.cancel()
async with AsyncExitStack() as exit_stack:
await raw_datastore.start(exit_stack, local_broker)
await raw_datastore.start(exit_stack, local_broker, logger)


async def test_cancel_stop(raw_datastore: DataStore, local_broker: EventBroker) -> None:
async def test_cancel_stop(
raw_datastore: DataStore, local_broker: EventBroker, logger: Logger
) -> None:
with CancelScope() as scope:
async with AsyncExitStack() as exit_stack:
await raw_datastore.start(exit_stack, local_broker)
await raw_datastore.start(exit_stack, local_broker, logger)
scope.cancel()


Expand Down
9 changes: 5 additions & 4 deletions tests/test_eventbrokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from contextlib import AsyncExitStack
from datetime import datetime, timezone
from logging import Logger

import pytest
from _pytest.logging import LogCaptureFixture
Expand Down Expand Up @@ -106,15 +107,15 @@ def bad_subscriber(event: Event) -> None:
assert "Error delivering Event" in caplog.text


async def test_cancel_start(raw_event_broker: EventBroker) -> None:
async def test_cancel_start(raw_event_broker: EventBroker, logger: Logger) -> None:
with CancelScope() as scope:
scope.cancel()
async with AsyncExitStack() as exit_stack:
await raw_event_broker.start(exit_stack)
await raw_event_broker.start(exit_stack, logger)


async def test_cancel_stop(raw_event_broker: EventBroker) -> None:
async def test_cancel_stop(raw_event_broker: EventBroker, logger: Logger) -> None:
with CancelScope() as scope:
async with AsyncExitStack() as exit_stack:
await raw_event_broker.start(exit_stack)
await raw_event_broker.start(exit_stack, logger)
scope.cancel()

0 comments on commit 4cb053e

Please sign in to comment.