Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add admin API to run background jobs #11352

Merged
merged 10 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11352.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add admin API to run background jobs.
4 changes: 2 additions & 2 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2360,8 +2360,8 @@ user_directory:
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
Expand Down
35 changes: 34 additions & 1 deletion docs/usage/administration/admin_api/background_updates.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ For each update:
`average_items_per_ms` how many items are processed per millisecond based on an exponential average.



## Enabled

This API allow pausing background updates.
Expand Down Expand Up @@ -82,3 +81,37 @@ The API returns the `enabled` param.
```

There is also a `GET` version which returns the `enabled` state.


## Run

This API runs specific background updates. The job starts immediately after calling the API.
dklimpel marked this conversation as resolved.
Show resolved Hide resolved


The API is:

```
POST /_synapse/admin/v1/background_updates/start_job
```

with the following body:

```json
{
"job_name": "populate_stats_process_rooms"
}
```

The API returns the `job_name` param.
clokep marked this conversation as resolved.
Show resolved Hide resolved

```json
{
"job_name": "populate_stats_process_rooms"
}
```

The following JSON body parameters are available:

- `job_name` - A string which job to run. Valid values are:
- `populate_stats_process_rooms` - Recalculate the stats for all rooms.
- `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
3 changes: 2 additions & 1 deletion docs/user_directory.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ who are present in a publicly viewable room present on the server.

The directory info is stored in various tables, which can (typically after
DB corruption) get stale or out of sync. If this happens, for now the
solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
solution to fix it is to use the [admin API](usage/administration/admin_api/background_updates.md#run) or
execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably best if we only suggest a single way to do this, I think?

and then restart synapse. This should then start a background task to
flush the current tables and regenerate the directory.

Expand Down
4 changes: 2 additions & 2 deletions synapse/config/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs):
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
# manually trigger a rebuild following the instructions at
# https://matrix-org.github.io/synapse/latest/user_directory.html
# manually trigger a rebuild via API following the instructions at
# https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.rest.admin.background_updates import (
BackgroundUpdateEnabledRestServlet,
BackgroundUpdateRestServlet,
BackgroundUpdateStartJobRestServlet,
)
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
Expand Down Expand Up @@ -259,6 +260,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SendServerNoticeServlet(hs).register(http_server)
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)


def register_servlets_for_client_rest_resource(
Expand Down
114 changes: 92 additions & 22 deletions synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple

from synapse.api.errors import SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
from synapse.types import JsonDict
Expand All @@ -29,37 +34,38 @@
class BackgroundUpdateEnabledRestServlet(RestServlet):
"""Allows temporarily disabling background updates"""

PATTERNS = admin_patterns("/background_updates/enabled")
PATTERNS = admin_patterns("/background_updates/enabled$")

def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self._is_mine_id = hs.is_mine_id
dklimpel marked this conversation as resolved.
Show resolved Hide resolved
self._auth = hs.get_auth()

self.data_stores = hs.get_datastores()
self._data_stores = hs.get_datastores()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)

return 200, {"enabled": enabled}

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

body = parse_json_object_from_request(request)

enabled = body.get("enabled", True)

if not isinstance(enabled, bool):
raise SynapseError(400, "'enabled' parameter must be a boolean")
raise SynapseError(
HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
)

for db in self.data_stores.databases:
for db in self._data_stores.databases:
db.updates.enabled = enabled

# If we're re-enabling them ensure that we start the background
Expand All @@ -73,26 +79,25 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
class BackgroundUpdateRestServlet(RestServlet):
"""Fetch information about background updates"""

PATTERNS = admin_patterns("/background_updates/status")
PATTERNS = admin_patterns("/background_updates/status$")

def __init__(self, hs: "HomeServer"):
self.group_server = hs.get_groups_server_handler()
self.is_mine_id = hs.is_mine_id
self.auth = hs.get_auth()
self._is_mine_id = hs.is_mine_id
self._auth = hs.get_auth()

self.data_stores = hs.get_datastores()
self._data_stores = hs.get_datastores()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
enabled = all(db.updates.enabled for db in self.data_stores.databases)
enabled = all(db.updates.enabled for db in self._data_stores.databases)

current_updates = {}

for db in self.data_stores.databases:
for db in self._data_stores.databases:
update = db.updates.get_current_update()
if not update:
continue
Expand All @@ -105,3 +110,68 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
}

return 200, {"enabled": enabled, "current_updates": current_updates}


class BackgroundUpdateStartJobRestServlet(RestServlet):
"""Allows to start specific background updates"""

PATTERNS = admin_patterns("/background_updates/start_job")

def __init__(self, hs: "HomeServer"):
self._is_mine_id = hs.is_mine_id
self._auth = hs.get_auth()

self._store = hs.get_datastore()

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester.user)

body = parse_json_object_from_request(request)
assert_params_in_dict(body, ["job_name"])

job_name = body.get("job_name")
dklimpel marked this conversation as resolved.
Show resolved Hide resolved

if job_name == "populate_stats_process_rooms":
clokep marked this conversation as resolved.
Show resolved Hide resolved
await self._store.db_pool.simple_insert(
dklimpel marked this conversation as resolved.
Show resolved Hide resolved
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
},
desc="admin_api_populate_stats_process_rooms",
)
elif job_name == "regenerate_directory":
jobs = [
{
"update_name": "populate_user_directory_createtables",
"progress_json": "{}",
"depends_on": "",
},
{
"update_name": "populate_user_directory_process_rooms",
"progress_json": "{}",
"depends_on": "populate_user_directory_createtables",
},
{
"update_name": "populate_user_directory_process_users",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_rooms",
},
{
"update_name": "populate_user_directory_cleanup",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_users",
},
]
await self._store.db_pool.simple_insert_many(
table="background_updates",
values=jobs,
desc="admin_api_regenerate_directory",
)
else:
raise SynapseError(400, "Invalid job_name")

self._store.db_pool.updates.start_doing_background_updates()

return 200, {"job_name": job_name}
2 changes: 2 additions & 0 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:

def start_doing_background_updates(self) -> None:
if self.enabled:
# if we start a new background update, not all updates ar done
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._all_done = False
run_as_background_process("background_updates", self.run_background_updates)

async def run_background_updates(self, sleep: bool = True) -> None:
Expand Down
Loading