Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Implements a task scheduler for resumable potentially long running tasks #15891

Merged
merged 38 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
470f385
Implements a task scheduler for resumable potentially long running tasks
Jul 7, 2023
0961f52
Add filters to task retrieval + clean less often
Jul 24, 2023
862425f
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
MatMaul Jul 24, 2023
e6bf9ce
Move sql file
MatMaul Jul 24, 2023
abbbddf
Update schema version
MatMaul Jul 24, 2023
a4ed9fe
Add ts to get_tasks and narrow down the query in the loops
MatMaul Jul 26, 2023
3490a91
Renames
Jul 26, 2023
e03c12d
Comments
Jul 26, 2023
8e29f3d
Update synapse/util/task_scheduler.py
Jul 26, 2023
b2dff65
Update synapse/util/task_scheduler.py
Jul 26, 2023
ce981c5
Add underscore to private fields
Jul 26, 2023
9e25f5e
lint
Jul 26, 2023
8c713f9
Apply suggestions from code review
Jul 26, 2023
bc92b72
Comments
Jul 26, 2023
46cbde8
Fix error handling
Jul 26, 2023
1025432
More comments
Jul 26, 2023
fd2c3dd
Fix
Jul 27, 2023
9e9a5e8
Add doc
Jul 28, 2023
861992f
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Jul 28, 2023
4c1c833
Move to new DB delta folder
Jul 28, 2023
6762c07
Limit concurrent running tasks to 20
Jul 28, 2023
dfa3983
Remove launch now feature, max 10
Jul 28, 2023
7730d74
Fix tests
Jul 28, 2023
5878384
Fix olddeps tests
Jul 29, 2023
2383d1e
Merge loops
Aug 3, 2023
6122291
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Aug 7, 2023
16be76b
Address comments
Aug 10, 2023
ad25237
Address comments
Aug 10, 2023
7b96379
Add delete_task
Aug 11, 2023
3fd518c
Add index on status
Aug 11, 2023
1a410ee
Add big docstring
Aug 14, 2023
8ed3e37
Add running tasks metric and log running tasks for more than 24hrs wi…
Aug 14, 2023
60ea494
Merge remote-tracking branch 'origin/develop' into mv/task-scheduler
Aug 14, 2023
c089f67
Remove unused sql file
Aug 16, 2023
d158b67
Address reviews
Aug 16, 2023
223f87e
Address comment
Aug 16, 2023
e5a5344
Fix
Aug 16, 2023
56ab35e
Address comment
Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15891.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements a task scheduler for resumable potentially long running tasks.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be a feature -- it is something internally used by Synapse; admins and end-users don't care.

We could probably just combine this with the other PR?

2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
Expand Down Expand Up @@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
Expand Down
7 changes: 6 additions & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -360,6 +361,7 @@ def setup_background_tasks(self) -> None:
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()
clokep marked this conversation as resolved.
Show resolved Hide resolved

def get_reactor(self) -> ISynapseReactor:
"""
Expand Down Expand Up @@ -912,6 +914,9 @@ def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)

@cache_in_self
def get_worker_locks_handler(self) -> WorkerLocksHandler:
return WorkerLocksHandler(self)

@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
Expand Down Expand Up @@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
def __init__(
self,
Expand Down
202 changes: 202 additions & 0 deletions synapse/storage/databases/main/task_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Any, Dict, List, Optional

from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
from synapse.util import json_encoder

if TYPE_CHECKING:
from synapse.server import HomeServer


class TaskSchedulerWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

@staticmethod
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
row["status"] = TaskStatus(row["status"])
if row["params"] is not None:
row["params"] = db_to_json(row["params"])
if row["result"] is not None:
row["result"] = db_to_json(row["result"])
return ScheduledTask(**row)

async def get_scheduled_tasks(
self,
*,
actions: Optional[List[str]] = None,
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.

Args:
actions: Limit the returned tasks to those specific action names
resource_id: Limit the returned tasks to the specific resource id, if specified
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one

Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""

def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
clauses: List[str] = []
args: List[Any] = []
if resource_id:
clauses.append("resource_id = ?")
args.append(resource_id)
if actions is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "action", actions
)
clauses.append(clause)
args.extend(temp_args)
if statuses is not None:
clause, temp_args = make_in_list_sql_clause(
txn.database_engine, "status", statuses
)
clauses.append(clause)
args.extend(temp_args)
if max_timestamp is not None:
clauses.append("timestamp <= ?")
args.append(max_timestamp)

sql = "SELECT * FROM scheduled_tasks"
if clauses:
sql = sql + " WHERE " + " AND ".join(clauses)

sql = sql + "ORDER BY timestamp"

txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)

rows = await self.db_pool.runInteraction(
"get_scheduled_tasks", get_scheduled_tasks_txn
)
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]

async def insert_scheduled_task(self, task: ScheduledTask) -> None:
"""Insert a specified `ScheduledTask` in the DB.

