Skip to content

Commit

Permalink
Added the task_id attribute to schedule events
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Nov 11, 2023
1 parent a90d70d commit 28fa5ba
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 38 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ APScheduler, see the :doc:`migration section <migration>`.
- **BREAKING** Worked around datetime microsecond precision issue on MongoDB
- **BREAKING** Renamed the ``worker_id`` field to ``scheduler_id`` in the
``JobAcquired`` and ``JobReleased`` events
- **BREAKING** Added the ``task_id`` attribute to the ``ScheduleAdded``,
``ScheduleUpdated`` and ``ScheduleRemoved`` events
- Added the ``configure_task()`` and ``get_tasks()`` scheduler methods
- Fixed out of order delivery of events delivered using worker threads
- Fixed schedule processing not setting job start deadlines correctly
Expand Down
8 changes: 8 additions & 0 deletions src/apscheduler/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ class ScheduleAdded(DataStoreEvent):
Signals that a new schedule was added to the store.
:ivar schedule_id: ID of the schedule that was added
:ivar task_id: ID of the task the schedule belongs to
:ivar next_fire_time: the first run time calculated for the schedule
"""

schedule_id: str
task_id: str
next_fire_time: datetime | None = attrs.field(converter=optional(as_aware_datetime))


Expand All @@ -107,10 +109,12 @@ class ScheduleUpdated(DataStoreEvent):
Signals that a schedule has been updated in the store.
:ivar schedule_id: ID of the schedule that was updated
:ivar task_id: ID of the task the schedule belongs to
:ivar next_fire_time: the next time the schedule will run
"""

schedule_id: str
task_id: str
next_fire_time: datetime | None = attrs.field(converter=optional(as_aware_datetime))


Expand All @@ -120,9 +124,11 @@ class ScheduleRemoved(DataStoreEvent):
Signals that a schedule was removed from the store.
:ivar schedule_id: ID of the schedule that was removed
:ivar task_id: ID of the task the schedule belongs to
"""

schedule_id: str
task_id: str


@attrs.define(kw_only=True, frozen=True)
Expand All @@ -146,10 +152,12 @@ class JobRemoved(DataStoreEvent):
Signals that a job was removed from the store.
:ivar job_id: ID of the job that was removed
:ivar task_id: ID of the task the job would have run
"""

job_id: UUID = attrs.field(converter=as_uuid)
task_id: str


@attrs.define(kw_only=True, frozen=True)
Expand Down
14 changes: 10 additions & 4 deletions src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,15 @@ async def add_schedule(

if old_state is not None:
event = ScheduleUpdated(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)
else:
event = ScheduleAdded(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)

await self._event_broker.publish(event)
Expand All @@ -185,7 +189,9 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
state = self._schedules_by_id.pop(schedule_id, None)
if state:
self._schedules.remove(state)
event = ScheduleRemoved(schedule_id=state.schedule.id)
event = ScheduleRemoved(
schedule_id=state.schedule.id, task_id=state.schedule.task_id
)
await self._event_broker.publish(event)

async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
Expand Down Expand Up @@ -227,7 +233,7 @@ async def release_schedules(
schedule_state.acquired_until = None
insort_right(self._schedules, schedule_state)
event = ScheduleUpdated(
schedule_id=s.id, next_fire_time=s.next_fire_time
schedule_id=s.id, task_id=s.task_id, next_fire_time=s.next_fire_time
)
await self._event_broker.publish(event)
else:
Expand Down
27 changes: 19 additions & 8 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,16 @@ async def add_schedule(
)

event = ScheduleUpdated(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)
await self._event_broker.publish(event)
else:
event = ScheduleAdded(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)
await self._event_broker.publish(event)

Expand All @@ -281,14 +285,16 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
async for attempt in self._retry():
with attempt, self.client.start_session() as session:
cursor = self._schedules.find(
filters, projection=["_id"], session=session
filters, projection=["_id", "task_id"], session=session
)
ids = [doc["_id"] for doc in cursor]
ids = [(doc["_id"], doc["task_id"]) for doc in cursor]
if ids:
self._schedules.delete_many(filters, session=session)

for schedule_id in ids:
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
for schedule_id, task_id in ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_id)
)

