Skip to content

Commit

Permalink
Avoid starting bg process if we are at capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Nov 28, 2024
1 parent afb7d08 commit 065b11b
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,8 @@ def on_new_task(self, task_id: str) -> None:
# Just run the scheduler
self._launch_scheduled_tasks()

@wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running."""
def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at this time."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
Expand All @@ -320,22 +319,26 @@ async def _launch_scheduled_tasks(self) -> None:

self._launching_new_tasks = True

try:
for task in await self.get_tasks(
statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
):
# _launch_task will ignore tasks that we're already running, and
# will also do nothing if we're already at the maximum capacity.
await self._launch_task(task)
for task in await self.get_tasks(
statuses=[TaskStatus.SCHEDULED],
max_timestamp=self._clock.time_msec(),
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
):
await self._launch_task(task)

finally:
self._launching_new_tasks = False
async def inner() -> None:
try:
for task in await self.get_tasks(
statuses=[TaskStatus.ACTIVE],
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
):
# _launch_task will ignore tasks that we're already running, and
# will also do nothing if we're already at the maximum capacity.
await self._launch_task(task)
for task in await self.get_tasks(
statuses=[TaskStatus.SCHEDULED],
max_timestamp=self._clock.time_msec(),
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
):
await self._launch_task(task)

finally:
self._launching_new_tasks = False

run_as_background_process("launch_scheduled_tasks", inner)

@wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None:
Expand Down

0 comments on commit 065b11b

Please sign in to comment.