diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 4fc07c3e..26c6bd7b 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -15,6 +15,7 @@ APScheduler, see the :doc:`migration section `. ``JobAcquired`` and ``JobReleased`` events - **BREAKING** Added the ``task_id`` attribute to the ``ScheduleAdded``, ``ScheduleUpdated`` and ``ScheduleRemoved`` events +- **BREAKING** Added the ``finished`` attribute to the ``ScheduleRemoved`` event - Added the ``configure_task()`` and ``get_tasks()`` scheduler methods - Fixed out of order delivery of events delivered using worker threads - Fixed schedule processing not setting job start deadlines correctly diff --git a/src/apscheduler/_events.py b/src/apscheduler/_events.py index bb9e0902..0b3d476f 100644 --- a/src/apscheduler/_events.py +++ b/src/apscheduler/_events.py @@ -125,10 +125,13 @@ class ScheduleRemoved(DataStoreEvent): :ivar schedule_id: ID of the schedule that was removed :ivar task_id: ID of the task the schedule belongs to + :ivar finished: ``True`` if the schedule was removed automatically because its + trigger had no more fire times left """ schedule_id: str task_id: str + finished: bool @attrs.define(kw_only=True, frozen=True) diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 06c052f0..dd8fa0d7 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -184,13 +184,17 @@ async def add_schedule( await self._event_broker.publish(event) - async def remove_schedules(self, ids: Iterable[str]) -> None: + async def remove_schedules( + self, ids: Iterable[str], *, finished: bool = False + ) -> None: for schedule_id in ids: state = self._schedules_by_id.pop(schedule_id, None) if state: self._schedules.remove(state) event = ScheduleRemoved( - schedule_id=state.schedule.id, task_id=state.schedule.task_id + schedule_id=state.schedule.id, + task_id=state.schedule.task_id, + finished=finished, ) await self._event_broker.publish(event) @@ -240,7 +244,7 @@ async def release_schedules( finished_schedule_ids.append(s.id) # Remove schedules that didn't get a new next fire time - await self.remove_schedules(finished_schedule_ids) + await self.remove_schedules(finished_schedule_ids, finished=True) async def get_next_schedule_run_time(self) -> datetime | None: return self._schedules[0].next_fire_time if self._schedules else None diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 3c88a029..34896be3 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -293,7 +293,9 @@ async def remove_schedules(self, ids: Iterable[str]) -> None: for schedule_id, task_id in ids: await self._event_broker.publish( - ScheduleRemoved(schedule_id=schedule_id, task_id=task_id) + ScheduleRemoved( + schedule_id=schedule_id, task_id=task_id, finished=False + ) ) async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: @@ -392,7 +394,11 @@ async def release_schedules( for schedule_id in finished_schedule_ids: await self._event_broker.publish( - ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id]) + ScheduleRemoved( + schedule_id=schedule_id, + task_id=task_ids[schedule_id], + finished=True, + ) ) async def get_next_schedule_run_time(self) -> datetime | None: diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 13c0c174..416a77a2 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -517,7 +517,9 @@ async def remove_schedules(self, ids: Iterable[str]) -> None: for schedule_id, task_id in removed_ids: await self._event_broker.publish( - ScheduleRemoved(schedule_id=schedule_id, task_id=task_id) + ScheduleRemoved( + schedule_id=schedule_id, task_id=task_id, finished=False + ) ) async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]: @@ -658,7 +660,11 @@ async def release_schedules( for schedule_id in finished_schedule_ids: await self._event_broker.publish( - ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id]) + ScheduleRemoved( + schedule_id=schedule_id, + task_id=task_ids[schedule_id], + finished=True, + ) ) async def get_next_schedule_run_time(self) -> datetime | None: diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 98056d11..c883e469 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -218,6 +218,7 @@ async def test_acquire_release_schedules( received_event = events.pop(0) assert isinstance(received_event, ScheduleRemoved) assert received_event.schedule_id == "s1" + assert received_event.finished received_event = events.pop(0) assert isinstance(received_event, ScheduleUpdated) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 2de5326d..6c8e3c2d 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -226,6 +226,7 @@ async def test_add_remove_schedule(self, raw_datastore: DataStore) -> None: assert isinstance(event, ScheduleRemoved) assert event.schedule_id == "foo" assert event.task_id == f"{__name__}:dummy_async_job" + assert not event.finished async def test_add_job_wait_result(self, raw_datastore: DataStore) -> None: send, receive = create_memory_object_stream[Event](2) @@ -408,6 +409,7 @@ async def test_scheduled_job_missed_deadline( event = await receive.receive() assert isinstance(event, ScheduleRemoved) assert event.schedule_id == "foo" + assert event.finished # The new job was acquired event = await receive.receive() @@ -773,6 +775,7 @@ def test_add_remove_schedule(self) -> None: event = queue.get(timeout=1) assert isinstance(event, ScheduleRemoved) assert event.schedule_id == "foo" + assert not event.finished def test_add_job_wait_result(self) -> None: queue = Queue()