Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
EXTRA WIP DIRTY DIRTY
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Feb 22, 2023
1 parent 05c4fba commit 37c633b
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 30 deletions.
16 changes: 9 additions & 7 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple

import attr

from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
Expand Down Expand Up @@ -367,7 +366,6 @@ async def _inner_fetch_key_requests(
Returns:
{server name: {key id: fetch key result}}
"""

logger.debug("Starting fetch for %s", requests)

# First we need to deduplicate requests for the same key. We do this by
Expand Down Expand Up @@ -439,6 +437,7 @@ async def _inner_fetch_key_request(
break

logger.debug("Getting keys from %s for %s", fetcher, verify_request)
print("DMR: Getting keys from %s for %s" % (fetcher, verify_request))
keys = await fetcher.get_keys(
verify_request.server_name,
list(missing_key_ids),
Expand Down Expand Up @@ -695,7 +694,7 @@ async def get_server_verify_key_v2_indirect(
the server
"""
perspective_name = key_server.server_name
logger.info(
print(
"Requesting keys %s from notary server %s",
keys_to_fetch,
perspective_name,
Expand Down Expand Up @@ -723,13 +722,13 @@ async def get_server_verify_key_v2_indirect(
)
except (NotRetryingDestination, RequestSendFailed) as e:
# these both have str() representations which we can't really improve upon
print("DMR:", e)
raise KeyLookupError(str(e))
except HttpResponseException as e:
print("DMR:", e)
raise KeyLookupError("Remote server returned an error: %s" % (e,))

logger.debug(
"Response from notary server %s: %s", perspective_name, query_response
)
print("Response from notary server %s: %s", perspective_name, query_response)

keys: Dict[str, Dict[str, FetchKeyResult]] = {}
added_keys: List[Tuple[str, str, FetchKeyResult]] = []
Expand Down Expand Up @@ -876,6 +875,7 @@ async def get_server_verify_keys_v2_direct(
Raises:
KeyLookupError if there was a problem making the lookup
"""
print("DMR: get_server_verify_keys_v2_direct starting")
time_now_ms = self.clock.time_msec()
try:
response = await self.client.get_json(
Expand All @@ -902,6 +902,7 @@ async def get_server_verify_keys_v2_direct(
except HttpResponseException as e:
raise KeyLookupError("Remote server returned an error: %s" % (e,))

print("DMR: get_server_verify_keys_v2_direct got", response)
assert isinstance(response, dict)
if response["server_name"] != server_name:
raise KeyLookupError(
Expand Down Expand Up @@ -935,12 +936,13 @@ async def _fetch_keys(
) -> Dict[str, Dict[str, FetchKeyResult]]:
# For simplicity's sake, pick a random federation sender
instance_name = random.choice(self._federation_shard_config.instances)
print("DMR: ask", instance_name, "for", keys_to_fetch)
response = await self._client(
keys_to_fetch=keys_to_fetch,
instance_name=instance_name,
)
logger.info(
"%s fetched %s keys for us", instance_name, response["fetched_count"]
)

print("DMR:", instance_name, "replied:", response)
return await super()._fetch_keys(keys_to_fetch)
3 changes: 3 additions & 0 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ async def request(
try:
body_producer = None
if data is not None:
print("SimpleHTTPClient.request data=", data)
body_producer = QuieterFileBodyProducer(
BytesIO(data),
cooperator=self._cooperator,
Expand Down Expand Up @@ -565,9 +566,11 @@ async def post_json_get_json(
if headers:
actual_headers.update(headers) # type: ignore

logger.warning("DMR: before request")
response = await self.request(
"POST", uri, headers=Headers(actual_headers), data=json_str
)
logger.warning("DMR: after request")

body = await make_deferred_yieldable(readBody(response))

Expand Down
1 change: 0 additions & 1 deletion synapse/replication/http/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class ReplicationFetchKeysEndpoint(ReplicationEndpoint):
NAME = "fetch_keys"
PATH_ARGS = ()
METHOD = "POST"
WAIT_FOR_STREAMS = False

def __init__(self, hs: "HomeServer"):
super().__init__(hs)
Expand Down
5 changes: 3 additions & 2 deletions synapse/rest/key/v2/remote_key_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from twisted.web.server import Request

from synapse.crypto.keyring import ServerKeyFetcher
from synapse.crypto.keyring import InternalWorkerRequestKeyFetcher
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
Expand Down Expand Up @@ -94,7 +94,7 @@ class RemoteKey(RestServlet):
"""

def __init__(self, hs: "HomeServer"):
self.fetcher = ServerKeyFetcher(hs)
self.fetcher = InternalWorkerRequestKeyFetcher(hs)
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
self.federation_domain_whitelist = (
Expand Down Expand Up @@ -238,6 +238,7 @@ async def query_keys(
# If there is a cache miss, request the missing keys, then recurse (and
# ensure the result is sent).
if cache_misses:
print("DMR: fetch keys remotely for", cache_misses)
await yieldable_gather_results(
lambda t: self.fetcher.get_keys(*t),
(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def _invalidate_state_caches_and_stream(
async def send_invalidation_to_replication(
self, cache_name: str, keys: Optional[Collection[Any]]
) -> None:
print("DMR: send_invalidation_to_replication")
await self.db_pool.runInteraction(
"send_invalidation_to_replication",
self._send_invalidation_to_replication,
Expand Down
9 changes: 5 additions & 4 deletions synapse/storage/databases/main/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.keys import FetchKeyResult
from synapse.storage.types import Cursor
from synapse.util.caches.descriptors import cached, cachedList
Expand All @@ -32,7 +33,7 @@
db_binary_type = memoryview


class KeyStore(SQLBaseStore):
class KeyStore(CacheInvalidationWorkerStore):
"""Persistence for signature verification keys"""

@cached()
Expand Down Expand Up @@ -105,10 +106,12 @@ async def store_server_verify_keys(
keys to be stored. Each entry is a triplet of
(server_name, key_id, key).
"""
print("DMR: store_server_verify_keys")
key_values = []
value_values = []
invalidations = []
for server_name, key_id, fetch_result in verify_keys:
logger.warning(f"DMR: {server_name=} {key_id=} {fetch_result=}")
key_values.append((server_name, key_id))
value_values.append(
(
Expand Down Expand Up @@ -136,10 +139,8 @@ async def store_server_verify_keys(
value_values=value_values,
desc="store_server_verify_keys",
)

invalidate = self._get_server_verify_key.invalidate
for i in invalidations:
invalidate((i,))
await self.invalidate_cache_and_stream("_get_server_verify_key", (i,))

async def store_server_keys_json(
self,
Expand Down
23 changes: 7 additions & 16 deletions tests/crypto/test_keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
PerspectivesKeyFetcher,
ServerKeyFetcher,
StoreKeyFetcher,
InternalWorkerRequestKeyFetcher,
)
from synapse.logging.context import (
ContextRequest,
Expand All @@ -50,7 +51,7 @@

from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import make_request
from tests.server import make_request, TimedOutException
from tests.test_utils import make_awaitable
from tests.unittest import logcontext_clean, override_config

Expand Down Expand Up @@ -804,7 +805,7 @@ async def mock_post_json(*args: Any, **kwargs: Any) -> JsonDict:
"""Mock the request to the notary server."""
if kwargs.get("path") != "/_matrix/key/v2/query":
raise HttpResponseException(500, "ruh", b"roh")
return {}
return {"server_keys": []}

async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict:
if kwargs.get("path") != "/_matrix/key/v2/server":
Expand All @@ -823,21 +824,11 @@ async def mock_get_json(*args: Any, **kwargs: Any) -> JsonDict:
federation_http_client=mock_http_client,
)

channel = make_request(
self.reactor,
self.site,
"POST",
"/_matrix/key/v2/query",
access_token=None,
content=json.dumps({"server_keys": {SERVER_NAME: {testverifykey_id: {}}}}),
fetcher = InternalWorkerRequestKeyFetcher(self.hs)
result = self.get_success(
fetcher.get_keys(SERVER_NAME, [testverifykey_id], 0), by=0.1
)
print(channel.json_body)
# fetcher = InternalWorkerRequestKeyFetcher(self.hs)
# d = fetcher.get_keys(SERVER_NAME, [testverifykey_id], 6000)
# for _ in range(1000):
# self.reactor.advance(0.1)
# result = self.get_success(d)
# print(result)
print(result)


def get_key_id(key: SigningKey) -> str:
Expand Down
1 change: 1 addition & 0 deletions tests/replication/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def _handle_http_replication_attempt(self, hs: HomeServer, repl_port: int) -> No
# Set up the server side protocol
server_address = IPv4Address("TCP", host, port)
channel = self._hs_to_site[hs].buildProtocol((host, port))
logger.warning("DMR: _handle_http_replication_attempt channel=%s", channel)

# Connect client to server and vice versa.
client_to_server_transport = FakeTransport(
Expand Down
3 changes: 3 additions & 0 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ def connectTCP(
if callback:
callback()

logger.warning("DMR: ThreadedMemoryReactorClock.connectTCP: return %s", conn)
# if port == 1001:
# breakpoint()
return conn

def advance(self, amount: float) -> None:
Expand Down

0 comments on commit 37c633b

Please sign in to comment.