Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2899 from matrix-org/erikj/split_pushers
Browse files Browse the repository at this point in the history
Split PusherStore
  • Loading branch information
erikjohnston authored Feb 23, 2018
2 parents d095775 + c2ecfcc commit 1cf9e07
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
12 changes: 3 additions & 9 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,10 +17,10 @@
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker

from synapse.storage import DataStore
from synapse.storage.pusher import PusherWorkerStore


class SlavedPusherStore(BaseSlavedStore):
class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):

def __init__(self, db_conn, hs):
super(SlavedPusherStore, self).__init__(db_conn, hs)
Expand All @@ -28,13 +29,6 @@ def __init__(self, db_conn, hs):
extra_tables=[("deleted_pushers", "stream_id")],
)

get_all_pushers = DataStore.get_all_pushers.__func__
get_pushers_by = DataStore.get_pushers_by.__func__
get_pushers_by_app_id_and_pushkey = (
DataStore.get_pushers_by_app_id_and_pushkey.__func__
)
_decode_pushers_rows = DataStore._decode_pushers_rows.__func__

def stream_positions(self):
result = super(SlavedPusherStore, self).stream_positions()
result["pushers"] = self._pushers_id_gen.get_current_token()
Expand Down
11 changes: 7 additions & 4 deletions synapse/storage/pusher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +28,7 @@
logger = logging.getLogger(__name__)


class PusherStore(SQLBaseStore):
class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
for r in rows:
dataJson = r['data']
Expand Down Expand Up @@ -102,9 +103,6 @@ def get_pushers(txn):
rows = yield self.runInteraction("get_all_pushers", get_pushers)
defer.returnValue(rows)

def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

def get_all_updated_pushers(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed(([], []))
Expand Down Expand Up @@ -177,6 +175,11 @@ def get_all_updated_pushers_rows_txn(txn):
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
)


class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

@cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator
Expand Down

0 comments on commit 1cf9e07

Please sign in to comment.