Skip to content

Commit

Permalink
Fix deadlock with connections talking to each other multithreadedly
Browse files Browse the repository at this point in the history
Fixes #270
  • Loading branch information
coldfix committed May 20, 2018
1 parent daa48d6 commit 04838f3
Showing 1 changed file with 23 additions and 34 deletions.
57 changes: 23 additions & 34 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
import gc

from threading import Lock, RLock, Event
from threading import Lock
from rpyc.lib import spawn, Timeout
from rpyc.lib.compat import (pickle, next, is_py3k, maxint, select_error,
acquire_lock, TimeoutError)
Expand Down Expand Up @@ -146,8 +146,6 @@ def __init__(self, root, channel, config={}):
self._recvlock = Lock()
self._sendlock = Lock()
self._sync_replies = {}
self._sync_lock = RLock()
self._sync_event = Event()
self._async_callbacks = {}
self._local_objects = RefCountingColl()
self._last_traceback = None
Expand Down Expand Up @@ -380,24 +378,8 @@ def _dispatch_exception(self, seq, raw):
#
# serving
#
def _recv(self, timeout, wait_for_lock):
timeout = Timeout(timeout)
if not acquire_lock(self._recvlock, wait_for_lock, timeout):
return None
try:
if self._channel.poll(timeout):
data = self._channel.recv()
else:
data = None
except EOFError:
self.close()
raise
finally:
self._recvlock.release()
return data

def _dispatch(self, data):
msg, seq, args = brine.load(data)
def _dispatch(self, msg, seq, args):
if msg == consts.MSG_REQUEST:
self._dispatch_request(seq, args)
elif msg == consts.MSG_REPLY:
Expand All @@ -410,20 +392,27 @@ def _dispatch(self, data):
def sync_recv_and_dispatch(self, timeout, wait_for_lock):
# lock or wait for signal
timeout = Timeout(timeout)
if acquire_lock(self._sync_lock, False, timeout):
try:
self._sync_event.clear()
data = self._recv(timeout, wait_for_lock=wait_for_lock)
if not data:
return False
self._dispatch(data)
return True
finally:
self._sync_lock.release()
self._sync_event.set()
else:
return self._sync_event.wait(
timeout.timeleft() if timeout.finite else None)
if not acquire_lock(self._recvlock, wait_for_lock, timeout):
return None
try:
data = self._channel.poll(timeout) and self._channel.recv()
if not data:
return False
msg, seq, args = brine.load(data)
# Have to enqueue to _sync_replies before releasing _recvlock to
# avoid race conditions with other threads
sync_reply = (msg in (consts.MSG_REPLY, consts.MSG_EXCEPTION) and
seq not in self._async_callbacks)
if sync_reply:
self._dispatch(msg, seq, args)
except EOFError:
self.close()
raise
finally:
self._recvlock.release()
if not sync_reply:
self._dispatch(msg, seq, args)
return True

def poll(self, timeout = 0):
"""Serves a single transaction, should one arrives in the given
Expand Down

0 comments on commit 04838f3

Please sign in to comment.