diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py index dfd6f4a4..9b083303 100644 --- a/rpyc/core/protocol.py +++ b/rpyc/core/protocol.py @@ -8,7 +8,7 @@ import time import gc -from threading import Lock, RLock, Event +from threading import Lock from rpyc.lib import spawn, Timeout from rpyc.lib.compat import (pickle, next, is_py3k, maxint, select_error, acquire_lock, TimeoutError) @@ -146,8 +146,6 @@ def __init__(self, root, channel, config={}): self._recvlock = Lock() self._sendlock = Lock() self._sync_replies = {} - self._sync_lock = RLock() - self._sync_event = Event() self._async_callbacks = {} self._local_objects = RefCountingColl() self._last_traceback = None @@ -380,24 +378,8 @@ def _dispatch_exception(self, seq, raw): # # serving # - def _recv(self, timeout, wait_for_lock): - timeout = Timeout(timeout) - if not acquire_lock(self._recvlock, wait_for_lock, timeout): - return None - try: - if self._channel.poll(timeout): - data = self._channel.recv() - else: - data = None - except EOFError: - self.close() - raise - finally: - self._recvlock.release() - return data - def _dispatch(self, data): - msg, seq, args = brine.load(data) + def _dispatch(self, msg, seq, args): if msg == consts.MSG_REQUEST: self._dispatch_request(seq, args) elif msg == consts.MSG_REPLY: @@ -410,20 +392,27 @@ def _dispatch(self, data): def sync_recv_and_dispatch(self, timeout, wait_for_lock): # lock or wait for signal timeout = Timeout(timeout) - if acquire_lock(self._sync_lock, False, timeout): - try: - self._sync_event.clear() - data = self._recv(timeout, wait_for_lock=wait_for_lock) - if not data: - return False - self._dispatch(data) - return True - finally: - self._sync_lock.release() - self._sync_event.set() - else: - return self._sync_event.wait( - timeout.timeleft() if timeout.finite else None) + if not acquire_lock(self._recvlock, wait_for_lock, timeout): + return None + try: + data = self._channel.poll(timeout) and self._channel.recv() + if not data: + return False + msg, seq, args = brine.load(data) + # Have to enqueue to _sync_replies before releasing _recvlock to + # avoid race conditions with other threads + sync_reply = (msg in (consts.MSG_REPLY, consts.MSG_EXCEPTION) and + seq not in self._async_callbacks) + if sync_reply: + self._dispatch(msg, seq, args) + except EOFError: + self.close() + raise + finally: + self._recvlock.release() + if not sync_reply: + self._dispatch(msg, seq, args) + return True def poll(self, timeout = 0): """Serves a single transaction, should one arrives in the given