Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Implements a task scheduler for resumable potentially long running tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten authored and MatMaul committed Jul 11, 2023
1 parent 8a529e4 commit 470f385
Show file tree
Hide file tree
Showing 9 changed files with 624 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/15891.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements a task scheduler for resumable potentially long running tasks.
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
Expand Down Expand Up @@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
Expand Down
6 changes: 6 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -359,6 +360,7 @@ def setup_background_tasks(self) -> None:
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()

def get_reactor(self) -> ISynapseReactor:
"""
Expand Down Expand Up @@ -912,3 +914,7 @@ def get_request_ratelimiter(self) -> RequestRatelimiter:
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)

@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
Expand Down Expand Up @@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
def __init__(
self,
Expand Down
173 changes: 173 additions & 0 deletions synapse/storage/databases/main/task_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus

if TYPE_CHECKING:
from synapse.server import HomeServer


class TaskSchedulerWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

@staticmethod
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
row["status"] = TaskStatus(row["status"])
if row["params"] is not None:
row["params"] = json.loads(row["params"])
if row["result"] is not None:
row["result"] = json.loads(row["result"])
return ScheduledTask(**row)

async def get_scheduled_tasks(
self, action: Optional[str] = None, resource_id: Optional[str] = None
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
If the parameters are `None` all the tasks are returned.
Args:
action: Limit the returned tasks to this specific action name
resource_id: Limit the returned tasks to this specific resource id
Returns: a list of `ScheduledTask`
"""
keyvalues = {}
if action:
keyvalues["action"] = action
if resource_id:
keyvalues["resource_id"] = resource_id

rows = await self.db_pool.simple_select_list(
table="scheduled_tasks",
keyvalues=keyvalues,
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
desc="get_scheduled_tasks",
)

return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]

async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
"""Upsert a specified `ScheduledTask` in the DB.
Args:
task: the `ScheduledTask` to upsert
"""
await self.db_pool.simple_upsert(
"scheduled_tasks",
{"id": task.id},
{
"action": task.action,
"status": task.status,
"timestamp": task.timestamp,
"resource_id": task.resource_id,
"params": None if task.params is None else json.dumps(task.params),
"result": None if task.result is None else json.dumps(task.result),
"error": task.error,
},
desc="upsert_scheduled_task",
)

async def update_scheduled_task(
self,
id: str,
*,
timestamp: Optional[int] = None,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> None:
"""Update a scheduled task in the DB with some new value(s).
Args:
id: id of the `ScheduledTask` to update
timestamp: new timestamp of the task
status: new status of the task
result: new result of the task
error: new error of the task
"""
updatevalues: JsonDict = {}
if timestamp is not None:
updatevalues["timestamp"] = timestamp
if status is not None:
updatevalues["status"] = status
if result is not None:
updatevalues["result"] = json.dumps(result)
if error is not None:
updatevalues["error"] = error
await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)

async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.
Args:
id: the id of the task to retrieve
Returns: the task if available, `None` otherwise
"""
row = await self.db_pool.simple_select_one(
table="scheduled_tasks",
keyvalues={"id": id},
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
allow_none=True,
desc="get_scheduled_task",
)

return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None

async def delete_scheduled_task(self, id: str) -> None:
"""Delete a specific task from its id.
Args:
id: the id of the task to delete
"""
await self.db_pool.simple_delete(
"scheduled_tasks",
keyvalues={"id": id},
desc="delete_scheduled_task",
)
26 changes: 26 additions & 0 deletions synapse/storage/schema/main/delta/79/03_scheduled_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
id text PRIMARY KEY,
action text NOT NULL,
status text NOT NULL,
timestamp bigint NOT NULL,
resource_id text,
params text,
result text,
error text
);
38 changes: 38 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -979,3 +980,40 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None


class TaskStatus(str, Enum):
"""Status of a scheduled task"""

# Task is scheduled but not active
SCHEDULED = "scheduled"
# Task is active and probably running, and if not
# will be run on next scheduler loop run
ACTIVE = "active"
# Task has completed successfully
COMPLETE = "complete"
# Task is over and either returned a failed status, or had an exception
FAILED = "failed"


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
"""Description of a scheduled task"""

# id used to identify the task
id: str
# name of the action to be run by this task
action: str
# current status of this task
status: TaskStatus
# if the status is SCHEDULED then this represents when it should be launched,
# otherwise it represents the last time this task got a change of state
timestamp: int
# Optionally bind a task to some resource id for easy retrieval
resource_id: Optional[str]
# Optional parameters that will be passed to the function ran by the task
params: Optional[JsonMapping]
# Optional result that can be updated by the running task
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]
Loading

0 comments on commit 470f385

Please sign in to comment.