Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' into markjh/direct_to_device_federation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Haines committed Sep 8, 2016
2 parents 43954d0 + 61cd9af commit fa9d36e
Show file tree
Hide file tree
Showing 13 changed files with 542 additions and 132 deletions.
21 changes: 21 additions & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,30 @@


class EventContext(object):
__slots__ = [
"current_state_ids",
"prev_state_ids",
"state_group",
"rejected",
"push_actions",
"prev_group",
"delta_ids",
"prev_state_events",
]

def __init__(self):
# The current state including the current event
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None
self.state_group = None

self.rejected = False
self.push_actions = []

# A previously persisted state group and a delta between that
# and this state.
self.prev_group = None
self.delta_ids = None

self.prev_state_events = None
19 changes: 11 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,11 +832,13 @@ def on_send_join_request(self, origin, pdu):

new_pdu = event

message_handler = self.hs.get_handlers().message_handler
destinations = yield message_handler.get_joined_hosts_for_room_from_state(
context
users_in_room = yield self.store.get_joined_users_from_context(event, context)

destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
if not self.hs.is_mine_id(user_id)
)
destinations = set(destinations)

destinations.discard(origin)

logger.debug(
Expand Down Expand Up @@ -1055,11 +1057,12 @@ def on_send_leave_request(self, origin, pdu):

new_pdu = event

message_handler = self.hs.get_handlers().message_handler
destinations = yield message_handler.get_joined_hosts_for_room_from_state(
context
users_in_room = yield self.store.get_joined_users_from_context(event, context)

destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
if not self.hs.is_mine_id(user_id)
)
destinations = set(destinations)
destinations.discard(origin)

logger.debug(
Expand Down
44 changes: 6 additions & 38 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.metrics import measure_func
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler
Expand Down Expand Up @@ -945,7 +944,12 @@ def is_inviter_member_event(e):
event_stream_id, max_stream_id
)

destinations = yield self.get_joined_hosts_for_room_from_state(context)
users_in_room = yield self.store.get_joined_users_from_context(event, context)

destinations = [
get_domain_from_id(user_id) for user_id in users_in_room
if not self.hs.is_mine_id(user_id)
]

@defer.inlineCallbacks
def _notify():
Expand All @@ -963,39 +967,3 @@ def _notify():
preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)

def get_joined_hosts_for_room_from_state(self, context):
state_group = context.state_group
if not state_group:
# If state_group is None it means it has yet to be assigned a
# state group, i.e. we need to make sure that calls with a state_group
# of None don't hit previous cached calls with a None state_group.
# To do this we set the state_group to a new object as object() != object()
state_group = object()

return self._get_joined_hosts_for_room_from_state(
state_group, context.current_state_ids
)

@cachedInlineCallbacks(num_args=1, cache_context=True)
def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids,
cache_context):

# Don't bother getting state for people on the same HS
current_state = yield self.store.get_events([
e_id for key, e_id in current_state_ids.items()
if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1])
])

destinations = set()
for e in current_state.itervalues():
try:
if e.type == EventTypes.Member:
if e.content["membership"] == Membership.JOIN:
destinations.add(get_domain_from_id(e.state_key))
except SynapseError:
logger.warn(
"Failed to get destination from event %s", e.event_id
)

defer.returnValue(destinations)
26 changes: 18 additions & 8 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@

get_updates_counter = metrics.register_counter("get_updates", labels=["type"])

notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
state_transition_counter = metrics.register_counter(
"state_transition", labels=["from", "to"]
)


# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
Expand Down Expand Up @@ -939,27 +944,32 @@ def get_all_presence_updates(self, last_id, current_id):
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
"""
if old_state == new_state:
return False

if old_state.status_msg != new_state.status_msg:
notify_reason_counter.inc("status_msg_change")
return True

if old_state.state == PresenceState.ONLINE:
if new_state.state != PresenceState.ONLINE:
# Always notify for online -> anything
return True
if old_state.state != new_state.state:
notify_reason_counter.inc("state_change")
state_transition_counter.inc(old_state.state, new_state.state)
return True

if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.inc("current_active_change")
return True

if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not (old_state.currently_active and new_state.currently_active):
if not new_state.currently_active:
notify_reason_counter.inc("last_active_change_online")
return True

elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
return True

if old_state.state != new_state.state:
notify_reason_counter.inc("last_active_change_not_online")
return True

return False
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def __init__(self, db_conn, hs):
_get_state_groups_from_groups = (
StateStore.__dict__["_get_state_groups_from_groups"]
)
_get_state_groups_from_groups_txn = (
DataStore._get_state_groups_from_groups_txn.__func__
)
_get_state_group_from_group = (
StateStore.__dict__["_get_state_group_from_group"]
)
Expand Down
9 changes: 8 additions & 1 deletion synapse/rest/media/v1/download_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ def render_GET(self, request):
@request_handler()
@defer.inlineCallbacks
def _async_render_GET(self, request):
request.setHeader("Content-Security-Policy", "sandbox")
request.setHeader(
"Content-Security-Policy",
"default-src 'none';"
" script-src 'none';"
" plugin-types application/pdf;"
" style-src 'unsafe-inline';"
" object-src 'self';"
)
server_name, media_id, name = parse_media_id(request)
if server_name == self.server_name:
yield self._respond_local_file(request, media_id, name)
Expand Down
33 changes: 31 additions & 2 deletions synapse/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ def _gen_state_id():


class _StateCacheEntry(object):
__slots__ = ["state", "state_group", "state_id"]
__slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]

def __init__(self, state, state_group):
def __init__(self, state, state_group, prev_group=None, delta_ids=None):
self.state = state
self.state_group = state_group

self.prev_group = prev_group
self.delta_ids = delta_ids

# The `state_id` is a unique ID we generate that can be used as ID for
# this collection of state. Usually this would be the same as the
# state group, but on worker instances we can't generate a new state
Expand Down Expand Up @@ -245,11 +248,20 @@ def compute_event_context(self, event, old_state=None):
if key in context.prev_state_ids:
replaces = context.prev_state_ids[key]
event.unsigned["replaces_state"] = replaces

context.current_state_ids = dict(context.prev_state_ids)
context.current_state_ids[key] = event.event_id

context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids
if context.delta_ids is not None:
context.delta_ids[key] = event.event_id
else:
context.current_state_ids = context.prev_state_ids

context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids

context.prev_state_events = []
defer.returnValue(context)

Expand Down Expand Up @@ -283,6 +295,8 @@ def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""
defer.returnValue(_StateCacheEntry(
state=state_list,
state_group=name,
prev_group=name,
delta_ids={},
))

with (yield self.resolve_linearizer.queue(group_names)):
Expand Down Expand Up @@ -340,9 +354,24 @@ def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""
if hasattr(self.store, "get_next_state_group"):
state_group = self.store.get_next_state_group()

prev_group = None
delta_ids = None
for old_group, old_ids in state_groups_ids.items():
if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
n_delta_ids = {
k: v
for k, v in new_state.items()
if old_ids.get(k) != v
}
if not delta_ids or len(n_delta_ids) < len(delta_ids):
prev_group = old_group
delta_ids = n_delta_ids

cache = _StateCacheEntry(
state=new_state,
state_group=state_group,
prev_group=prev_group,
delta_ids=delta_ids,
)

if self._state_cache is not None:
Expand Down
8 changes: 6 additions & 2 deletions synapse/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ def store_device(self, user_id, device_id,
or_ignore=ignore_if_known,
)
except Exception as e:
logger.error("store_device with device_id=%s failed: %s",
device_id, e)
logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
" display_name=%s(%r) failed: %s",
type(device_id).__name__, device_id,
type(user_id).__name__, user_id,
type(initial_device_display_name).__name__,
initial_device_display_name, e)
raise StoreError(500, "Problem storing device.")

def get_device(self, user_id, device_id):
Expand Down
68 changes: 67 additions & 1 deletion synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,11 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,

# insert into the state_group, state_groups_state and
# event_to_state_groups tables.
self._store_mult_state_groups_txn(txn, ((event, context),))
try:
self._store_mult_state_groups_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise

metadata_json = encode_json(
event.internal_metadata.get_dict()
Expand Down Expand Up @@ -1543,6 +1547,9 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
)
event_rows = txn.fetchall()

for event_id, state_key in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))

# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
Expand Down Expand Up @@ -1582,7 +1589,66 @@ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering)
)

state_rows = txn.fetchall()
state_groups_to_delete = [sg for sg, in state_rows]

# Now we get all the state groups that rely on these state groups
new_state_edges = []
chunks = [
state_groups_to_delete[i:i + 100]
for i in xrange(0, len(state_groups_to_delete), 100)
]
for chunk in chunks:
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=chunk,
retcols=["state_group"],
keyvalues={},
)
new_state_edges.extend(row["state_group"] for row in rows)

# Now we turn the state groups that reference to-be-deleted state groups
# to non delta versions.
for new_state_edge in new_state_edges:
curr_state = self._get_state_groups_from_groups_txn(
txn, [new_state_edge], types=None
)
curr_state = curr_state[new_state_edge]

self._simple_delete_txn(
txn,
table="state_groups_state",
keyvalues={
"state_group": new_state_edge,
}
)

self._simple_delete_txn(
txn,
table="state_group_edges",
keyvalues={
"state_group": new_state_edge,
}
)

self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": new_state_edge,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in curr_state.items()
],
)

txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
Expand Down
Loading

0 comments on commit fa9d36e

Please sign in to comment.