Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add admin API to run background jobs #11352

Merged
merged 10 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11352.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add admin API to run background jobs.
4 changes: 2 additions & 2 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2360,8 +2360,8 @@ user_directory:
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
Expand Down
27 changes: 26 additions & 1 deletion docs/usage/administration/admin_api/background_updates.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ For each update:
`average_items_per_ms` how many items are processed per millisecond based on an exponential average.



## Enabled

This API allows pausing background updates.
Expand Down Expand Up @@ -82,3 +81,29 @@ The API returns the `enabled` param.
```

There is also a `GET` version which returns the `enabled` state.


## Run

This API schedules a specific background update to run. The job starts immediately after calling the API.


The API is:

```
POST /_synapse/admin/v1/background_updates/start_job
```

with the following body:

```json
{
"job_name": "populate_stats_process_rooms"
}
```

The following JSON body parameters are available:

- `job_name` - A string specifying which job to run. Valid values are:
- `populate_stats_process_rooms` - Recalculate the stats for all rooms.
- `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
6 changes: 3 additions & 3 deletions docs/user_directory.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ on this particular server - i.e. ones which your account shares a room with, or
who are present in a publicly viewable room present on the server.

The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
and then restart synapse. This should then start a background task to
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to use the [admin API](usage/administration/admin_api/background_updates.md#run)
and execute the job `regenerate_directory`. This should then start a background task to
flush the current tables and regenerate the directory.

Data model
Expand Down
4 changes: 2 additions & 2 deletions synapse/config/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs):
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.rest.admin.background_updates import (
BackgroundUpdateEnabledRestServlet,
BackgroundUpdateRestServlet,
BackgroundUpdateStartJobRestServlet,
)
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
Expand Down Expand Up @@ -259,6 +260,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SendServerNoticeServlet(hs).register(http_server)
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)


def register_servlets_for_client_rest_resource(
Expand Down
123 changes: 96 additions & 27 deletions synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple

from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
from synapse.types import JsonDict
Expand All @@ -29,70 +34,66 @@
class BackgroundUpdateEnabledRestServlet(RestServlet):
"""Allows temporarily disabling background updates"""

PATTERNS = admin_patterns("/background_updates/enabled")
PATTERNS = admin_patterns("/background_updates/enabled$")

def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()

self.data_stores = hs.get_datastores()
self._auth = hs.get_auth()
self._data_stores = hs.get_datastores()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)

return 200, {"enabled": enabled}
return HTTPStatus.OK, {"enabled": enabled}

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

body = parse_json_object_from_request(request)

enabled = body.get("enabled", True)

if not isinstance(enabled, bool):
raise SynapseError(400, "'enabled' parameter must be a boolean")
raise SynapseError(
HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
)

for db in self.data_stores.databases:
for db in self._data_stores.databases:
db.updates.enabled = enabled

# If we're re-enabling them ensure that we start the background
# process again.
if enabled:
db.updates.start_doing_background_updates()

return 200, {"enabled": enabled}
return HTTPStatus.OK, {"enabled": enabled}


class BackgroundUpdateRestServlet(RestServlet):
"""Fetch information about background updates"""

PATTERNS = admin_patterns("/background_updates/status")
PATTERNS = admin_patterns("/background_updates/status$")

def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()

self.data_stores = hs.get_datastores()
self._auth = hs.get_auth()
self._data_stores = hs.get_datastores()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)

current_updates = {}

for db in self.data_stores.databases:
for db in self._data_stores.databases:
update = db.updates.get_current_update()
if not update:
continue
Expand All @@ -104,4 +105,72 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
"average_items_per_ms": update.average_items_per_ms(),
}

return 200, {"enabled": enabled, "current_updates": current_updates}
return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates}


class BackgroundUpdateStartJobRestServlet(RestServlet):
    """Admin API endpoint that queues and starts a named background update job.

    The request body must be a JSON object with a `job_name` key. Supported
    values are `populate_stats_process_rooms` and `regenerate_directory`.
    """

    # End-anchored so that only the exact path is routed to this servlet,
    # not every path that merely starts with ".../start_job".
    PATTERNS = admin_patterns("/background_updates/start_job$")

    def __init__(self, hs: "HomeServer"):
        self._auth = hs.get_auth()
        self._store = hs.get_datastore()

    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
        """Insert the rows for the requested job and kick off background updates.

        Args:
            request: The incoming request; the requester must be a server admin.

        Returns:
            A `(HTTPStatus.OK, {})` tuple once the job has been queued.

        Raises:
            SynapseError: with `HTTPStatus.BAD_REQUEST` if `job_name` is not one
                of the supported values, or if inserting the job rows fails
                with a database integrity error (reported to the caller as the
                job already being queued). Authentication, admin-check and body
                parsing helpers may raise their own errors.
        """
        requester = await self._auth.get_user_by_req(request)
        await assert_user_is_admin(self._auth, requester.user)

        body = parse_json_object_from_request(request)
        assert_params_in_dict(body, ["job_name"])

        job_name = body["job_name"]

        if job_name == "populate_stats_process_rooms":
            jobs = [
                {
                    "update_name": "populate_stats_process_rooms",
                    "progress_json": "{}",
                },
            ]
        elif job_name == "regenerate_directory":
            # Four chained steps: each one names the previous step in
            # `depends_on`. NOTE(review): the empty string on the first step
            # presumably means "no dependency" -- confirm against the schema.
            jobs = [
                {
                    "update_name": "populate_user_directory_createtables",
                    "progress_json": "{}",
                    "depends_on": "",
                },
                {
                    "update_name": "populate_user_directory_process_rooms",
                    "progress_json": "{}",
                    "depends_on": "populate_user_directory_createtables",
                },
                {
                    "update_name": "populate_user_directory_process_users",
                    "progress_json": "{}",
                    "depends_on": "populate_user_directory_process_rooms",
                },
                {
                    "update_name": "populate_user_directory_cleanup",
                    "progress_json": "{}",
                    "depends_on": "populate_user_directory_process_users",
                },
            ]
        else:
            raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")

        try:
            await self._store.db_pool.simple_insert_many(
                table="background_updates",
                values=jobs,
                desc=f"admin_api_run_{job_name}",
            )
        except self._store.db_pool.engine.module.IntegrityError as e:
            # Keep the database error attached as the cause so it is still
            # visible in server-side tracebacks.
            raise SynapseError(
                HTTPStatus.BAD_REQUEST,
                "Job %s is already in queue of background updates." % (job_name,),
            ) from e

        self._store.db_pool.updates.start_doing_background_updates()

        return HTTPStatus.OK, {}
2 changes: 2 additions & 0 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:

def start_doing_background_updates(self) -> None:
    """Launch the background-update runner, unless updates are disabled.

    Does nothing when `self.enabled` is falsy. Otherwise clears the
    `_all_done` flag and hands `self.run_background_updates` to
    `run_as_background_process` under the name "background_updates".
    """
    if not self.enabled:
        return

    # An update is about to be (re)started, so "everything is done" no
    # longer holds.
    self._all_done = False
    run_as_background_process("background_updates", self.run_background_updates)

async def run_background_updates(self, sleep: bool = True) -> None:
Expand Down
Loading