Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix tests on postgresql #3740

Merged
Merged 28 commits on
Sep 3, 2018
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ matrix:
- python: 3.6
env: TOX_ENV=check-newsfragment

allow_failures:
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"

install:
- pip install tox

Expand Down
1 change: 1 addition & 0 deletions changelog.d/3740.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The test suite now passes on PostgreSQL.
8 changes: 0 additions & 8 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
from .end_to_end_keys import EndToEndKeyStore
from .engines import PostgresEngine
from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events import EventsStore
Expand Down Expand Up @@ -133,13 +132,6 @@ def __init__(self, db_conn, hs):
db_conn, "local_group_updates", "stream_id",
)

if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
db_conn, "cache_invalidation_stream", "stream_id",
)
else:
self._cache_id_gen = None

self._presence_on_startup = self._get_active_presence(db_conn)

presence_cache_prefill, min_presence_val = self._get_cache_dict(
Expand Down
9 changes: 9 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from synapse.api.errors import StoreError
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext

Expand Down Expand Up @@ -191,6 +192,14 @@ def __init__(self, db_conn, hs):

self.database_engine = hs.database_engine

if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
db_conn, "cache_invalidation_stream", "stream_id",
)
self.clock = self.hs.get_clock()
else:
self._cache_id_gen = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, fun fact: this now gets set on the slave replication stores as well as the store on master. This is a bit awkward because we do set self._cache_id_gen on the replication stores in synapse/replication/slave/storage/_base.py to something different.

In short, everything is awful and scary, and I'm worried about moving this here in case the replication stuff fails to properly overwrite this definition


def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()

Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ def _update_client_ips_batch(self):
if not self.hs.get_db_pool().running:
return

@defer.inlineCallbacks
def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
yield self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn,
to_update,
)
Expand Down
16 changes: 8 additions & 8 deletions synapse/storage/monthly_active_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from twisted.internet import defer

from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cachedInlineCallbacks

from ._base import SQLBaseStore

Expand All @@ -29,7 +29,7 @@

class MonthlyActiveUsersStore(SQLBaseStore):
def __init__(self, dbconn, hs):
super(MonthlyActiveUsersStore, self).__init__(None, hs)
super(MonthlyActiveUsersStore, self).__init__(dbconn, hs)
self._clock = hs.get_clock()
self.hs = hs
self.reserved_users = ()
Expand Down Expand Up @@ -131,21 +131,20 @@ def _reap_users(txn):
self.user_last_seen_monthly_active.invalidate_all()
self.get_monthly_active_count.invalidate_all()

@cached(num_args=0)
@cachedInlineCallbacks(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users

Returns:
Defered[int]: Number of current monthly active users
"""

def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"

txn.execute(sql)
count, = txn.fetchone()
return count
return self.runInteraction("count_users", _count_users)
res = yield self.runInteraction("count_users", _count_users)
defer.returnValue(res)

@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
Expand All @@ -171,7 +170,7 @@ def upsert_monthly_active_user(self, user_id):
self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())

@cached(num_args=1)
@cachedInlineCallbacks(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Expand All @@ -182,7 +181,7 @@ def user_last_seen_monthly_active(self, user_id):

"""

return(self._simple_select_one_onecol(
res = yield (self._simple_select_one_onecol(
table="monthly_active_users",
keyvalues={
"user_id": user_id,
Expand All @@ -191,6 +190,7 @@ def user_last_seen_monthly_active(self, user_id):
allow_none=True,
desc="user_last_seen_monthly_active",
))
defer.returnValue(res)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for converting all these functions to defer.inlineCallbacks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me trying things and inadvertently committing it :) -- reverted


@defer.inlineCallbacks
def populate_monthly_active_users(self, user_id):
Expand Down
143 changes: 75 additions & 68 deletions tests/handlers/test_device.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,79 +14,79 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

import synapse.api.errors
import synapse.handlers.device
import synapse.storage

from tests import unittest, utils
from tests import unittest

user1 = "@boris:aaa"
user2 = "@theresa:bbb"


class DeviceTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(DeviceTestCase, self).__init__(*args, **kwargs)
self.store = None # type: synapse.storage.DataStore
self.handler = None # type: synapse.handlers.device.DeviceHandler
self.clock = None # type: utils.MockClock

@defer.inlineCallbacks
def setUp(self):
hs = yield utils.setup_test_homeserver(self.addCleanup)
class DeviceTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver("server", http_client=None)
self.handler = hs.get_device_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
return hs

def prepare(self, reactor, clock, hs):
# These tests assume that it starts 1000 seconds in.
self.reactor.advance(1000)

@defer.inlineCallbacks
def test_device_is_created_if_doesnt_exist(self):
res = yield self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="display name",
res = self.get_success(
self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="display name",
)
)
self.assertEqual(res, "fco")

dev = yield self.handler.store.get_device("@boris:foo", "fco")
dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
self.assertEqual(dev["display_name"], "display name")

@defer.inlineCallbacks
def test_device_is_preserved_if_exists(self):
res1 = yield self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="display name",
res1 = self.get_success(
self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="display name",
)
)
self.assertEqual(res1, "fco")

res2 = yield self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="new display name",
res2 = self.get_success(
self.handler.check_device_registered(
user_id="@boris:foo",
device_id="fco",
initial_device_display_name="new display name",
)
)
self.assertEqual(res2, "fco")

dev = yield self.handler.store.get_device("@boris:foo", "fco")
dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
self.assertEqual(dev["display_name"], "display name")

@defer.inlineCallbacks
def test_device_id_is_made_up_if_unspecified(self):
device_id = yield self.handler.check_device_registered(
user_id="@theresa:foo",
device_id=None,
initial_device_display_name="display",
device_id = self.get_success(
self.handler.check_device_registered(
user_id="@theresa:foo",
device_id=None,
initial_device_display_name="display",
)
)

dev = yield self.handler.store.get_device("@theresa:foo", device_id)
dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id))
self.assertEqual(dev["display_name"], "display")

