Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move event_reports to RoomWorkerStore #15165

Merged
merged 3 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15165.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move `get_event_report` and `get_event_reports_paginate` from `RoomStore` to `RoomWorkerStore`.
354 changes: 177 additions & 177 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,183 @@ def get_un_partial_stated_rooms_from_stream_txn(
get_un_partial_stated_rooms_from_stream_txn,
)

async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve an event report

Args:
report_id: ID of reported event in database
Returns:
JSON dict of information from an event report or None if the
report does not exist.
"""

def _get_event_report_txn(
txn: LoggingTransaction, report_id: int
) -> Optional[Dict[str, Any]]:
sql = """
SELECT
er.id,
er.received_ts,
er.room_id,
er.event_id,
er.user_id,
er.content,
events.sender,
room_stats_state.canonical_alias,
room_stats_state.name,
event_json.json AS event_json
FROM event_reports AS er
LEFT JOIN events
ON events.event_id = er.event_id
JOIN event_json
ON event_json.event_id = er.event_id
JOIN room_stats_state
ON room_stats_state.room_id = er.room_id
WHERE er.id = ?
"""

txn.execute(sql, [report_id])
row = txn.fetchone()

if not row:
return None

event_report = {
"id": row[0],
"received_ts": row[1],
"room_id": row[2],
"event_id": row[3],
"user_id": row[4],
"score": db_to_json(row[5]).get("score"),
"reason": db_to_json(row[5]).get("reason"),
"sender": row[6],
"canonical_alias": row[7],
"name": row[8],
"event_json": db_to_json(row[9]),
}

return event_report

return await self.db_pool.runInteraction(
"get_event_report", _get_event_report_txn, report_id
)

async def get_event_reports_paginate(
self,
start: int,
limit: int,
direction: Direction = Direction.BACKWARDS,
user_id: Optional[str] = None,
room_id: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
"""Retrieve a paginated list of event reports

Args:
start: event offset to begin the query from
limit: number of rows to retrieve
direction: Whether to fetch the most recent first (backwards) or the
oldest first (forwards)
user_id: search for user_id. Ignored if user_id is None
room_id: search for room_id. Ignored if room_id is None
Returns:
Tuple of:
json list of event reports
total number of event reports matching the filter criteria
"""

def _get_event_reports_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[Dict[str, Any]], int]:
filters = []
args: List[object] = []

if user_id:
filters.append("er.user_id LIKE ?")
args.extend(["%" + user_id + "%"])
if room_id:
filters.append("er.room_id LIKE ?")
args.extend(["%" + room_id + "%"])

if direction == Direction.BACKWARDS:
order = "DESC"
else:
order = "ASC"

where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""

# We join on room_stats_state despite not using any columns from it
# because the join can influence the number of rows returned;
# e.g. a room that doesn't have state, maybe because it was deleted.
# The query returning the total count should be consistent with
# the query returning the results.
sql = """
SELECT COUNT(*) as total_event_reports
FROM event_reports AS er
JOIN room_stats_state ON room_stats_state.room_id = er.room_id
{}
""".format(
where_clause
)
txn.execute(sql, args)
count = cast(Tuple[int], txn.fetchone())[0]

sql = """
SELECT
er.id,
er.received_ts,
er.room_id,
er.event_id,
er.user_id,
er.content,
events.sender,
room_stats_state.canonical_alias,
room_stats_state.name
FROM event_reports AS er
LEFT JOIN events
ON events.event_id = er.event_id
JOIN room_stats_state
ON room_stats_state.room_id = er.room_id
{where_clause}
ORDER BY er.received_ts {order}
LIMIT ?
OFFSET ?
""".format(
where_clause=where_clause,
order=order,
)

args += [limit, start]
txn.execute(sql, args)

event_reports = []
for row in txn:
try:
s = db_to_json(row[5]).get("score")
r = db_to_json(row[5]).get("reason")
except Exception:
logger.error("Unable to parse json from event_reports: %s", row[0])
continue
event_reports.append(
{
"id": row[0],
"received_ts": row[1],
"room_id": row[2],
"event_id": row[3],
"user_id": row[4],
"score": s,
"reason": r,
"sender": row[6],
"canonical_alias": row[7],
"name": row[8],
}
)

return event_reports, count

return await self.db_pool.runInteraction(
"get_event_reports_paginate", _get_event_reports_paginate_txn
)

async def delete_event_report(self, report_id: int) -> bool:
"""Remove an event report from database.

