Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #945 from matrix-org/rav/background_reindex
Browse files Browse the repository at this point in the history
Create index on user_ips in the background
  • Loading branch information
richvdh authored Jul 25, 2016
2 parents e73ad8d + 42f4feb commit bf3de7b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 23 deletions.
98 changes: 85 additions & 13 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from ._base import SQLBaseStore
from . import engines

from twisted.internet import defer

Expand Down Expand Up @@ -87,10 +88,12 @@ def __init__(self, hs):

@defer.inlineCallbacks
def start_doing_background_updates(self):
while True:
if self._background_update_timer is not None:
return
assert(self._background_update_timer is not None,
"background updates already running")

logger.info("Starting background schema updates")

while True:
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
Expand All @@ -101,22 +104,23 @@ def start_doing_background_updates(self):
self._background_update_timer = None

try:
result = yield self.do_background_update(
result = yield self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except:
logger.exception("Error doing update")

if result is None:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
)
return
else:
if result is None:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
)
defer.returnValue()

@defer.inlineCallbacks
def do_background_update(self, desired_duration_ms):
"""Does some amount of work on a background update
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
Args:
desired_duration_ms(float): How long we want to spend
updating.
Expand All @@ -135,11 +139,21 @@ def do_background_update(self, desired_duration_ms):
self._background_update_queue.append(update['update_name'])

if not self._background_update_queue:
# no work left to do
defer.returnValue(None)

# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)

res = yield self._do_background_update(update_name, desired_duration_ms)
defer.returnValue(res)

@defer.inlineCallbacks
def _do_background_update(self, update_name, desired_duration_ms):
logger.info("Starting update batch on background update '%s'",
update_name)

update_handler = self._background_update_handlers[update_name]

performance = self._background_update_performance.get(update_name)
Expand Down Expand Up @@ -202,6 +216,64 @@ def register_background_update_handler(self, update_name, update_handler):
"""
self._background_update_handlers[update_name] = update_handler

def register_background_index_update(self, update_name, index_name,
table, columns):
"""Helper for store classes to do a background index addition
To use:
1. use a schema delta file to add a background update. Example:
INSERT INTO background_updates (update_name, progress_json) VALUES
('my_new_index', '{}');
2. In the Store constructor, call this method
Args:
update_name (str): update_name to register for
index_name (str): name of index to add
table (str): table to add index to
columns (list[str]): columns/expressions to include in index
"""

# if this is postgres, we add the indexes concurrently. Otherwise
# we fall back to doing it inline
if isinstance(self.database_engine, engines.PostgresEngine):
conc = True
else:
conc = False

sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \
% {
"conc": "CONCURRENTLY" if conc else "",
"name": index_name,
"table": table,
"columns": ", ".join(columns),
}

def create_index_concurrently(conn):
conn.rollback()
# postgres insists on autocommit for the index
conn.set_session(autocommit=True)
c = conn.cursor()
c.execute(sql)
conn.set_session(autocommit=False)

def create_index(conn):
c = conn.cursor()
c.execute(sql)

@defer.inlineCallbacks
def updater(progress, batch_size):
logger.info("Adding index %s to %s", index_name, table)
if conc:
yield self.runWithConnection(create_index_concurrently)
else:
yield self.runWithConnection(create_index)
yield self._end_background_update(update_name)
defer.returnValue(1)

self.register_background_update_handler(update_name, updater)

def start_background_update(self, update_name, progress):
"""Starts a background update running.
Expand Down
15 changes: 11 additions & 4 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@

import logging

from ._base import SQLBaseStore, Cache

from twisted.internet import defer

from ._base import Cache
from . import background_updates

logger = logging.getLogger(__name__)

# Number of msec of granularity to store the user IP 'last seen' time. Smaller
Expand All @@ -27,8 +28,7 @@
LAST_SEEN_GRANULARITY = 120 * 1000


class ClientIpStore(SQLBaseStore):

class ClientIpStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
Expand All @@ -37,6 +37,13 @@ def __init__(self, hs):

super(ClientIpStore, self).__init__(hs)

self.register_background_index_update(
"user_ips_device_index",
index_name="user_ips_device_id",
table="user_ips",
columns=["user_id", "device_id", "last_seen"],
)

@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/schema/delta/33/user_ips_index.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
* limitations under the License.
*/

CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen);
INSERT INTO background_updates (update_name, progress_json) VALUES
('user_ips_device_index', '{}');
20 changes: 15 additions & 5 deletions tests/storage/test_background_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):

@defer.inlineCallbacks
def setUp(self):
hs = yield setup_test_homeserver()
hs = yield setup_test_homeserver() # type: synapse.server.HomeServer
self.store = hs.get_datastore()
self.clock = hs.get_clock()

Expand All @@ -20,11 +20,20 @@ def setUp(self):
"test_update", self.update_handler
)

# run the real background updates, to get them out the way
# (perhaps we should run them as part of the test HS setup, since we
# run all of the other schema setup stuff there?)
while True:
res = yield self.store.do_next_background_update(1000)
if res is None:
break

@defer.inlineCallbacks
def test_do_background_update(self):
desired_count = 1000
duration_ms = 42

# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
self.clock.advance_time_msec(count * duration_ms)
Expand All @@ -42,32 +51,33 @@ def update(progress, count):
yield self.store.start_background_update("test_update", {"my_key": 1})

self.update_handler.reset_mock()
result = yield self.store.do_background_update(
result = yield self.store.do_next_background_update(
duration_ms * desired_count
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with(
{"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
)

# second step: complete the update
@defer.inlineCallbacks
def update(progress, count):
yield self.store._end_background_update("test_update")
defer.returnValue(count)

self.update_handler.side_effect = update

self.update_handler.reset_mock()
result = yield self.store.do_background_update(
result = yield self.store.do_next_background_update(
duration_ms * desired_count
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with(
{"my_key": 2}, desired_count
)

# third step: we don't expect to be called any more
self.update_handler.reset_mock()
result = yield self.store.do_background_update(
result = yield self.store.do_next_background_update(
duration_ms * desired_count
)
self.assertIsNone(result)
Expand Down

0 comments on commit bf3de7b

Please sign in to comment.