This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Feature: Add filter to /messages. Add 'contains_url' to filter. #922

Merged: 5 commits, Jul 20, 2016
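In practice, the change lets a client pass a JSON filter in the "filter" query parameter of GET /rooms/{roomId}/messages. A minimal sketch of how a client might build such a request (Python 2, matching the code below; the homeserver URL, room ID and access token are placeholders):

import json
import urllib

# Only return events whose content carries a string "url" key,
# e.g. images and file uploads.
filter_json = json.dumps({"contains_url": True, "types": ["m.room.message"]})

url = (
    "https://homeserver.example/_matrix/client/r0/rooms/"
    + urllib.quote("!room:example.org")
    + "/messages?access_token=ACCESS_TOKEN&dir=b&filter="
    + urllib.quote(filter_json)
)
# A GET on this URL is decoded by on_GET in rest/client/v1/room.py below.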
23 changes: 20 additions & 3 deletions synapse/api/filtering.py
@@ -191,6 +191,17 @@ class Filter(object):
def __init__(self, filter_json):
self.filter_json = filter_json

self.types = self.filter_json.get("types", None)
self.not_types = self.filter_json.get("not_types", [])

self.rooms = self.filter_json.get("rooms", None)
self.not_rooms = self.filter_json.get("not_rooms", [])

self.senders = self.filter_json.get("senders", None)
self.not_senders = self.filter_json.get("not_senders", [])

self.contains_url = self.filter_json.get("contains_url", None)

def check(self, event):
"""Checks whether the filter matches the given event.

@@ -209,9 +220,10 @@ def check(self, event):
event.get("room_id", None),
sender,
event.get("type", None),
"url" in event.get("content", {})
)

def check_fields(self, room_id, sender, event_type):
def check_fields(self, room_id, sender, event_type, contains_url):
"""Checks whether the filter matches the given event fields.

Returns:
Expand All @@ -225,15 +237,20 @@ def check_fields(self, room_id, sender, event_type):

for name, match_func in literal_keys.items():
not_name = "not_%s" % (name,)
disallowed_values = self.filter_json.get(not_name, [])
disallowed_values = getattr(self, not_name)
if any(map(match_func, disallowed_values)):
return False

allowed_values = self.filter_json.get(name, None)
allowed_values = getattr(self, name)
if allowed_values is not None:
if not any(map(match_func, allowed_values)):
return False

contains_url_filter = self.filter_json.get("contains_url")
if contains_url_filter is not None:
if contains_url_filter != contains_url:
return False

return True

def filter_rooms(self, room_ids):
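A quick sketch of how the new contains_url field behaves, assuming the Filter class above (the room and user IDs are placeholders):

f = Filter({"types": ["m.room.message"], "contains_url": True})

# Passes: the type matches and the event content carries a url.
f.check_fields("!room:example.org", "@alice:example.org",
               "m.room.message", contains_url=True)   # -> True

# Rejected: same type, but the event content has no url.
f.check_fields("!room:example.org", "@alice:example.org",
               "m.room.message", contains_url=False)  # -> False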
16 changes: 12 additions & 4 deletions synapse/handlers/message.py
@@ -66,7 +66,7 @@ def purge_history(self, room_id, event_id):

@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True):
as_client_event=True, event_filter=None):
"""Get messages in a room.

Args:
@@ -75,11 +75,11 @@ def get_messages(self, requester, room_id=None, pagin_config=None,
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
data_source = self.hs.get_event_sources().sources["room"]

if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
@@ -129,8 +129,13 @@ def get_messages(self, requester, room_id=None, pagin_config=None,
room_id, max_topo
)

events, next_key = yield data_source.get_pagination_rows(
requester.user, source_config, room_id
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)

next_token = pagin_config.from_token.copy_and_replace(
@@ -144,6 +149,9 @@ def get_messages(self, requester, room_id=None, pagin_config=None,
"end": next_token.to_string(),
})

if event_filter:
events = event_filter.filter(events)

events = yield filter_events_for_client(
self.store,
user_id,
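Note that the filter ends up being applied twice: paginate_room_events pushes what it can into SQL (see stream.py below) as an optimisation, while the in-memory pass afterwards catches anything the SQL clause cannot express, such as the wildcard type matches raised in review. Roughly, and assuming Filter.filter() is a thin wrapper over Filter.check():

# Sketch of the in-memory pass: keep exactly the events the filter accepts.
events = [ev for ev in events if event_filter.check(ev)]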
11 changes: 10 additions & 1 deletion synapse/rest/client/v1/room.py
@@ -20,12 +20,14 @@
from synapse.api.errors import SynapseError, Codes, AuthError
from synapse.streams.config import PaginationConfig
from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.types import UserID, RoomID, RoomAlias
from synapse.events.utils import serialize_event
from synapse.http.servlet import parse_json_object_from_request

import logging
import urllib
import ujson as json

logger = logging.getLogger(__name__)

@@ -327,12 +329,19 @@ def on_GET(self, request, room_id):
request, default_limit=10,
)
as_client_event = "raw" not in request.args
filter_bytes = request.args.get("filter", None)
if filter_bytes:
filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
handler = self.handlers.message_handler
msgs = yield handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event
as_client_event=as_client_event,
event_filter=event_filter,
)

defer.returnValue((200, msgs))
82 changes: 82 additions & 0 deletions synapse/storage/events.py
@@ -152,13 +152,18 @@ def _get_drainining_queue(self, room_id):

class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"

def __init__(self, hs):
super(EventsStore, self).__init__(hs)
self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
self.register_background_update_handler(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
self._background_reindex_fields_sender,
)

self._event_persist_queue = _EventPeristenceQueue()

@@ -576,6 +581,11 @@ def event_dict(event):
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
"contains_url": (
"url" in event.content
and isinstance(event.content["url"], basestring)
),
}
for event, _ in events_and_contexts
],
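The stored contains_url flag is true only when the event content has a "url" key holding a string. A small sketch of that expression (Python 2 basestring, as above; the example contents are invented):

def _contains_url(content):
    # Mirrors the expression used when persisting events above.
    return "url" in content and isinstance(content["url"], basestring)

_contains_url({"body": "hello"})                              # False: no url key
_contains_url({"msgtype": "m.image", "url": "mxc://x/abcd"})  # True
_contains_url({"url": 42})                                    # False: not a string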
@@ -1115,6 +1125,78 @@ def _count_messages(txn):
ret = yield self.runInteraction("count_messages", _count_messages)
defer.returnValue(ret)

@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)

INSERT_CLUMP_SIZE = 1000

def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
" INNER JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
)

txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

rows = txn.fetchall()
if not rows:
return 0

min_stream_id = rows[-1][0]

update_rows = []
for row in rows:
try:
event_id = row[1]
event_json = json.loads(row[2])
sender = event_json["sender"]
content = event_json["content"]

contains_url = "url" in content
if contains_url:
contains_url &= isinstance(content["url"], basestring)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue

update_rows.append((sender, contains_url, event_id))

sql = (
"UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
)

for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)

progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
"rows_inserted": rows_inserted + len(rows)
}

self._background_update_progress_txn(
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)

return len(rows)

result = yield self.runInteraction(
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
)

if not result:
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)

defer.returnValue(result)
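
To make the batch walk in _background_reindex_fields_sender concrete, here is a hypothetical trace of the progress dict for events with stream orderings 1 through 10 and batch_size=4 (all numbers invented for illustration):

# Rows come back ORDER BY stream_ordering DESC, and each batch lowers
# max_stream_id_exclusive to the smallest id it saw, so the scan walks
# downwards until nothing is left.
#
# start:   {"target_min_stream_id_inclusive": 1, "max_stream_id_exclusive": 11}
# batch 1: rows 10, 9, 8, 7  -> max_stream_id_exclusive = 7
# batch 2: rows 6, 5, 4, 3   -> max_stream_id_exclusive = 3
# batch 3: rows 2, 1         -> max_stream_id_exclusive = 1
# batch 4: no rows, returns 0 -> _end_background_update(...) is called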

@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
60 changes: 60 additions & 0 deletions synapse/storage/schema/delta/33/event_fields.py
@@ -0,0 +1,60 @@
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.prepare_database import get_statements

import logging
import ujson

logger = logging.getLogger(__name__)


ALTER_TABLE = """
ALTER TABLE events ADD COLUMN sender TEXT;
ALTER TABLE events ADD COLUMN contains_url BOOLEAN;
NegativeMjark (Contributor) commented on Jul 14, 2016:
Maybe add an index on (room_id, contains_url, ...whatever columns the pagination queries can be indexed against)? It's probably not necessary if people are posting things with URLs frequently enough.

Member Author replied:
Maybe, though I'm slightly loath to put too many indices on the events table. I'd also want to check which queries are actually common, as e.g. file pagination would require a (room_id, topological, stream_ordering, contains_url) index.

Contributor replied:
Don't you mean a (room_id, contains_url, topological, stream_ordering) index?

Member Author replied:
Err, yes, quite.

"""


def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(ALTER_TABLE.splitlines()):
cur.execute(statement)

cur.execute("SELECT MIN(stream_ordering) FROM events")
rows = cur.fetchall()
min_stream_id = rows[0][0]

cur.execute("SELECT MAX(stream_ordering) FROM events")
rows = cur.fetchall()
max_stream_id = rows[0][0]

if min_stream_id is not None and max_stream_id is not None:
progress = {
"target_min_stream_id_inclusive": min_stream_id,
"max_stream_id_exclusive": max_stream_id + 1,
"rows_inserted": 0,
}
progress_json = ujson.dumps(progress)

sql = (
"INSERT into background_updates (update_name, progress_json)"
" VALUES (?, ?)"
)

sql = database_engine.convert_param_style(sql)

cur.execute(sql, ("event_fields_sender_url", progress_json))


def run_upgrade(cur, database_engine, *args, **kwargs):
pass
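
For reference, the index the reviewers sketch in the thread above would look roughly like the following. It is not part of this PR, and the topological_ordering and stream_ordering column names are assumptions about the events table:

# Hypothetical index the reviewers discuss above -- NOT added by this PR.
SUGGESTED_INDEX = """
CREATE INDEX events_room_contains_url ON events (
    room_id, contains_url, topological_ordering, stream_ordering
);
"""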
56 changes: 55 additions & 1 deletion synapse/storage/stream.py
@@ -95,6 +95,54 @@ def upper_bound(token, engine, inclusive=True):
)


def filter_to_clause(event_filter):
# NB: This may create SQL clauses that don't optimise well (and we don't
# have indices on all possible clauses). E.g. it may create
# "room_id == X AND room_id != X", which postgres doesn't optimise.

if not event_filter:
return "", []

clauses = []
args = []

if event_filter.types:
clauses.append(
"(%s)" % " OR ".join("type = ?" for _ in event_filter.types)
)
args.extend(event_filter.types)
Contributor commented:
We currently allow wildcard matches for event types:
    "types": lambda v: _matches_wildcard(event_type, v)


for typ in event_filter.not_types:
clauses.append("type != ?")
args.append(typ)

if event_filter.senders:
clauses.append(
"(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders)
)
args.extend(event_filter.senders)

for sender in event_filter.not_senders:
clauses.append("sender != ?")
args.append(sender)

if event_filter.rooms:
clauses.append(
"(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms)
)
args.extend(event_filter.rooms)

for room_id in event_filter.not_rooms:
clauses.append("room_id != ?")
args.append(room_id)
Contributor commented:
Is it worth adding the rooms filter for /messages? Either the room is filtered out by the filter or it isn't.

Member Author replied:
I'd rather have this function do the expected thing and filter on room_ids too, so it's usable elsewhere. Separately, I guess we could check that the room_id matches the filter, but even if it doesn't I think the query would be relatively quick.

Contributor replied:
I guess I'd be impressed if postgres had a hard time optimising room_id = '1' and room_id != '1'. So this seems fine.

Member Author replied:
*sighs*


if event_filter.contains_url:
clauses.append("contains_url = ?")
args.append(event_filter.contains_url)

return " AND ".join(clauses), args


class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -320,7 +368,7 @@ def f(txn):

@defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1):
direction='b', limit=-1, event_filter=None):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -344,6 +392,12 @@ def paginate_room_events(self, room_id, from_key, to_key=None,
RoomStreamToken.parse(to_key), self.database_engine
))

filter_clause, filter_args = filter_to_clause(event_filter)

if filter_clause:
bounds += " AND " + filter_clause
args.extend(filter_args)

if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
1 change: 1 addition & 0 deletions tests/storage/event_injector.py
@@ -30,6 +30,7 @@ def __init__(self, hs):
def create_room(self, room):
builder = self.event_builder_factory.new({
"type": EventTypes.Create,
"sender": "",
"room_id": room.to_string(),
"content": {},
})