diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py index dfd6f4a4..76cc80f6 100644 --- a/rpyc/core/protocol.py +++ b/rpyc/core/protocol.py @@ -396,8 +396,7 @@ def _recv(self, timeout, wait_for_lock): self._recvlock.release() return data - def _dispatch(self, data): - msg, seq, args = brine.load(data) + def _dispatch(self, msg, seq, args): if msg == consts.MSG_REQUEST: self._dispatch_request(seq, args) elif msg == consts.MSG_REPLY: @@ -416,11 +415,21 @@ def sync_recv_and_dispatch(self, timeout, wait_for_lock): data = self._recv(timeout, wait_for_lock=wait_for_lock) if not data: return False - self._dispatch(data) - return True + + msg, seq, args = brine.load(data) + # Have to enqueue to _sync_replies before releasing _recvlock to + # avoid race conditions with other threads + sync_reply = (msg in (consts.MSG_REPLY, consts.MSG_EXCEPTION) and + seq not in self._async_callbacks) + if sync_reply: + self._dispatch(msg, seq, args) + return True + finally: self._sync_lock.release() self._sync_event.set() + self._dispatch(msg, seq, args) + return True else: return self._sync_event.wait( timeout.timeleft() if timeout.finite else None)