async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
async for attempt in self._retry():
Expand Down Expand Up @@ -334,6 +340,7 @@ async def release_schedules(
) -> None:
updated_schedules: list[tuple[str, datetime]] = []
finished_schedule_ids: list[str] = []
task_ids = {schedule.id: schedule.task_id for schedule in schedules}

# Update schedules that have a next fire time
requests = []
Expand Down Expand Up @@ -377,12 +384,16 @@ async def release_schedules(

for schedule_id, next_fire_time in updated_schedules:
event = ScheduleUpdated(
schedule_id=schedule_id, next_fire_time=next_fire_time
schedule_id=schedule_id,
task_id=task_ids[schedule_id],
next_fire_time=next_fire_time,
)
await self._event_broker.publish(event)

for schedule_id in finished_schedule_ids:
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id])
)

async def get_next_schedule_run_time(self) -> datetime | None:
async for attempt in self._retry():
Expand Down
53 changes: 36 additions & 17 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
TIMESTAMP,
BigInteger,
Column,
Delete,
Enum,
Integer,
Interval,
Expand All @@ -44,7 +43,6 @@
from sqlalchemy.future import Connection, Engine
from sqlalchemy.sql import Executable
from sqlalchemy.sql.ddl import DropTable
from sqlalchemy.sql.dml import ReturningDelete
from sqlalchemy.sql.elements import BindParameter, literal
from sqlalchemy.sql.type_api import TypeEngine

Expand Down Expand Up @@ -473,37 +471,54 @@ async def add_schedule(
await self._execute(conn, update)

event = ScheduleUpdated(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)
await self._event_broker.publish(event)
else:
event = ScheduleAdded(
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
schedule_id=schedule.id,
task_id=schedule.task_id,
next_fire_time=schedule.next_fire_time,
)
await self._event_broker.publish(event)

async def remove_schedules(self, ids: Iterable[str]) -> None:
async for attempt in self._retry():
with attempt:
async with self._begin_transaction() as conn:
delete: Delete | ReturningDelete[
Any
] = self._t_schedules.delete().where(
self._t_schedules.c.id.in_(ids)
)
if self._supports_update_returning:
delete_returning = delete.returning(self._t_schedules.c.id)
removed_ids: Iterable[str] = [
row[0]
delete_returning = (
self._t_schedules.delete()
.where(self._t_schedules.c.id.in_(ids))
.returning(
self._t_schedules.c.id, self._t_schedules.c.task_id
)
)
removed_ids: list[tuple[str, str]] = [
(row[0], row[1])
for row in await self._execute(conn, delete_returning)
]
else:
# TODO: actually check which rows were deleted?
query = select(
self._t_schedules.c.id, self._t_schedules.c.task_id
).where(self._t_schedules.c.id.in_(ids))
ids_to_remove: list[str] = []
removed_ids = []
for schedule_id, task_id in await self._execute(conn, query):
ids_to_remove.append(schedule_id)
removed_ids.append((schedule_id, task_id))

delete = self._t_schedules.delete().where(
self._t_schedules.c.id.in_(ids_to_remove)
)
await self._execute(conn, delete)
removed_ids = ids

for schedule_id in removed_ids:
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
for schedule_id, task_id in removed_ids:
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_id)
)

async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
query = self._t_schedules.select().order_by(self._t_schedules.c.id)
Expand Down Expand Up @@ -563,6 +578,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
async def release_schedules(
self, scheduler_id: str, schedules: list[Schedule]
) -> None:
task_ids = {schedule.id: schedule.task_id for schedule in schedules}
async for attempt in self._retry():
with attempt:
async with self._begin_transaction() as conn:
Expand Down Expand Up @@ -624,6 +640,7 @@ async def release_schedules(
for schedule_id in updated_ids:
event = ScheduleUpdated(
schedule_id=schedule_id,
task_id=task_ids[schedule_id],
next_fire_time=next_fire_times[schedule_id],
)
update_events.append(event)
Expand All @@ -640,7 +657,9 @@ async def release_schedules(
await self._event_broker.publish(event)

for schedule_id in finished_schedule_ids:
await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id))
await self._event_broker.publish(
ScheduleRemoved(schedule_id=schedule_id, task_id=task_ids[schedule_id])
)

