Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only run one background update at a time #7190

Merged
merged 8 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 8 additions & 32 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,19 @@ async def run_background_updates(self, sleep=True):
except Exception:
logger.exception("Error doing update")
else:
if result is None:
if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
)
self._all_done = True
return None

@defer.inlineCallbacks
def has_completed_background_updates(self):
async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed

Returns:
Deferred[bool]: True if all background updates have completed
True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
Expand All @@ -138,7 +137,7 @@ def has_completed_background_updates(self):
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
updates = yield self.db.simple_select_onecol(
updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
Expand Down Expand Up @@ -170,9 +169,7 @@ async def has_completed_background_update(self, update_name) -> bool:

return not update_exists

async def do_next_background_update(
self, desired_duration_ms: float
) -> Optional[int]:
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update

Returns once some amount of work is done.
Expand All @@ -181,7 +178,7 @@ async def do_next_background_update(
desired_duration_ms(float): How long we want to spend
updating.
Returns:
None if there is no more work to do, otherwise an int
True if there is no more work to do, otherwise False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be less confusing to do this the other way around (i.e. returning False if there's no more work to do).
Before this change, the return value answered the question "Do we still have work to do?" rather than "Do we have no work left to do?". Inverting it could lead to confusing double negations in callers, so I suggest we keep the original semantics here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I changed the description to "True if we have finished running all the background updates, otherwise False"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's better, though I still find it a bit confusing. Your mileage may vary.

"""
if not self._background_update_queue:
updates = await self.db.simple_select_list(
Expand All @@ -196,14 +193,14 @@ async def do_next_background_update(

if not self._background_update_queue:
# no work left to do
return None
return True

# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)

res = await self._do_background_update(update_name, desired_duration_ms)
return res
return False

async def _do_background_update(
self, update_name: str, desired_duration_ms: float
Expand Down Expand Up @@ -400,27 +397,6 @@ def updater(progress, batch_size):

self.register_background_update_handler(update_name, updater)

def start_background_update(self, update_name, progress):
"""Starts a background update running.

Args:
update_name: The update to set running.
progress: The initial state of the progress of the update.

Returns:
A deferred that completes once the task has been added to the
queue.
"""
# Clear the background update queue so that we will pick up the new
# task on the next iteration of do_background_update.
self._background_update_queue = []
progress_json = json.dumps(progress)

return self.db.simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": progress_json},
)

def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.

Expand Down
24 changes: 15 additions & 9 deletions tests/storage/test_background_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
# the base test class should have run the real bg updates for us
self.assertTrue(self.updates.has_completed_background_updates())
self.assertTrue(
self.get_success(self.updates.has_completed_background_updates())
)

self.update_handler = Mock()
self.updates.register_background_update_handler(
Expand All @@ -25,12 +27,20 @@ def test_do_background_update(self):
# the target runtime for each bg update
target_background_update_duration_ms = 50000

store = self.hs.get_datastore()
self.get_success(
store.db.simple_insert(
"background_updates",
values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
)
)

# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1}
yield self.hs.get_datastore().db.runInteraction(
yield store.db.runInteraction(
"update_progress",
self.updates._background_update_progress_txn,
"test_update",
Expand All @@ -39,18 +49,14 @@ def update(progress, count):
return count

self.update_handler.side_effect = update

self.get_success(
self.updates.start_background_update("test_update", {"my_key": 1})
)
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
target_background_update_duration_ms
),
by=0.1,
)
self.assertIsNotNone(res)
self.assertFalse(res)

# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
Expand All @@ -73,13 +79,13 @@ def update(progress, count):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNotNone(result)
self.assertFalse(result)
self.update_handler.assert_called_once()

# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
self.assertIsNone(result)
self.assertTrue(result)
self.assertFalse(self.update_handler.called)
11 changes: 7 additions & 4 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
LoggingContext,
current_context,
set_current_context,
)
Expand Down Expand Up @@ -419,15 +420,17 @@ def setup_test_homeserver(self, *args, **kwargs):
config_obj.parse_config_dict(config, "", "")
kwargs["config"] = config_obj

async def run_bg_updates():
with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
while not await stor.db.updates.has_completed_background_updates():
await stor.db.updates.do_next_background_update(1)

hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()

# Run the database background updates, when running against "master".
if hs.__class__.__name__ == "TestHomeServer":
while not self.get_success(
stor.db.updates.has_completed_background_updates()
):
self.get_success(stor.db.updates.do_next_background_update(1))
self.get_success(run_bg_updates())

return hs

Expand Down