Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pusher worker
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Feb 21, 2020
1 parent d85aa03 commit 27260e1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 283 deletions.
51 changes: 51 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ def start_listening(self, listeners):

self.get_tcp_replication().start_replication(self)

def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

def build_tcp_replication(self):
return GenericWorkerReplicationHandler(self)

Expand All @@ -558,6 +561,9 @@ def __init__(self, hs):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()

self.notify_pushers = hs.config.start_pushers
self.pusher_pool = hs.get_pusherpool()

async def on_rdata(self, stream_name, token, rows):
await super(GenericWorkerReplicationHandler, self).on_rdata(
stream_name, token, rows
Expand Down Expand Up @@ -595,6 +601,8 @@ async def process_and_notify(self, stream_name, token, rows):
self.notifier.on_new_room_event(
event, token, max_token, extra_users
)

await self.pusher_pool.on_new_notifications(token, token)
elif stream_name == "push_rules":
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows]
Expand All @@ -607,6 +615,9 @@ async def process_and_notify(self, stream_name, token, rows):
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows]
)
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
elif stream_name == "typing":
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
Expand All @@ -628,9 +639,35 @@ async def process_and_notify(self, stream_name, token, rows):
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows]
)
elif stream_name == "pushers":
for row in rows:
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
except Exception:
logger.exception("Error processing replication")

def stop_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
pusher = pushers_for_user.pop(key, None)
if pusher is None:
return
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

async def start_pusher(self, user_id, app_id, pushkey):
if not self.notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
return await self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)


def start(config_options):
try:
Expand All @@ -648,6 +685,7 @@ def start(config_options):
"synapse.app.frontend_proxy",
"synapse.app.generic_worker",
"synapse.app.media_repository",
"synapse.app.pusher",
"synapse.app.synchrotron",
)

Expand All @@ -664,6 +702,19 @@ def start(config_options):
# Force the appservice to start since they will be disabled in the main config
config.notify_appservices = True

if config.worker_app == "synapse.app.pusher":
if config.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

ss = GenericWorkerServer(
Expand Down
209 changes: 4 additions & 205 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,213 +13,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.pusher")


class PusherSlaveStore(
SlavedEventStore,
SlavedPusherStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
RoomStore,
):
update_pusher_last_stream_ordering_and_success = __func__(
DataStore.update_pusher_last_stream_ordering_and_success
)

update_pusher_failing_since = __func__(DataStore.update_pusher_failing_since)

update_pusher_last_stream_ordering = __func__(
DataStore.update_pusher_last_stream_ordering
)

get_throttle_params_by_room = __func__(DataStore.get_throttle_params_by_room)

set_throttle_params = __func__(DataStore.set_throttle_params)

get_time_of_last_push_action_before = __func__(
DataStore.get_time_of_last_push_action_before
)

get_profile_displayname = __func__(DataStore.get_profile_displayname)


class PusherServer(HomeServer):
DATASTORE_CLASS = PusherSlaveStore

def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)

root_resource = create_resource_tree(resources, NoResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
)

logger.info("Synapse pusher now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
)
else:
_base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warning("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return PusherReplicationHandler(self)


class PusherReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(PusherReplicationHandler, self).__init__(hs.get_datastore())

self.pusher_pool = hs.get_pusherpool()

async def on_rdata(self, stream_name, token, rows):
await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)

@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
try:
if stream_name == "pushers":
for row in rows:
if row.deleted:
yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
yield self.pusher_pool.on_new_notifications(token, token)
elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
except Exception:
logger.exception("Error poking pushers")

def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
pusher = pushers_for_user.pop(key, None)
if pusher is None:
return
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)


def start(config_options):
try:
config = HomeServerConfig.load_config("Synapse pusher", config_options)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.pusher"

events.USE_FROZEN_DICTS = config.use_frozen_dicts

if config.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True

ps = PusherServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ps, config, use_worker_options=True)

ps.setup()

def start():
_base.start(ps, config.worker_listeners)
ps.get_pusherpool().start()

reactor.addSystemEventTrigger("before", "startup", start)

_base.start_worker_reactor("synapse-pusher", config)
import sys

from synapse.app.generic_worker import start
from synapse.util.logcontext import LoggingContext

if __name__ == "__main__":
with LoggingContext("main"):
ps = start(sys.argv[1:])
start(sys.argv[1:])
Loading

0 comments on commit 27260e1

Please sign in to comment.