Expand Down Expand Up @@ -2189,183 +2366,6 @@ async def add_event_report(
)
return next_id

async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
"""Retrieve an event report

Args:
report_id: ID of reported event in database
Returns:
JSON dict of information from an event report or None if the
report does not exist.
"""

def _get_event_report_txn(
txn: LoggingTransaction, report_id: int
) -> Optional[Dict[str, Any]]:
sql = """
SELECT
er.id,
er.received_ts,
er.room_id,
er.event_id,
er.user_id,
er.content,
events.sender,
room_stats_state.canonical_alias,
room_stats_state.name,
event_json.json AS event_json
FROM event_reports AS er
LEFT JOIN events
ON events.event_id = er.event_id
JOIN event_json
ON event_json.event_id = er.event_id
JOIN room_stats_state
ON room_stats_state.room_id = er.room_id
WHERE er.id = ?
"""

txn.execute(sql, [report_id])
row = txn.fetchone()

if not row:
return None

event_report = {
"id": row[0],
"received_ts": row[1],
"room_id": row[2],
"event_id": row[3],
"user_id": row[4],
"score": db_to_json(row[5]).get("score"),
"reason": db_to_json(row[5]).get("reason"),
"sender": row[6],
"canonical_alias": row[7],
"name": row[8],
"event_json": db_to_json(row[9]),
}

return event_report

return await self.db_pool.runInteraction(
"get_event_report", _get_event_report_txn, report_id
)

async def get_event_reports_paginate(
self,
start: int,
limit: int,
direction: Direction = Direction.BACKWARDS,
user_id: Optional[str] = None,
room_id: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
"""Retrieve a paginated list of event reports

Args:
start: event offset to begin the query from
limit: number of rows to retrieve
direction: Whether to fetch the most recent first (backwards) or the
oldest first (forwards)
user_id: search for user_id. Ignored if user_id is None
room_id: search for room_id. Ignored if room_id is None
Returns:
Tuple of:
json list of event reports
total number of event reports matching the filter criteria
"""

def _get_event_reports_paginate_txn(
txn: LoggingTransaction,
) -> Tuple[List[Dict[str, Any]], int]:
filters = []
args: List[object] = []

if user_id:
filters.append("er.user_id LIKE ?")
args.extend(["%" + user_id + "%"])
if room_id:
filters.append("er.room_id LIKE ?")
args.extend(["%" + room_id + "%"])

if direction == Direction.BACKWARDS:
order = "DESC"
else:
order = "ASC"

where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""

# We join on room_stats_state despite not using any columns from it
# because the join can influence the number of rows returned;
# e.g. a room that doesn't have state, maybe because it was deleted.
# The query returning the total count should be consistent with
# the query returning the results.
sql = """
SELECT COUNT(*) as total_event_reports
FROM event_reports AS er
JOIN room_stats_state ON room_stats_state.room_id = er.room_id
{}
""".format(
where_clause
)
txn.execute(sql, args)
count = cast(Tuple[int], txn.fetchone())[0]

sql = """
SELECT
er.id,
er.received_ts,
er.room_id,
er.event_id,
er.user_id,
er.content,
events.sender,
room_stats_state.canonical_alias,
room_stats_state.name
FROM event_reports AS er
LEFT JOIN events
ON events.event_id = er.event_id
JOIN room_stats_state
ON room_stats_state.room_id = er.room_id
{where_clause}
ORDER BY er.received_ts {order}
LIMIT ?
OFFSET ?
""".format(
where_clause=where_clause,
order=order,
)

args += [limit, start]
txn.execute(sql, args)

event_reports = []
for row in txn:
try:
s = db_to_json(row[5]).get("score")
r = db_to_json(row[5]).get("reason")
except Exception:
logger.error("Unable to parse json from event_reports: %s", row[0])
continue
event_reports.append(
{
"id": row[0],
"received_ts": row[1],
"room_id": row[2],
"event_id": row[3],
"user_id": row[4],
"score": s,
"reason": r,
"sender": row[6],
"canonical_alias": row[7],
"name": row[8],
}
)

return event_reports, count

return await self.db_pool.runInteraction(
"get_event_reports_paginate", _get_event_reports_paginate_txn
)

async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked.

Expand Down