From 4cb053e1f4b3dc790596d53e6712e16d814782d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 12 Nov 2023 23:53:10 +0200 Subject: [PATCH] Added the `logger` parameter to `Datastore.start()` and `EventBroker.start()` --- docs/versionhistory.rst | 2 ++ src/apscheduler/_schedulers/async_.py | 4 ++-- src/apscheduler/abc.py | 7 +++++-- src/apscheduler/datastores/base.py | 8 +++----- src/apscheduler/datastores/mongodb.py | 5 +++-- src/apscheduler/datastores/sqlalchemy.py | 5 +++-- src/apscheduler/eventbrokers/asyncpg.py | 5 +++-- src/apscheduler/eventbrokers/base.py | 8 +++----- src/apscheduler/eventbrokers/mqtt.py | 5 +++-- src/apscheduler/eventbrokers/redis.py | 5 +++-- tests/conftest.py | 17 ++++++++++++----- tests/test_datastores.py | 11 +++++++---- tests/test_eventbrokers.py | 9 +++++---- 13 files changed, 54 insertions(+), 37 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 54e2c5ef9..2473c65fb 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -17,6 +17,8 @@ APScheduler, see the :doc:`migration section `. - **BREAKING** Added the ``task_id`` attribute to the ``ScheduleAdded``, ``ScheduleUpdated`` and ``ScheduleRemoved`` events - **BREAKING** Added the ``finished`` attribute to the ``ScheduleRemoved`` event +- **BREAKING** Added the ``logger`` parameter to ``Datastore.start()`` and + ``EventBroker.start()`` to make both use the scheduler's assigned logger - Added the ``configure_task()`` and ``get_tasks()`` scheduler methods - Fixed out of order delivery of events delivered using worker threads - Fixed schedule processing not setting job start deadlines correctly diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 61daeb9b7..be8f47bca 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -157,8 +157,8 @@ async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None self._services_initialized = True exit_stack.callback(setattr, self, "_services_initialized", False) - await self.event_broker.start(exit_stack) - await self.data_store.start(exit_stack, self.event_broker) + await self.event_broker.start(exit_stack, self.logger) + await self.data_store.start(exit_stack, self.event_broker, self.logger) def _check_initialized(self) -> None: """Raise RuntimeError if the services have not been initialized yet.""" diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index 584ea9e0b..cc44bbd67 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -4,6 +4,7 @@ from abc import ABCMeta, abstractmethod from contextlib import AsyncExitStack from datetime import datetime +from logging import Logger from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator from uuid import UUID @@ -108,12 +109,13 @@ class EventBroker(metaclass=ABCMeta): """ @abstractmethod - async def start(self, exit_stack: AsyncExitStack) -> None: + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: """ Start the event broker. :param exit_stack: an asynchronous exit stack which will be processed when the scheduler is shut down + :param logger: the logger object the event broker should use to log events """ @abstractmethod @@ -157,7 +159,7 @@ class DataStore(metaclass=ABCMeta): @abstractmethod async def start( - self, exit_stack: AsyncExitStack, event_broker: EventBroker + self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger ) -> None: """ Start the event broker. @@ -166,6 +168,7 @@ async def start( scheduler is shut down :param event_broker: the event broker shared between the scheduler, scheduler (if any) and this data store + :param logger: the logger object the data store should use to log events """ @abstractmethod diff --git a/src/apscheduler/datastores/base.py b/src/apscheduler/datastores/base.py index 5c7ef7dae..c51c805db 100644 --- a/src/apscheduler/datastores/base.py +++ b/src/apscheduler/datastores/base.py @@ -1,7 +1,7 @@ from __future__ import annotations from contextlib import AsyncExitStack -from logging import Logger, getLogger +from logging import Logger import attrs @@ -24,12 +24,10 @@ class BaseDataStore(DataStore): _logger: Logger = attrs.field(init=False) async def start( - self, exit_stack: AsyncExitStack, event_broker: EventBroker + self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger ) -> None: self._event_broker = event_broker - - def __attrs_post_init__(self): - self._logger = getLogger(self.__class__.__name__) + self._logger = logger @attrs.define(kw_only=True) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 34896be33..7e72132d3 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -5,6 +5,7 @@ from collections.abc import Mapping from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone +from logging import Logger from typing import Any, Callable, ClassVar, Iterable from uuid import UUID @@ -154,9 +155,9 @@ def _initialize(self) -> None: self._jobs_results.create_index("expires_at", session=session) async def start( - self, exit_stack: AsyncExitStack, event_broker: EventBroker + self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger ) -> None: - await super().start(exit_stack, event_broker) + await super().start(exit_stack, event_broker, logger) server_info = await to_thread.run_sync(self.client.server_info) if server_info["versionArray"] < [4, 0]: raise RuntimeError( diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 967646e57..0a398b722 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -6,6 +6,7 @@ from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta, timezone from functools import partial +from logging import Logger from typing import Any, Iterable from uuid import UUID @@ -329,9 +330,9 @@ def get_table_definitions(self) -> MetaData: return metadata async def start( - self, exit_stack: AsyncExitStack, event_broker: EventBroker + self, exit_stack: AsyncExitStack, event_broker: EventBroker, logger: Logger ) -> None: - await super().start(exit_stack, event_broker) + await super().start(exit_stack, event_broker, logger) asynclib = sniffio.current_async_library() or "(unknown)" if asynclib != "asyncio": raise RuntimeError( diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index 994ac21b7..d45aaf04f 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -3,6 +3,7 @@ from collections.abc import Awaitable, Mapping from contextlib import AsyncExitStack from functools import partial +from logging import Logger from typing import TYPE_CHECKING, Any, Callable, cast import asyncpg @@ -106,8 +107,8 @@ def from_async_sqla_engine( def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]: return OSError, InterfaceError - async def start(self, exit_stack: AsyncExitStack) -> None: - await super().start(exit_stack) + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: + await super().start(exit_stack, logger) self._send = cast( MemoryObjectSendStream[str], await self._task_group.start(self._listen_notifications), diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py index 5d69c5d07..7279f1eb4 100644 --- a/src/apscheduler/eventbrokers/base.py +++ b/src/apscheduler/eventbrokers/base.py @@ -3,7 +3,7 @@ from base64 import b64decode, b64encode from contextlib import AsyncExitStack from inspect import iscoroutine -from logging import Logger, getLogger +from logging import Logger from typing import Any, Callable, Iterable import attrs @@ -40,10 +40,8 @@ class BaseEventBroker(EventBroker): _task_group: TaskGroup = attrs.field(init=False) _thread_limiter: CapacityLimiter = attrs.field(init=False) - def __attrs_post_init__(self) -> None: - self._logger = getLogger(self.__class__.__module__) - - async def start(self, exit_stack: AsyncExitStack) -> None: + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: + self._logger = logger self._task_group = await exit_stack.enter_async_context(create_task_group()) self._thread_limiter = CapacityLimiter(1) diff --git a/src/apscheduler/eventbrokers/mqtt.py b/src/apscheduler/eventbrokers/mqtt.py index 063cec700..c73788034 100644 --- a/src/apscheduler/eventbrokers/mqtt.py +++ b/src/apscheduler/eventbrokers/mqtt.py @@ -3,6 +3,7 @@ import sys from concurrent.futures import Future from contextlib import AsyncExitStack +from logging import Logger from typing import Any import attrs @@ -42,8 +43,8 @@ class MQTTEventBroker(BaseExternalEventBroker): _portal: BlockingPortal = attrs.field(init=False) _ready_future: Future[None] = attrs.field(init=False) - async def start(self, exit_stack: AsyncExitStack) -> None: - await super().start(exit_stack) + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: + await super().start(exit_stack, logger) self._portal = await exit_stack.enter_async_context(BlockingPortal()) self._ready_future = Future() self.client.on_connect = self._on_connect diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py index a866b6b6b..8c967736d 100644 --- a/src/apscheduler/eventbrokers/redis.py +++ b/src/apscheduler/eventbrokers/redis.py @@ -2,6 +2,7 @@ from asyncio import CancelledError from contextlib import AsyncExitStack +from logging import Logger import anyio import attrs @@ -68,8 +69,8 @@ def after_attempt(retry_state: tenacity.RetryCallState) -> None: reraise=True, ) - async def start(self, exit_stack: AsyncExitStack) -> None: - await super().start(exit_stack) + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: + await super().start(exit_stack, logger) pubsub = self.client.pubsub() await pubsub.subscribe(self.channel) diff --git a/tests/conftest.py b/tests/conftest.py index be070ccab..f22f9fd9b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,10 @@ from __future__ import annotations +import logging import sys from collections.abc import Generator from contextlib import AsyncExitStack +from logging import Logger from tempfile import TemporaryDirectory from typing import Any, AsyncGenerator, cast @@ -108,10 +110,10 @@ async def raw_event_broker(request: SubRequest) -> EventBroker: @pytest.fixture async def event_broker( - raw_event_broker: EventBroker, + raw_event_broker: EventBroker, logger: Logger ) -> AsyncGenerator[EventBroker, Any]: async with AsyncExitStack() as exit_stack: - await raw_event_broker.start(exit_stack) + await raw_event_broker.start(exit_stack, logger) yield raw_event_broker @@ -312,11 +314,16 @@ async def raw_datastore(request: SubRequest) -> DataStore: return cast(DataStore, request.param) +@pytest.fixture(scope="session") +def logger() -> Logger: + return logging.getLogger("apscheduler") + + @pytest.fixture async def datastore( - raw_datastore: DataStore, local_broker: EventBroker + raw_datastore: DataStore, local_broker: EventBroker, logger: Logger ) -> AsyncGenerator[DataStore, Any]: async with AsyncExitStack() as exit_stack: - await local_broker.start(exit_stack) - await raw_datastore.start(exit_stack, local_broker) + await local_broker.start(exit_stack, logger) + await raw_datastore.start(exit_stack, local_broker, logger) yield raw_datastore diff --git a/tests/test_datastores.py b/tests/test_datastores.py index c883e4696..091ccbbfc 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -2,6 +2,7 @@ from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta, timezone +from logging import Logger from typing import AsyncGenerator import anyio @@ -503,18 +504,20 @@ async def test_add_get_task(datastore: DataStore) -> None: async def test_cancel_start( - raw_datastore: DataStore, local_broker: EventBroker + raw_datastore: DataStore, local_broker: EventBroker, logger: Logger ) -> None: with CancelScope() as scope: scope.cancel() async with AsyncExitStack() as exit_stack: - await raw_datastore.start(exit_stack, local_broker) + await raw_datastore.start(exit_stack, local_broker, logger) -async def test_cancel_stop(raw_datastore: DataStore, local_broker: EventBroker) -> None: +async def test_cancel_stop( + raw_datastore: DataStore, local_broker: EventBroker, logger: Logger +) -> None: with CancelScope() as scope: async with AsyncExitStack() as exit_stack: - await raw_datastore.start(exit_stack, local_broker) + await raw_datastore.start(exit_stack, local_broker, logger) scope.cancel() diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index 05b3f5af5..88150f4a4 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -3,6 +3,7 @@ import sys from contextlib import AsyncExitStack from datetime import datetime, timezone +from logging import Logger import pytest from _pytest.logging import LogCaptureFixture @@ -106,15 +107,15 @@ def bad_subscriber(event: Event) -> None: assert "Error delivering Event" in caplog.text -async def test_cancel_start(raw_event_broker: EventBroker) -> None: +async def test_cancel_start(raw_event_broker: EventBroker, logger: Logger) -> None: with CancelScope() as scope: scope.cancel() async with AsyncExitStack() as exit_stack: - await raw_event_broker.start(exit_stack) + await raw_event_broker.start(exit_stack, logger) -async def test_cancel_stop(raw_event_broker: EventBroker) -> None: +async def test_cancel_stop(raw_event_broker: EventBroker, logger: Logger) -> None: with CancelScope() as scope: async with AsyncExitStack() as exit_stack: - await raw_event_broker.start(exit_stack) + await raw_event_broker.start(exit_stack, logger) scope.cancel()