Skip to content

Commit

Permalink
Added the finished attribute to ScheduleRemoved
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Nov 11, 2023
1 parent 28fa5ba commit a67a82c
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ APScheduler, see the :doc:`migration section <migration>`.
``JobAcquired`` and ``JobReleased`` events
- **BREAKING** Added the ``task_id`` attribute to the ``ScheduleAdded``,
``ScheduleUpdated`` and ``ScheduleRemoved`` events
- **BREAKING** Added the ``finished`` attribute to the ``ScheduleRemoved`` event
- Added the ``configure_task()`` and ``get_tasks()`` scheduler methods
- Fixed out of order delivery of events delivered using worker threads
- Fixed schedule processing not setting job start deadlines correctly
Expand Down
3 changes: 3 additions & 0 deletions src/apscheduler/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,13 @@ class ScheduleRemoved(DataStoreEvent):
:ivar schedule_id: ID of the schedule that was removed
:ivar task_id: ID of the task the schedule belongs to
:ivar finished: ``True`` if the schedule was removed automatically because its
trigger had no more fire times left
"""

schedule_id: str
task_id: str
finished: bool


@attrs.define(kw_only=True, frozen=True)
Expand Down
10 changes: 7 additions & 3 deletions src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,17 @@ async def add_schedule(

await self._event_broker.publish(event)

async def remove_schedules(self, ids: Iterable[str]) -> None:
async def remove_schedules(
self, ids: Iterable[str], *, finished: bool = False
) -> None:
for schedule_id in ids:
state = self._schedules_by_id.pop(schedule_id, None)
if state:
self._schedules.remove(state)
event = ScheduleRemoved(
schedule_id=state.schedule.id, task_id=state.schedule.task_id
schedule_id=state.schedule.id,
task_id=state.schedule.task_id,
finished=finished,
)
await self._event_broker.publish(event)

Expand Down Expand Up @@ -240,7 +244,7 @@ async def release_schedules(
finished_schedule_ids.append(s.id)

# Remove schedules that didn't get a new next fire time
await self.remove_schedules(finished_schedule_ids)
await self.remove_schedules(finished_schedule_ids, finished=True)

async def get_next_schedule_run_time(self) -> datetime | None:
return self._schedules[0].next_fire_time if self._schedules else None
Expand Down
10 changes: 8 additions & 2 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:

for schedule_id, task_id in ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_id)
ScheduleRemoved(
schedule_id=schedule_id, task_id=task_id, finished=False
)
)

async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
Expand Down Expand Up @@ -392,7 +394,11 @@ async def release_schedules(

for schedule_id in finished_schedule_ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id])
ScheduleRemoved(
schedule_id=schedule_id,
task_id=task_ids[schedule_id],
finished=True,
)
)

async def get_next_schedule_run_time(self) -> datetime | None:
Expand Down
10 changes: 8 additions & 2 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,9 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:

for schedule_id, task_id in removed_ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_id)
ScheduleRemoved(
schedule_id=schedule_id, task_id=task_id, finished=False
)
)

async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
Expand Down Expand Up @@ -658,7 +660,11 @@ async def release_schedules(

for schedule_id in finished_schedule_ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id])
ScheduleRemoved(
schedule_id=schedule_id,
task_id=task_ids[schedule_id],
finished=True,
)
)

async def get_next_schedule_run_time(self) -> datetime | None:
Expand Down
1 change: 1 addition & 0 deletions tests/test_datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ async def test_acquire_release_schedules(
received_event = events.pop(0)
assert isinstance(received_event, ScheduleRemoved)
assert received_event.schedule_id == "s1"
assert received_event.finished

received_event = events.pop(0)
assert isinstance(received_event, ScheduleUpdated)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ async def test_add_remove_schedule(self, raw_datastore: DataStore) -> None:
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert not event.finished

async def test_add_job_wait_result(self, raw_datastore: DataStore) -> None:
send, receive = create_memory_object_stream[Event](2)
Expand Down Expand Up @@ -408,6 +409,7 @@ async def test_scheduled_job_missed_deadline(
event = await receive.receive()
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
assert event.finished

# The new job was acquired
event = await receive.receive()
Expand Down Expand Up @@ -773,6 +775,7 @@ def test_add_remove_schedule(self) -> None:
event = queue.get(timeout=1)
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
assert not event.finished

def test_add_job_wait_result(self) -> None:
queue = Queue()
Expand Down

0 comments on commit a67a82c

Please sign in to comment.