Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Some minor performance fixes for task schedular #16313

Merged
merged 13 commits into from
Sep 14, 2023
6 changes: 2 additions & 4 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,14 +672,12 @@ def on_LOCK_RELEASED(
cmd.instance_name, cmd.lock_name, cmd.lock_key
)

async def on_NEW_ACTIVE_TASK(
def on_NEW_ACTIVE_TASK(
self, conn: IReplicationConnection, cmd: NewActiveTaskCommand
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
task = await self._task_scheduler.get_task(cmd.data)
if task:
await self._task_scheduler._launch_task(task)
self._task_scheduler.launch_task_by_id(cmd.data)

def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
Expand Down
14 changes: 14 additions & 0 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,20 @@ async def delete_task(self, id: str) -> None:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)

def launch_task_by_id(self, id: str) -> None:
"""Try launching the task with the given ID."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return

run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
MatMaul marked this conversation as resolved.
Show resolved Hide resolved

async def _launch_task_by_id(self, id: str) -> None:
"""Helper async function for `launch_task_by_id`."""
task = await self.get_task(id)
if task:
await self._launch_task(task)

@wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
Expand Down