diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index f13819541..687303334 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -41,6 +41,7 @@ APScheduler, see the :doc:`migration section `. limit maximum concurrent job execution - **BREAKING** Changed the ``timezone`` argument to ``CronTrigger.from_crontab()`` into a keyword-only argument +- **BREAKING** Added the ``metadata`` field to tasks, schedules and jobs - Added the ``start_time`` and ``end_time`` arguments to ``CronTrigger.from_crontab()`` (`#676 `_) - Added the ``psycopg`` event broker diff --git a/src/apscheduler/_decorators.py b/src/apscheduler/_decorators.py index 53d37ccd4..4865f1a44 100644 --- a/src/apscheduler/_decorators.py +++ b/src/apscheduler/_decorators.py @@ -5,9 +5,12 @@ from typing import Any, TypeVar import attrs +from attr.validators import instance_of, optional -from ._structures import TaskDefaults +from ._converters import as_timedelta +from ._structures import MetadataType, TaskDefaults from ._utils import UnsetValue, unset +from ._validators import if_not_unset, valid_metadata T = TypeVar("T", bound="Callable[..., Any]") @@ -17,6 +20,20 @@ @attrs.define(kw_only=True) class TaskParameters(TaskDefaults): id: str | UnsetValue = attrs.field(default=unset) + job_executor: str | UnsetValue = attrs.field( + validator=if_not_unset(instance_of(str)), default=unset + ) + max_running_jobs: int | None | UnsetValue = attrs.field( + validator=if_not_unset(optional(instance_of(int))), default=unset + ) + misfire_grace_time: timedelta | None | UnsetValue = attrs.field( + converter=as_timedelta, + validator=if_not_unset(optional(instance_of(timedelta))), + default=unset, + ) + metadata: MetadataType | UnsetValue = attrs.field( + validator=if_not_unset(valid_metadata), default=unset + ) def task( @@ -25,6 +42,7 @@ def task( job_executor: str | UnsetValue = unset, max_running_jobs: int | None | UnsetValue = unset, misfire_grace_time: int | timedelta | None | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, ) -> Callable[[T], T]: """ Decorate a function to have implied defaults as an APScheduler task. @@ -36,7 +54,7 @@ def task( :param ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the run time of jobs created for the task are allowed to be late, compared to the scheduled run time - + :param metadata: key-value pairs for storing JSON compatible custom information """ def wrapper(func: T) -> T: @@ -56,6 +74,7 @@ def wrapper(func: T) -> T: job_executor=job_executor, max_running_jobs=max_running_jobs, misfire_grace_time=misfire_grace_time, + metadata=metadata, ), ) return func diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index a368cdf21..43fe30b1c 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -50,8 +50,16 @@ ScheduleLookupError, ) from .._marshalling import callable_from_ref, callable_to_ref -from .._structures import Job, JobResult, Schedule, ScheduleResult, Task, TaskDefaults -from .._utils import UnsetValue, unset +from .._structures import ( + Job, + JobResult, + MetadataType, + Schedule, + ScheduleResult, + Task, + TaskDefaults, +) +from .._utils import UnsetValue, merge_metadata, unset from .._validators import non_negative_number from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger from ..datastores.memory import MemoryDataStore @@ -305,6 +313,7 @@ async def configure_task( job_executor: str | UnsetValue = unset, misfire_grace_time: float | timedelta | None | UnsetValue = unset, max_running_jobs: int | None | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, ) -> Task: """ Add or update a :ref:`task ` definition. @@ -329,6 +338,7 @@ async def configure_task( run time is allowed to be late, compared to the scheduled run time :param max_running_jobs: maximum number of instances of the task that are allowed to run concurrently + :param metadata: key-value pairs for storing JSON compatible custom information :raises TypeError: if ``func_or_task_id`` is neither a task, task ID or a callable :return: the created or updated task definition @@ -348,6 +358,7 @@ async def configure_task( job_executor=func_or_task_id.job_executor, max_running_jobs=func_or_task_id.max_running_jobs, misfire_grace_time=func_or_task_id.misfire_grace_time, + metadata=func_or_task_id.metadata, ) elif isinstance(func_or_task_id, str) and func_or_task_id: task_params = get_task_params(func) if callable(func) else TaskParameters() @@ -373,6 +384,12 @@ async def configure_task( if task_params.misfire_grace_time is unset: task_params.misfire_grace_time = self.task_defaults.misfire_grace_time + # Merge the metadata from the defaults, task definition and explicitly passed + # metadata + task_params.metadata = merge_metadata( + self.task_defaults.metadata, task_params.metadata, metadata + ) + assert task_params.id if callable(func): self._task_callables[task_params.id] = func @@ -391,6 +408,7 @@ async def configure_task( job_executor=task_params.job_executor, max_running_jobs=task_params.max_running_jobs, misfire_grace_time=task_params.misfire_grace_time, + metadata=task_params.metadata, ) modified = True else: @@ -411,6 +429,10 @@ async def configure_task( changes["misfire_grace_time"] = task_params.misfire_grace_time modified = True + if task_params.metadata != task.metadata: + changes["metadata"] = task_params.metadata + modified = True + task = attrs.evolve(task, **changes) if modified: @@ -440,6 +462,7 @@ async def add_schedule( coalesce: CoalescePolicy = CoalescePolicy.latest, job_executor: str | UnsetValue = unset, misfire_grace_time: float | timedelta | None | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, max_jitter: float | timedelta | None = None, job_result_expiration_time: float | timedelta = 0, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, @@ -461,6 +484,7 @@ async def add_schedule( fire times have become due for this schedule since the last processing :param misfire_grace_time: maximum number of seconds the scheduled job's actual run time is allowed to be late, compared to the scheduled run time + :param metadata: key-value pairs for storing JSON compatible custom information :param max_jitter: maximum time (in seconds, or as a timedelta) to randomly add to the scheduled time for each job created from this schedule :param job_result_expiration_time: minimum time (in seconds, or as a timedelta) @@ -507,6 +531,9 @@ async def add_schedule( misfire_grace_time=task.misfire_grace_time if misfire_grace_time is unset else misfire_grace_time, + metadata=task.metadata.copy() + if metadata is unset + else merge_metadata(task.metadata, metadata), max_jitter=max_jitter, job_executor=task.job_executor if job_executor is unset else job_executor, job_result_expiration_time=job_result_expiration_time, @@ -615,6 +642,7 @@ async def add_job( args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, job_executor: str | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, result_expiration_time: timedelta | float = 0, ) -> UUID: """ @@ -628,6 +656,7 @@ async def add_job( :param kwargs: keyword arguments to call the target callable with :param job_executor: name of the job executor to run the task with (overrides the executor in the task definition, if any) + :param metadata: key-value pairs for storing JSON compatible custom information :param result_expiration_time: the minimum time (as seconds, or timedelta) to keep the result of the job available for fetching (the result won't be saved at all if that time is 0) @@ -665,6 +694,7 @@ async def add_job( kwargs=kwargs or {}, executor=task.job_executor if job_executor is unset else job_executor, result_expiration_time=result_expiration_time, + metadata=merge_metadata(task.metadata, metadata), ) await self.data_store.add_job(job) return job.id @@ -717,6 +747,7 @@ async def run_job( args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, job_executor: str | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, ) -> Any: """ Convenience method to add a job and then return its result. @@ -729,6 +760,7 @@ async def run_job( :param kwargs: keyword arguments to be passed to the task function :param job_executor: name of the job executor to run the task with (overrides the executor in the task definition, if any) + :param metadata: key-value pairs for storing JSON compatible custom information :returns: the return value of the task function """ @@ -746,6 +778,7 @@ def listener(event: JobReleased) -> None: args=args, kwargs=kwargs, job_executor=job_executor, + metadata=metadata, result_expiration_time=timedelta(minutes=15), ) await job_complete_event.wait() @@ -982,6 +1015,7 @@ async def extend_schedule_leases(schedules: Sequence[Schedule]) -> None: start_deadline=start_deadline, executor=schedule.job_executor, result_expiration_time=schedule.job_result_expiration_time, + metadata=schedule.metadata.copy(), ) await self.data_store.add_job(job) diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 06bd896eb..6c75fe3b7 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -18,7 +18,7 @@ from .. import current_scheduler from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole from .._events import Event, T_Event -from .._structures import Job, JobResult, Schedule, Task, TaskDefaults +from .._structures import Job, JobResult, MetadataType, Schedule, Task, TaskDefaults from .._utils import UnsetValue, unset from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger from .async_ import AsyncScheduler, TaskType @@ -236,6 +236,7 @@ def configure_task( job_executor: str | UnsetValue = unset, misfire_grace_time: float | timedelta | None | UnsetValue = unset, max_running_jobs: int | None | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, ) -> Task: portal = self._ensure_services_ready() return portal.call( @@ -246,6 +247,7 @@ def configure_task( job_executor=job_executor, misfire_grace_time=misfire_grace_time, max_running_jobs=max_running_jobs, + metadata=metadata, ) ) @@ -265,6 +267,7 @@ def add_schedule( coalesce: CoalescePolicy = CoalescePolicy.latest, job_executor: str | UnsetValue = unset, misfire_grace_time: float | timedelta | None | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, max_jitter: float | timedelta | None = None, job_result_expiration_time: float | timedelta = 0, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, @@ -284,6 +287,7 @@ def add_schedule( misfire_grace_time=misfire_grace_time, max_jitter=max_jitter, job_result_expiration_time=job_result_expiration_time, + metadata=metadata, conflict_policy=conflict_policy, ) ) @@ -326,6 +330,7 @@ def add_job( args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, job_executor: str | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, result_expiration_time: timedelta | float = 0, ) -> UUID: portal = self._ensure_services_ready() @@ -336,6 +341,7 @@ def add_job( args=args, kwargs=kwargs, job_executor=job_executor, + metadata=metadata, result_expiration_time=result_expiration_time, ) ) @@ -357,6 +363,7 @@ def run_job( args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, job_executor: str | UnsetValue = unset, + metadata: MetadataType | UnsetValue = unset, ) -> Any: portal = self._ensure_services_ready() return portal.call( @@ -366,6 +373,7 @@ def run_job( args=args, kwargs=kwargs, job_executor=job_executor, + metadata=metadata, ) ) diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index 47928465b..5dc5eeb25 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from datetime import datetime, timedelta, timezone from functools import partial from typing import Any @@ -12,8 +13,18 @@ from ._converters import as_aware_datetime, as_enum, as_timedelta from ._enums import CoalescePolicy, JobOutcome from ._utils import UnsetValue, unset +from ._validators import if_not_unset, valid_metadata from .abc import Serializer, Trigger +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +MetadataType: TypeAlias = ( + "dict[str, str | int | bool | None | list[MetadataType] | dict[str, MetadataType]]" +) + def serialize(inst: Any, field: attrs.Attribute, value: Any) -> Any: if isinstance(value, frozenset): @@ -36,6 +47,7 @@ class Task: :var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the run time of jobs created for this task are allowed to be late, compared to the scheduled run time + :var metadata: key-value pairs for storing JSON compatible custom information """ id: str = attrs.field(validator=[instance_of(str), min_len(1)], on_setattr=frozen) @@ -55,6 +67,7 @@ class Task: validator=optional(instance_of(timedelta)), on_setattr=frozen, ) + metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict) running_jobs: int = attrs.field(default=0) def marshal(self, serializer: Serializer) -> dict[str, Any]: @@ -92,19 +105,21 @@ class TaskDefaults: :param ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the run time of jobs created for this task are allowed to be late, compared to the scheduled run time + :var metadata: key-value pairs for storing JSON compatible custom information """ job_executor: str | UnsetValue = attrs.field( - validator=instance_of((str, UnsetValue)), default=unset + validator=if_not_unset(instance_of(str)), default=unset ) max_running_jobs: int | None | UnsetValue = attrs.field( - validator=optional(instance_of((int, UnsetValue))), default=unset + validator=optional(instance_of(int)), default=1 ) - misfire_grace_time: timedelta | None | UnsetValue = attrs.field( + misfire_grace_time: timedelta | None = attrs.field( converter=as_timedelta, - validator=optional(instance_of((timedelta, UnsetValue))), - default=unset, + validator=optional(instance_of(timedelta)), + default=None, ) + metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict) @attrs.define(kw_only=True, order=False) @@ -127,6 +142,7 @@ class Schedule: add to the scheduled time for each job created from this schedule :var ~datetime.timedelta job_result_expiration_time: minimum time to keep the job results in storage from the jobs created by this schedule + :var metadata: key-value pairs for storing JSON compatible custom information :var ~datetime.datetime next_fire_time: the next time the task will be run :var ~datetime.datetime | None last_fire_time: the last time the task was scheduled to run @@ -172,6 +188,7 @@ class Schedule: validator=optional(instance_of(timedelta)), on_setattr=frozen, ) + metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict) next_fire_time: datetime | None = attrs.field( converter=as_aware_datetime, default=None, @@ -266,6 +283,7 @@ class Job: scheduler after this time, it is considered to be misfired and will be aborted :var ~datetime.timedelta result_expiration_time: minimum amount of time to keep the result available for fetching in the data store + :var metadata: key-value pairs for storing JSON compatible custom information :var ~datetime.datetime created_at: the time at which the job was created :var str | None acquired_by: the unique identifier of the scheduler that has acquired the job for execution @@ -295,6 +313,7 @@ class Job: result_expiration_time: timedelta = attrs.field( converter=as_timedelta, default=timedelta(), repr=False, on_setattr=frozen ) + metadata: MetadataType = attrs.field(validator=valid_metadata, factory=dict) created_at: datetime = attrs.field( converter=as_aware_datetime, factory=partial(datetime.now, timezone.utc), diff --git a/src/apscheduler/_utils.py b/src/apscheduler/_utils.py index ab3335f0a..467b8f93f 100644 --- a/src/apscheduler/_utils.py +++ b/src/apscheduler/_utils.py @@ -4,7 +4,7 @@ import sys from datetime import datetime, tzinfo -from typing import Any, NoReturn, TypeVar +from typing import TYPE_CHECKING, Any, NoReturn, TypeVar from ._exceptions import DeserializationError from .abc import Trigger @@ -14,6 +14,9 @@ else: from backports.zoneinfo import ZoneInfo +if TYPE_CHECKING: + from ._structures import MetadataType + T = TypeVar("T") @@ -73,3 +76,16 @@ def require_state_version( raise DeserializationError( 'Missing "version" key in the serialized state' ) from exc + + +def merge_metadata( + base_metadata: MetadataType, *overlays: MetadataType | UnsetValue +) -> MetadataType: + new_metadata = base_metadata.copy() + for metadata in overlays: + if isinstance(metadata, UnsetValue): + continue + + new_metadata.update(metadata) + + return new_metadata diff --git a/src/apscheduler/_validators.py b/src/apscheduler/_validators.py index cf0ea6b9c..ce709ba9e 100644 --- a/src/apscheduler/_validators.py +++ b/src/apscheduler/_validators.py @@ -1,9 +1,12 @@ from __future__ import annotations +from collections.abc import Callable from typing import Any from attrs import Attribute +from apscheduler._utils import unset + def positive_number(instance: Any, attribute: Attribute, value: Any) -> None: if value <= 0: @@ -18,3 +21,37 @@ def non_negative_number(instance: Any, attribute: Attribute, value: Any) -> None def aware_datetime(instance: Any, attribute: Attribute, value: Any) -> None: if not value.tzinfo: raise ValueError(f"{attribute.name} must be a timezone aware datetime") + + +def if_not_unset(validator: Callable[[Any, Any, Any], None]) -> None: + def validate(instance: Any, attribute: Any, value: Any) -> None: + if value is unset: + return + + validator(instance, attribute, value) + + +def valid_metadata(instance: Any, attribute: Attribute, value: Any) -> None: + def check_value(path: str, val: object) -> None: + if value is None: + return + + if isinstance(val, list): + for index, item in enumerate(val): + check_value(f"{path}[{index}]", item) + elif isinstance(val, dict): + for k, v in val.items(): + if not isinstance(k, str): + raise ValueError(f"{path} has a non-string key ({key!r})") + + check_value(f"{path}[{k!r}]", v) + elif not isinstance(val, (str, int, float, bool)): + raise ValueError( + f"{path} has a value that is not JSON compatible: ({val!r})" + ) + + if not isinstance(value, dict): + raise ValueError(f"{attribute.name} must be a dict, got: {value!r}") + + for key, value in value.items(): + check_value(key, value) diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index f9f775120..40b8f4304 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -16,6 +16,7 @@ from anyio import CancelScope, to_thread from attr.validators import instance_of from sqlalchemy import ( + JSON, BigInteger, Boolean, Column, @@ -292,6 +293,13 @@ def get_table_definitions(self) -> MetaData: else: interval_type = EmulatedInterval() + if self._engine.dialect.name == "postgresql": + from sqlalchemy.dialects.postgresql import JSONB + + json_type = JSONB + else: + json_type = JSON + metadata = MetaData(schema=self.schema) Table("metadata", metadata, Column("schema_version", Integer, nullable=False)) Table( @@ -302,6 +310,7 @@ def get_table_definitions(self) -> MetaData: Column("job_executor", Unicode(500), nullable=False), Column("max_running_jobs", Integer), Column("misfire_grace_time", interval_type), + Column("metadata", json_type, nullable=False), Column("running_jobs", Integer, nullable=False, server_default=literal(0)), ) Table( @@ -318,6 +327,7 @@ def get_table_definitions(self) -> MetaData: Column("max_jitter", interval_type), Column("job_executor", Unicode(500), nullable=False), Column("job_result_expiration_time", interval_type), + Column("metadata", json_type, nullable=False), *next_fire_time_tzoffset_columns, Column("last_fire_time", timestamp_type), Column("acquired_by", Unicode(500), index=True), @@ -336,6 +346,7 @@ def get_table_definitions(self) -> MetaData: Column("jitter", interval_type), Column("start_deadline", timestamp_type), Column("result_expiration_time", interval_type), + Column("metadata", json_type, nullable=False), Column("created_at", timestamp_type, nullable=False, index=True), Column("acquired_by", Unicode(500), index=True), Column("acquired_until", timestamp_type), @@ -445,6 +456,7 @@ async def add_task(self, task: Task) -> None: job_executor=task.job_executor, max_running_jobs=task.max_running_jobs, misfire_grace_time=task.misfire_grace_time, + metadata=task.metadata, ) try: async for attempt in self._retry(): @@ -459,6 +471,7 @@ async def add_task(self, task: Task) -> None: job_executor=task.job_executor, max_running_jobs=task.max_running_jobs, misfire_grace_time=task.misfire_grace_time, + metadata=task.metadata, ) .where(self._t_tasks.c.id == task.id) ) @@ -1054,7 +1067,7 @@ async def get_job_result(self, job_id: UUID) -> JobResult | None: delete = self._t_job_results.delete().where( self._t_job_results.c.job_id == job_id ) - result = await self._execute(conn, delete) + await self._execute(conn, delete) return JobResult.unmarshal(self.serializer, row._asdict()) if row else None diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index d2f66cf85..36f730755 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -217,20 +217,25 @@ async def test_configure_task_with_decorator(self) -> None: async def test_configure_local_task_with_decorator(self) -> None: @task( + id="taskfunc", job_executor="threadpool", max_running_jobs=3, misfire_grace_time=timedelta(seconds=6), + metadata={"local": 6}, ) def taskfunc() -> None: pass - async with AsyncScheduler() as scheduler: - await scheduler.configure_task("taskfunc", func=taskfunc) + task_defaults = TaskDefaults(metadata={"global": "foo"}) + async with AsyncScheduler(task_defaults=task_defaults) as scheduler: + await scheduler.configure_task(taskfunc, metadata={"direct": [1, 9]}) tasks = await scheduler.get_tasks() assert len(tasks) == 1 + assert tasks[0].id == "taskfunc" assert tasks[0].max_running_jobs == 3 assert tasks[0].misfire_grace_time == timedelta(seconds=6) assert tasks[0].job_executor == "threadpool" + assert tasks[0].metadata == {"global": "foo", "local": 6, "direct": [1, 9]} async def test_add_pause_unpause_remove_schedule( self, raw_datastore: DataStore, timezone: ZoneInfo