Args:
task: the `ScheduledTask` to insert
"""
await self.db_pool.simple_insert(
"scheduled_tasks",
{
"id": task.id,
"action": task.action,
"status": task.status,
"timestamp": task.timestamp,
"resource_id": task.resource_id,
"params": None
if task.params is None
else json_encoder.encode(task.params),
"result": None
if task.result is None
else json_encoder.encode(task.result),
"error": task.error,
},
desc="insert_scheduled_task",
)

async def update_scheduled_task(
self,
id: str,
timestamp: int,
*,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
"""Update a scheduled task in the DB with some new value(s).

Args:
id: id of the `ScheduledTask` to update
timestamp: new timestamp of the task
status: new status of the task
result: new result of the task
error: new error of the task

Returns: `False` if no matching row was found, `True` otherwise
"""
updatevalues: JsonDict = {"timestamp": timestamp}
if status is not None:
updatevalues["status"] = status
if result is not None:
updatevalues["result"] = json_encoder.encode(result)
if error is not None:
updatevalues["error"] = error
nb_rows = await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)
return nb_rows > 0

async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.

Args:
id: the id of the task to retrieve

Returns: the task if available, `None` otherwise
"""
row = await self.db_pool.simple_select_one(
table="scheduled_tasks",
keyvalues={"id": id},
retcols=(
"id",
"action",
"status",
"timestamp",
"resource_id",
"params",
"result",
"error",
),
allow_none=True,
desc="get_scheduled_task",
)

return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None

async def delete_scheduled_task(self, id: str) -> None:
"""Delete a specific task from its id.

Args:
id: the id of the task to delete
"""
await self.db_pool.simple_delete(
"scheduled_tasks",
keyvalues={"id": id},
desc="delete_scheduled_task",
)
1 change: 1 addition & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@

Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events.
- Add tables for the task scheduler.
"""


Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
id TEXT PRIMARY KEY,
action TEXT NOT NULL,
status TEXT NOT NULL,
timestamp BIGINT NOT NULL,
resource_id TEXT,
params TEXT,
result TEXT,
error TEXT
);

CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status);
39 changes: 39 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand Down Expand Up @@ -968,3 +969,41 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None


class TaskStatus(str, Enum):
MatMaul marked this conversation as resolved.
Show resolved Hide resolved
"""Status of a scheduled task"""

# Task is scheduled but not active
SCHEDULED = "scheduled"
# Task is active and probably running, and if not
# will be run on next scheduler loop run
ACTIVE = "active"
# Task has completed successfully
COMPLETE = "complete"
# Task is over and either returned a failed status, or had an exception
FAILED = "failed"


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
"""Description of a scheduled task"""

# Id used to identify the task
id: str
# Name of the action to be run by this task
action: str
# Current status of this task
status: TaskStatus
# If the status is SCHEDULED then this represents when it should be launched,
# otherwise it represents the last time this task got a change of state.
# In milliseconds since epoch in system time timezone, usually UTC.
timestamp: int
# Optionally bind a task to some resource id for easy retrieval
resource_id: Optional[str]
# Optional parameters that will be passed to the function ran by the task
params: Optional[JsonMapping]
# Optional result that can be updated by the running task
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]
Loading