Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2854 from matrix-org/erikj/event_create_worker
Browse files Browse the repository at this point in the history
Create a worker for event creation
  • Loading branch information
erikjohnston authored Feb 13, 2018
2 parents d627174 + 059d3a6 commit c0c9327
Show file tree
Hide file tree
Showing 12 changed files with 534 additions and 25 deletions.
39 changes: 34 additions & 5 deletions docs/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,29 @@ requests made to the federation port. The caveats regarding running a
reverse-proxy on the federation port still apply (see
https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port).

To enable workers, you need to add a replication listener to the master synapse, e.g.::
To enable workers, you need to add two replication listeners to the master
synapse, e.g.::

listeners:
# The TCP replication port
- port: 9092
bind_address: '127.0.0.1'
type: replication
# The HTTP replication port
- port: 9093
bind_address: '127.0.0.1'
type: http
resources:
- names: [replication]

Under **no circumstances** should this replication API listener be exposed to the
public internet; it currently implements no authentication whatsoever and is
Under **no circumstances** should these replication API listeners be exposed to
the public internet; it currently implements no authentication whatsoever and is
unencrypted.

(Roughly, the TCP port is used for streaming data from the master to the
workers, and the HTTP port for the workers to send data to the main
synapse process.)

You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
subdirectory, to allow synctl to manipulate them.
Expand All @@ -52,8 +64,13 @@ You should minimise the number of overrides though to maintain a usable config.

You must specify the type of worker application (``worker_app``). The currently
available worker applications are listed below. You must also specify the
replication endpoint that it's talking to on the main synapse process
(``worker_replication_host`` and ``worker_replication_port``).
replication endpoints that it's talking to on the main synapse process.
``worker_replication_host`` should specify the host of the main synapse,
``worker_replication_port`` should point to the TCP replication listener port and
``worker_replication_http_port`` should point to the HTTP replication port.

Currently, only the ``event_creator`` worker requires specifying
``worker_replication_http_port``.

For instance::

Expand All @@ -62,6 +79,7 @@ For instance::
# The replication listener on the synapse to talk to.
worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093

worker_listeners:
- type: http
Expand Down Expand Up @@ -207,3 +225,14 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
file. For example::

worker_main_http_uri: http://127.0.0.1:8008


``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Handles non-state event creation. It can handle REST endpoints matching:

^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send

It will create events locally and then send them on to the main synapse
instance to be persisted and handled.
170 changes: 170 additions & 0 deletions synapse/app/event_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import RoomSendEventRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor
from twisted.web.resource import Resource

logger = logging.getLogger("synapse.app.event_creator")


class EventCreatorSlavedStore(
SlavedDeviceStore,
SlavedClientIpStore,
SlavedApplicationServiceStore,
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
BaseSlavedStore,
):
pass


class EventCreatorServer(HomeServer):
def setup(self):
logger.info("Setting up.")
self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
logger.info("Finished setting up.")

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
"/_matrix/client/api/v1": resource,
})

root_resource = create_resource_tree(resources, Resource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
)
)

logger.info("Synapse event creator now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
)
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return ReplicationClientHandler(self.get_datastore())


def start(config_options):
try:
config = HomeServerConfig.load_config(
"Synapse event creator", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.event_creator"

assert config.worker_replication_http_port is not None

setup_logging(config, use_worker_options=True)

events.USE_FROZEN_DICTS = config.use_frozen_dicts

database_engine = create_engine(config.database_config)

tls_server_context_factory = context_factory.ServerContextFactory(config)

ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)

ss.setup()
ss.get_handlers()
ss.start_listening(config.worker_listeners)

def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()

reactor.callWhenRunning(start)

_base.start_worker_reactor("synapse-event-creator", config)


if __name__ == '__main__':
with LoggingContext("main"):
start(sys.argv[1:])
4 changes: 4 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
Expand Down Expand Up @@ -219,6 +220,9 @@ def _configure_named_resource(self, name, compress=False):
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)

if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)

return resources

def start_listening(self):
Expand Down
8 changes: 8 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ def read_config(self, config):
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
self.worker_log_config = config.get("worker_log_config")

# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)

# The port on the main synapse for TCP replication
self.worker_replication_port = config.get("worker_replication_port", None)

# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")

self.worker_name = config.get("worker_name", self.worker_app)

self.worker_main_http_uri = config.get("worker_main_http_uri", None)
Expand Down
72 changes: 72 additions & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# limitations under the License.


from frozendict import frozendict


class EventContext(object):
"""
Attributes:
Expand Down Expand Up @@ -73,3 +76,72 @@ def __init__(self):
self.prev_state_events = None

self.app_service = None

def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
Returns:
dict
"""
return {
"current_state_ids": _encode_state_dict(self.current_state_ids),
"prev_state_ids": _encode_state_dict(self.prev_state_ids),
"state_group": self.state_group,
"rejected": self.rejected,
"push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}

@staticmethod
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
EventContext
"""
context = EventContext()
context.current_state_ids = _decode_state_dict(input["current_state_ids"])
context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]

app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)

return context


def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
JSON we need to convert them to a form that can.
"""
if state_dict is None:
return None

return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
]


def _decode_state_dict(input):
"""Decodes a state dict encoded using `_encode_state_dict` above
"""
if input is None:
return None

return frozendict({(etype, state_key,): v for etype, state_key, v in input})
Loading

0 comments on commit c0c9327

Please sign in to comment.