async def get_next_schedule_run_time(self) -> datetime | None:
statenent = (
Expand Down
2 changes: 2 additions & 0 deletions tests/test_datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ async def test_add_schedules(datastore: DataStore, schedules: list[Schedule]) ->

for event, schedule in zip(events, schedules):
assert event.schedule_id == schedule.id
assert event.task_id == schedule.task_id
assert event.next_fire_time == schedule.next_fire_time


Expand Down Expand Up @@ -151,6 +152,7 @@ async def test_replace_schedules(

received_event = events.pop(0)
assert received_event.schedule_id == "s3"
assert received_event.task_id == "foo"
assert received_event.next_fire_time == datetime(2020, 9, 16, tzinfo=timezone.utc)
assert not events

Expand Down
31 changes: 22 additions & 9 deletions tests/test_eventbrokers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import sys
from contextlib import AsyncExitStack
from datetime import datetime, timezone

Expand All @@ -10,6 +11,11 @@
from apscheduler import Event, ScheduleAdded
from apscheduler.abc import EventBroker

if sys.version_info >= (3, 11):
from datetime import UTC
else:
UTC = timezone.utc

pytestmark = pytest.mark.anyio


Expand All @@ -19,7 +25,8 @@ async def test_publish_subscribe(event_broker: EventBroker) -> None:
event_broker.subscribe(send.send_nowait)
event = ScheduleAdded(
schedule_id="schedule1",
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
task_id="task1",
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, UTC),
)
await event_broker.publish(event)

Expand All @@ -31,22 +38,23 @@ async def test_publish_subscribe(event_broker: EventBroker) -> None:
assert isinstance(event1, ScheduleAdded)
assert isinstance(event1.timestamp, datetime)
assert event1.schedule_id == "schedule1"
assert event1.next_fire_time == datetime(
2021, 9, 11, 12, 31, 56, 254867, timezone.utc
)
assert event1.task_id == "task1"
assert event1.next_fire_time == datetime(2021, 9, 11, 12, 31, 56, 254867, UTC)


async def test_subscribe_one_shot(event_broker: EventBroker) -> None:
send, receive = create_memory_object_stream(2)
event_broker.subscribe(send.send, one_shot=True)
event = ScheduleAdded(
schedule_id="schedule1",
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, timezone.utc),
task_id="task1",
next_fire_time=datetime(2021, 9, 11, 12, 31, 56, 254867, UTC),
)
await event_broker.publish(event)
event = ScheduleAdded(
schedule_id="schedule2",
next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, timezone.utc),
task_id="task1",
next_fire_time=datetime(2021, 9, 12, 8, 42, 11, 968481, UTC),
)
await event_broker.publish(event)

Expand All @@ -58,6 +66,7 @@ async def test_subscribe_one_shot(event_broker: EventBroker) -> None:

assert isinstance(received_event, ScheduleAdded)
assert received_event.schedule_id == "schedule1"
assert received_event.task_id == "task1"


async def test_unsubscribe(event_broker: EventBroker) -> None:
Expand All @@ -73,16 +82,20 @@ async def test_unsubscribe(event_broker: EventBroker) -> None:
await receive.receive()


async def test_publish_no_subscribers(event_broker, caplog: LogCaptureFixture) -> None:
async def test_publish_no_subscribers(
event_broker: EventBroker, caplog: LogCaptureFixture
) -> None:
await event_broker.publish(Event())
assert not caplog.text


async def test_publish_exception(event_broker, caplog: LogCaptureFixture) -> None:
async def test_publish_exception(
event_broker: EventBroker, caplog: LogCaptureFixture
) -> None:
def bad_subscriber(event: Event) -> None:
raise Exception("foo")

timestamp = datetime.now(timezone.utc)
timestamp = datetime.now(UTC)
send, receive = create_memory_object_stream()
event_broker.subscribe(bad_subscriber)
event_broker.subscribe(send.send)
Expand Down
2 changes: 2 additions & 0 deletions tests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ async def test_add_remove_schedule(self, raw_datastore: DataStore) -> None:
event = await receive.receive()
assert isinstance(event, ScheduleAdded)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"
assert event.next_fire_time == now

event = await receive.receive()
assert isinstance(event, ScheduleRemoved)
assert event.schedule_id == "foo"
assert event.task_id == f"{__name__}:dummy_async_job"

async def test_add_job_wait_result(self, raw_datastore: DataStore) -> None:
send, receive = create_memory_object_stream[Event](2)
Expand Down

0 comments on commit 28fa5ba

Please sign in to comment.