@defer.inlineCallbacks
def test_get_devices_by_user(self):
yield self._record_users()
self._record_users()

res = self.get_success(self.handler.get_devices_by_user(user1))

res = yield self.handler.get_devices_by_user(user1)
self.assertEqual(3, len(res))
device_map = {d["device_id"]: d for d in res}
self.assertDictContainsSubset(
Expand Down Expand Up @@ -119,11 +120,10 @@ def test_get_devices_by_user(self):
device_map["abc"],
)

@defer.inlineCallbacks
def test_get_device(self):
yield self._record_users()
self._record_users()

res = yield self.handler.get_device(user1, "abc")
res = self.get_success(self.handler.get_device(user1, "abc"))
self.assertDictContainsSubset(
{
"user_id": user1,
Expand All @@ -135,59 +135,66 @@ def test_get_device(self):
res,
)

@defer.inlineCallbacks
def test_delete_device(self):
yield self._record_users()
self._record_users()

# delete the device
yield self.handler.delete_device(user1, "abc")
self.get_success(self.handler.delete_device(user1, "abc"))

# check the device was deleted
with self.assertRaises(synapse.api.errors.NotFoundError):
yield self.handler.get_device(user1, "abc")
res = self.handler.get_device(user1, "abc")
self.pump()
self.assertIsInstance(
self.failureResultOf(res).value, synapse.api.errors.NotFoundError
)

# we'd like to check the access token was invalidated, but that's a
# bit of a PITA.

@defer.inlineCallbacks
def test_update_device(self):
yield self._record_users()
self._record_users()

update = {"display_name": "new display"}
yield self.handler.update_device(user1, "abc", update)
self.get_success(self.handler.update_device(user1, "abc", update))

res = yield self.handler.get_device(user1, "abc")
res = self.get_success(self.handler.get_device(user1, "abc"))
self.assertEqual(res["display_name"], "new display")

@defer.inlineCallbacks
def test_update_unknown_device(self):
update = {"display_name": "new_display"}
with self.assertRaises(synapse.api.errors.NotFoundError):
yield self.handler.update_device("user_id", "unknown_device_id", update)
res = self.handler.update_device("user_id", "unknown_device_id", update)
self.pump()
self.assertIsInstance(
self.failureResultOf(res).value, synapse.api.errors.NotFoundError
)

@defer.inlineCallbacks
def _record_users(self):
# check this works for both devices which have a recorded client_ip,
# and those which don't.
yield self._record_user(user1, "xyz", "display 0")
yield self._record_user(user1, "fco", "display 1", "token1", "ip1")
yield self._record_user(user1, "abc", "display 2", "token2", "ip2")
yield self._record_user(user1, "abc", "display 2", "token3", "ip3")
self._record_user(user1, "xyz", "display 0")
self._record_user(user1, "fco", "display 1", "token1", "ip1")
self._record_user(user1, "abc", "display 2", "token2", "ip2")
self._record_user(user1, "abc", "display 2", "token3", "ip3")

self._record_user(user2, "def", "dispkay", "token4", "ip4")

yield self._record_user(user2, "def", "dispkay", "token4", "ip4")
self.reactor.advance(10000)

@defer.inlineCallbacks
def _record_user(
self, user_id, device_id, display_name, access_token=None, ip=None
):
device_id = yield self.handler.check_device_registered(
user_id=user_id,
device_id=device_id,
initial_device_display_name=display_name,
device_id = self.get_success(
self.handler.check_device_registered(
user_id=user_id,
device_id=device_id,
initial_device_display_name=display_name,
)
)

if ip is not None:
yield self.store.insert_client_ip(
user_id, access_token, ip, "user_agent", device_id
self.get_success(
self.store.insert_client_ip(
user_id, access_token, ip, "user_agent", device_id
)
)
self.clock.advance_time(1000)
self.reactor.advance(1000)
Loading