Fix new scheduled tasks jumping the queue #17962

Merged · 2 commits · Nov 28, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions changelog.d/17962.misc
@@ -0,0 +1 @@
Fix new scheduled tasks jumping the queue.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/commands.py
@@ -495,7 +495,7 @@ def to_line(self) -> str:


class NewActiveTaskCommand(_SimpleCommand):
Contributor:

Should we change the NAME to TASK_READY (and related methods) to match the new behavior?

Pro: it's cleaner. Con: we may lose a message when doing a rolling restart. Not sure it's a big deal, since the scheduler will go over all the tasks in the DB when starting up.
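For illustration, a minimal sketch of what the proposed rename might look like. The `_SimpleCommand` stand-in here is heavily simplified, and `TaskReadyCommand` is hypothetical — it is the rename under discussion, not anything in the actual patch:

```python
# Simplified stand-in for Synapse's single-value replication command base;
# the real _SimpleCommand also handles parsing commands off the wire.
class _SimpleCommand:
    NAME: str

    def __init__(self, data: str) -> None:
        self.data = data

    def to_line(self) -> str:
        # Serialise the command's payload for the replication connection.
        return self.data


# Hypothetical renamed command, per the suggestion above.
class TaskReadyCommand(_SimpleCommand):
    """Sent to inform the instance handling background tasks that a new
    task is ready to run."""

    NAME = "TASK_READY"


cmd = TaskReadyCommand("AAAABBBBCCCCDDDD")
# The wire format would be "<NAME> <payload>", hence the rolling-restart
# concern: old workers would not recognise the new command name.
print(f"{cmd.NAME} {cmd.to_line()}")  # → TASK_READY AAAABBBBCCCCDDDD
```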

Member Author:

No strong opinions from me on this. Will await insight from @element-hq/synapse-core.

Member:

I don't think it's particularly useful for now, and I think we can always add it later if it is useful (e.g. for metrics, maybe?).

Member Author:

So what exactly are you suggesting, Erik? We remove the command altogether?

Member:

Oops, sorry, misunderstood what was going on 🤦

Eh, I don't mind. No one really ever sees what's on the wire anyway 🤷

"""Sent to inform instance handling background tasks that a new active task is available to run.
"""Sent to inform instance handling background tasks that a new task is ready to run.

Format::

2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
@@ -727,7 +727,7 @@ def on_NEW_ACTIVE_TASK(
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
-            self._task_scheduler.launch_task_by_id(cmd.data)
+            self._task_scheduler.on_new_task(cmd.data)

def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
30 changes: 13 additions & 17 deletions synapse/util/task_scheduler.py
@@ -174,9 +174,10 @@ async def schedule_task(
The id of the scheduled task
"""
         status = TaskStatus.SCHEDULED
+        start_now = False
         if timestamp is None or timestamp < self._clock.time_msec():
             timestamp = self._clock.time_msec()
-            status = TaskStatus.ACTIVE
+            start_now = True

task = ScheduledTask(
random_string(16),
@@ -190,9 +191,11 @@
)
await self._store.insert_scheduled_task(task)

-        if status == TaskStatus.ACTIVE:
+        # If the task is ready to run immediately, run the scheduling algorithm now
+        # rather than waiting
+        if start_now:
             if self._run_background_tasks:
-                await self._launch_task(task)
+                self._launch_scheduled_tasks()
             else:
                 self._hs.get_replication_command_handler().send_new_active_task(task.id)
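To make the branch above concrete, here is a toy sketch of the decision schedule_task now makes after inserting the task (function and parameter names are simplifications for illustration, not the actual Synapse code). The key point is that the task is always persisted as SCHEDULED; start_now only decides whether to poke the scheduler immediately:

```python
# Toy sketch of schedule_task's post-insert decision. The callbacks stand in
# for a local scheduling pass and a replication NEW_ACTIVE_TASK command.
def after_insert(start_now: bool, run_background_tasks: bool,
                 run_scheduler, send_replication_hint) -> str:
    if not start_now:
        # Task's timestamp is in the future: the periodic scheduler run
        # will pick it up when it is due.
        return "wait for the next scheduler run"
    if run_background_tasks:
        # This worker owns the task scheduler: run a scheduling pass now.
        run_scheduler()
        return "scheduler poked locally"
    # Another worker owns the scheduler: hint it over replication.
    send_replication_hint()
    return "hint sent over replication"


result = after_insert(True, False, lambda: None, lambda: None)
print(result)  # → hint sent over replication
```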

@@ -300,23 +303,14 @@ async def delete_task(self, id: str) -> None:
             raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
         await self._store.delete_scheduled_task(id)

-    def launch_task_by_id(self, id: str) -> None:
-        """Try launching the task with the given ID."""
-        # Don't bother trying to launch new tasks if we're already at capacity.
-        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
-            return

Member:

Any reason to remove this? It does avoid spinning up a background process, even if it's redundant?

Member Author:

I don't quite follow.

This method was previously (only) called when a NEW_ACTIVE_TASK replication command was received, which (previously) meant that another worker had added a new scheduled task and set it to ACTIVE immediately, so we should start running it if possible.

The problem is that now, the other worker doesn't make the decision about whether to set the task to ACTIVE or not -- we first have to check how many tasks are already running, so the other worker has to delegate the SCHEDULED -> ACTIVE transition to the background-tasks worker. So NEW_ACTIVE_TASK now just means "there is a new task, and its scheduled time has already passed", which is a hint to the background-tasks worker that it might want to run its scheduler now rather than waiting 60s for it to come round again.

All of which is a long way of saying: launch_task_by_id isn't very useful any more.
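A toy illustration of the semantic change described in this comment (simplified stand-ins, not Synapse code): because the command is now only a wake-up hint that triggers an ordinary scheduling pass, a freshly scheduled task can no longer jump ahead of older ones waiting in the queue:

```python
MAX_CONCURRENT_RUNNING_TASKS = 1

class ToyScheduler:
    def __init__(self) -> None:
        self.running: list = []
        self.queue: list = ["older-task"]  # a task already waiting its turn

    def on_new_task(self, task_id: str) -> None:
        # The replication command no longer launches this specific task;
        # it just triggers an ordinary scheduling pass.
        self.queue.append(task_id)
        self._launch_scheduled_tasks()

    def _launch_scheduled_tasks(self) -> None:
        # Launch in queue order, respecting the concurrency cap.
        while self.queue and len(self.running) < MAX_CONCURRENT_RUNNING_TASKS:
            self.running.append(self.queue.pop(0))

sched = ToyScheduler()
sched.on_new_task("new-task")
print(sched.running)  # → ['older-task']
print(sched.queue)    # → ['new-task']
```

Under the old behaviour, "new-task" would have been marked ACTIVE and launched directly, skipping past "older-task".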

Member Author:

Ohh, it was just the if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS check you were talking about?

Right, yes, we could reinstate that. Bear with me.

Member Author:

065b11b changes this for both the 60s timer and for the NEW_ACTIVE_TASK command handler.

-        run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
-
-    async def _launch_task_by_id(self, id: str) -> None:
-        """Helper async function for `launch_task_by_id`."""
-        task = await self.get_task(id)
-        if task:
-            await self._launch_task(task)
+    def on_new_task(self, task_id: str) -> None:
+        """Handle a notification that a new ready-to-run task has been added to the queue"""
+        # Just run the scheduler
+        self._launch_scheduled_tasks()

     @wrap_as_background_process("launch_scheduled_tasks")
Member Author (@richvdh, Nov 28, 2024):

Can I just say: I am not a fan of this decorator. I keep forgetting that _launch_scheduled_tasks runs as a background process.
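For readers unfamiliar with the decorator being grumbled about: roughly, it turns an async method into a fire-and-forget call. The sketch below is a heavily simplified asyncio stand-in, purely to illustrate why it's easy to forget — the real wrap_as_background_process also manages Synapse's logging contexts and metrics:

```python
import asyncio
from functools import wraps

def wrap_as_background_process_sketch(desc: str):
    # Simplified: schedule the coroutine on the running event loop and
    # return the task, rather than returning a coroutine the caller awaits.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return asyncio.ensure_future(func(*args, **kwargs))
        return wrapper
    return decorator

@wrap_as_background_process_sketch("launch_scheduled_tasks")
async def _launch_scheduled_tasks() -> str:
    return "scheduling pass done"

async def main() -> str:
    # Easy to forget: calling the method already starts it in the
    # background; no await is needed to kick it off.
    task = _launch_scheduled_tasks()
    return await task

print(asyncio.run(main()))  # → scheduling pass done
```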

     async def _launch_scheduled_tasks(self) -> None:
-        """Retrieve and launch scheduled tasks that should be running at that time."""
+        """Retrieve and launch scheduled tasks that should be running."""
         # Don't bother trying to launch new tasks if we're already at capacity.
         if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
             return
@@ -330,6 +324,8 @@ async def _launch_scheduled_tasks(self) -> None:
         for task in await self.get_tasks(
             statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
         ):
+            # _launch_task will ignore tasks that we're already running, and
+            # will also do nothing if we're already at the maximum capacity.
             await self._launch_task(task)
         for task in await self.get_tasks(
             statuses=[TaskStatus.SCHEDULED],
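Putting the pieces of _launch_scheduled_tasks together, a sketch of the scheduling pass (function and variable names here are illustrative assumptions, not Synapse code): re-launch ACTIVE tasks first, then promote SCHEDULED ones, never exceeding the concurrency cap:

```python
MAX_CONCURRENT_RUNNING_TASKS = 2

def scheduling_pass(active, scheduled, running):
    # First the ACTIVE tasks (e.g. resumed after a restart), then the
    # SCHEDULED ones whose time has come, in order.
    for task in active + scheduled:
        if len(running) >= MAX_CONCURRENT_RUNNING_TASKS:
            return  # at capacity; remaining tasks keep their place in line
        if task not in running:  # like _launch_task: skip already-running tasks
            running.append(task)

running = ["a"]  # one task already running
scheduling_pass(active=["a", "b"], scheduled=["c", "d"], running=running)
print(running)  # → ['a', 'b']
```

Tasks "c" and "d" stay queued until a running task finishes and a later pass has spare capacity, which is exactly the behaviour the new-task replication hint now relies on.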
49 changes: 29 additions & 20 deletions tests/util/test_task_scheduler.py
@@ -18,8 +18,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
-
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple

from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
@@ -104,33 +103,43 @@ def test_schedule_lot_of_tasks(self) -> None:
)
)

-        # This is to give the time to the active tasks to finish
+        def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
+            tasks = (
+                self.get_success(self.task_scheduler.get_task(task_id))
+                for task_id in task_ids
+            )
+            return [t for t in tasks if t is not None and t.status == status]
+
+        # At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
+        # one scheduled task.
+        self.assertEquals(
+            len(get_tasks_of_status(TaskStatus.ACTIVE)),
+            TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
+        )
+        self.assertEquals(
+            len(get_tasks_of_status(TaskStatus.SCHEDULED)),
+            1,
+        )
+
+        # Give the time to the active tasks to finish
         self.reactor.advance(1)

-        # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
+        # Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
         # is still scheduled.
-        tasks = [
-            self.get_success(self.task_scheduler.get_task(task_id))
-            for task_id in task_ids
-        ]
-
         self.assertEquals(
-            len(
-                [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
-            ),
+            len(get_tasks_of_status(TaskStatus.COMPLETE)),
             TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
         )

-        scheduled_tasks = [
-            t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
-        ]
+        scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
         self.assertEquals(len(scheduled_tasks), 1)

-        # We need to wait for the next run of the scheduler loop
-        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
Member Author (comment on lines -129 to -130):

This hasn't actually changed; I'm just tightening up the test a bit. We didn't need to wait for the next run of the loop, we just had to wait 0.1 seconds. Looks like this changed in matrix-org/synapse#16313 and matrix-org/synapse#16660 without the test being updated.
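The timing assertions in this test rely on the MemoryReactor's simulated time: reactor.advance(seconds) fires any pending timed calls that have come due, and nothing runs in real time. A minimal stand-in illustrating the idea (Twisted's actual Clock/MemoryReactor does this, plus much more):

```python
import heapq
import itertools

class FakeClock:
    """Tiny sketch of simulated time, in the spirit of twisted's Clock."""

    def __init__(self) -> None:
        self.now = 0.0
        self._seq = itertools.count()  # tie-breaker so the heap never compares callables
        self._calls: list = []

    def call_later(self, delay: float, fn) -> None:
        heapq.heappush(self._calls, (self.now + delay, next(self._seq), fn))

    def advance(self, seconds: float) -> None:
        # Jump simulated time forward and run everything that came due.
        self.now += seconds
        while self._calls and self._calls[0][0] <= self.now:
            _, _, fn = heapq.heappop(self._calls)
            fn()

clock = FakeClock()
fired: list = []
clock.call_later(0.1, lambda: fired.append("scheduled task started"))
clock.call_later(1.0, lambda: fired.append("scheduled task completed"))

clock.advance(0.1)
print(fired)  # → ['scheduled task started']
clock.advance(1)
print(fired)  # → ['scheduled task started', 'scheduled task completed']
```

This is why the tightened test can advance by exactly 0.1s and then 1s instead of waiting out the whole SCHEDULE_INTERVAL_MS.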

-        self.reactor.advance(1)
+        # The scheduled task should start 0.1s after the first of the active tasks
+        # finishes
+        self.reactor.advance(0.1)
+        self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)

-        # Check that the last task has been properly executed after the next scheduler loop run
+        # ... and should finally complete after another second
+        self.reactor.advance(1)
         prev_scheduled_task = self.get_success(
             self.task_scheduler.get_task(scheduled_tasks[0].id)
         )