Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make repl send_event idempotent and retry on timeouts #2920

Merged
merged 4 commits into from
Mar 1, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

from twisted.internet import defer

from synapse.api.errors import SynapseError, MatrixCodeMessageException
from synapse.api.errors import (
SynapseError, MatrixCodeMessageException, CodeMessageException,
)
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester

Expand All @@ -43,7 +48,9 @@ def send_event_to_master(client, host, port, requester, event, context,
ratelimit (bool)
extra_users (list(str)): Any extra users to notify about event
"""
uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
host, port, event.event_id,
)

payload = {
"event": event.get_pdu_json(),
Expand All @@ -56,7 +63,20 @@ def send_event_to_master(client, host, port, requester, event, context,
}

try:
result = yield client.post_json_get_json(uri, payload)
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield client.put_json(uri, payload)
break
except CodeMessageException as e:
if e.code != 504:
raise

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest logging something here.

# If we timed out we probably don't need to worry about backing
# off too much, but let's just wait a little anyway.
yield sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
Expand All @@ -71,7 +91,7 @@ class ReplicationSendEventRestServlet(RestServlet):

The API looks like:

POST /_synapse/replication/send_event
PUT /_synapse/replication/send_event/:event_id

{
"event": { .. serialized event .. },
Expand All @@ -83,7 +103,7 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [],
}
"""
PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]

def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__()
Expand All @@ -92,8 +112,20 @@ def __init__(self, hs):
self.store = hs.get_datastore()
self.clock = hs.get_clock()

# The responses are tiny, so we may as well cache them for a while
self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)

def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id)
if not result:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest adding a line of logging for the case where there is an entry in the cache already.

result = self.response_cache.set(
event_id,
preserve_fn(self._handle_request)(request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you preserve_fn this once rather than on every request?

)
return make_deferred_yieldable(result)

@defer.inlineCallbacks
def on_POST(self, request):
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)

Expand Down