Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Create a worker for event creation #2854

Merged
merged 7 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,14 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
file. For example::

worker_main_http_uri: http://127.0.0.1:8008


``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Handles non-state event creation. It can handle REST endpoints matching:

^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also be handling the macro endpoints for creating events (https://matrix.org/docs/spec/client_server/r0.3.0.html#post-matrix-client-r0-rooms-roomid-ban etc)? Or are they deliberately being left on the master as they're not that common?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're currently only handling non-state events on this worker for now, as sometimes the membership state changes end up doing complicated things that probably need to be done on the master.


It will create events locally and then send them on to the main synapse
instance to be persisted and handled.
170 changes: 170 additions & 0 deletions synapse/app/event_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import RoomSendEventRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor
from twisted.web.resource import Resource

logger = logging.getLogger("synapse.app.event_creator")


class EventCreatorSlavedStore(
SlavedDeviceStore,
SlavedClientIpStore,
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
BaseSlavedStore,
):
pass


class EventCreatorServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
"/_matrix/client/api/v1": resource,
})

root_resource = create_resource_tree(resources, Resource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
)
)

logger.info("Synapse event creator now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
)
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
try:
config = HomeServerConfig.load_config(
"Synapse event creator", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.event_creator"

assert config.worker_replication_http_port is not None

setup_logging(config, use_worker_options=True)

events.USE_FROZEN_DICTS = config.use_frozen_dicts

database_engine = create_engine(config.database_config)

tls_server_context_factory = context_factory.ServerContextFactory(config)

ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()

reactor.callWhenRunning(start)

_base.start_worker_reactor("synapse-event-creator", config)


if __name__ == '__main__':
with LoggingContext("main"):
start(sys.argv[1:])
4 changes: 4 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
Expand Down Expand Up @@ -219,6 +220,9 @@ def _configure_named_resource(self, name, compress=False):
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)

if name == "replication":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs documenting?

resources[REPLICATION_PREFIX] = ReplicationRestResource(self)

return resources

def start_listening(self):
Expand Down
8 changes: 8 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ def read_config(self, config):
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
self.worker_log_config = config.get("worker_log_config")

# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)

# The port on the main synapse for TCP replication
self.worker_replication_port = config.get("worker_replication_port", None)

# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need documenting in workers.rst ?


self.worker_name = config.get("worker_name", self.worker_app)

self.worker_main_http_uri = config.get("worker_main_http_uri", None)
Expand Down
72 changes: 72 additions & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# limitations under the License.


from frozendict import frozendict


class EventContext(object):
"""
Attributes:
Expand Down Expand Up @@ -73,3 +76,72 @@ def __init__(self):
self.prev_state_events = None

self.app_service = None

def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`

Returns:
dict
"""
return {
"current_state_ids": _encode_state_dict(self.current_state_ids),
"prev_state_ids": _encode_state_dict(self.prev_state_ids),
"state_group": self.state_group,
"rejected": self.rejected,
"push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}

@staticmethod
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.

Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`

Returns:
EventContext
"""
context = EventContext()
context.current_state_ids = _decode_state_dict(input["current_state_ids"])
context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]

app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)

return context


def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
JSON we need to convert them to a form that can.
"""
if state_dict is None:
return None

return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
]


def _decode_state_dict(input):
"""Decodes a state dict encoded using `_encode_state_dict` above
"""
if input is None:
return None

return frozendict({(etype, state_key,): v for etype, state_key, v in input})
33 changes: 22 additions & 11 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master

from ._base import BaseHandler

Expand Down Expand Up @@ -312,6 +313,9 @@ def __init__(self, hs):
self.server_name = hs.hostname
self.ratelimiter = hs.get_ratelimiter()
self.notifier = hs.get_notifier()
self.config = hs.config

self.http_client = hs.get_simple_http_client()

# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
Expand Down Expand Up @@ -403,11 +407,6 @@ def send_nonmember_event(self, requester, event, context, ratelimit=True):
"Tried to send member event through non-member codepath"
)

# We check here if we are currently being rate limited, so that we
# don't do unnecessary work. We check again just before we actually
# send the event.
yield self.base_handler.ratelimit(requester, update=False)

user = UserID.from_string(event.sender)

assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
Expand All @@ -424,12 +423,6 @@ def send_nonmember_event(self, requester, event, context, ratelimit=True):
ratelimit=ratelimit,
)

if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(user)

@defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
Expand Down Expand Up @@ -564,6 +557,18 @@ def handle_new_client_event(
):
# We now need to go and hit out to wherever we need to hit out to.

# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
)
return

if ratelimit:
yield self.base_handler.ratelimit(requester)

Expand Down Expand Up @@ -697,3 +702,9 @@ def _notify():
)

preserve_fn(_notify)()

if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(requester.user)
Loading