Skip to content

Commit

Permalink
Merge pull request #212 from geromueller/fix_locks
Browse files Browse the repository at this point in the history
Resolves #207
  • Loading branch information
coldfix committed Jul 28, 2017
2 parents 6663466 + a00be89 commit 3f1cc1e
Showing 1 changed file with 51 additions and 32 deletions.
83 changes: 51 additions & 32 deletions rpyc/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
import gc

from threading import Lock, RLock, Event
from threading import Lock, RLock, Event, Thread
from rpyc.lib.compat import pickle, next, is_py3k, maxint, select_error
from rpyc.lib.colls import WeakValueDict, RefCountingColl
from rpyc.core import consts, brine, vinegar, netref
Expand Down Expand Up @@ -337,17 +337,15 @@ def _netref_factory(self, oid, clsname, modname):
# dispatching
#
def _dispatch_request(self, seq, raw_args):
logger = self._config["logger"]
try:
handler, args = raw_args
if logger:
logger.debug("dispatching: %r (%r)", handler, seq)
args = self._unbox(args)
res = self._HANDLERS[handler](self, *args)
except:
# need to catch old style exceptions too
t, v, tb = sys.exc_info()
self._last_traceback = tb
logger = self._config["logger"]
if logger and t is not StopIteration:
logger.debug("Exception caught", exc_info=True)
if t is SystemExit and self._config["propagate_SystemExit_locally"]:
Expand Down Expand Up @@ -404,17 +402,29 @@ def _dispatch(self, data):
else:
raise ValueError("invalid message type: %r" % (msg,))

def sync_recv_and_dispatch(self, timeout, wait_for_lock):
# lock or wait for signal
if self._sync_lock.acquire(False):
try:
self._sync_event.clear()
data = self._recv(timeout, wait_for_lock = False)
if not data:
return False
self._dispatch(data)
return True
finally:
self._sync_lock.release()
self._sync_event.set()
else:
self._sync_event.wait()

def poll(self, timeout = 0):
"""Serves a single transaction, should one arrives in the given
interval. Note that handling a request/reply may trigger nested
requests, which are all part of a single transaction.
:returns: ``True`` if a transaction was served, ``False`` otherwise"""
data = self._recv(timeout, wait_for_lock = False)
if not data:
return False
self._dispatch(data)
return True
return self.sync_recv_and_dispatch(timeout, wait_for_lock=False)

def serve(self, timeout = 1):
"""Serves a single request or reply that arrives within the given
Expand All @@ -425,18 +435,14 @@ def serve(self, timeout = 1):
:returns: ``True`` if a request or reply were received, ``False``
otherwise.
"""
data = self._recv(timeout, wait_for_lock = True)
if not data:
return False
self._dispatch(data)
return True
return self.sync_recv_and_dispatch(timeout, wait_for_lock=True)

def serve_all(self):
"""Serves all requests and replies for as long as the connection is
alive."""
try:
while True:
self.serve(0.1)
self.serve(None)
except (socket.error, select_error, IOError):
if not self.closed:
raise
Expand All @@ -445,7 +451,34 @@ def serve_all(self):
finally:
self.close()

def poll_all(self, timeout = 0):
def serve_threaded(self, thread_count=10):
def _thread_target():
try:
while True:
self.serve(None)
except (socket.error, select_error, IOError):
if not self.closed:
raise
except EOFError:
pass

threads = []

"""Serves all requests and replies for as long as the connection is
alive."""
try:
for _ in range(thread_count):
thread = Thread(target=_thread_target)
thread.daemon = True
thread.start()
threads.append(thread)

for thread in threads:
thread.join()
finally:
self.close()

def poll_all(self, timeout=0):
"""Serves all requests and replies that arrive within the given interval.
:returns: ``True`` if at least a single transaction was served, ``False`` otherwise
Expand Down Expand Up @@ -476,24 +509,10 @@ def sync_request(self, handler, *args):
"""
seq = self._get_seq_id()
self._send_request(seq, handler, args)
start_time = time.time()
timeout = self._config["sync_request_timeout"]

timeout = self._config["sync_request_timeout"]
while seq not in self._sync_replies:
remaining_time = timeout - (time.time() - start_time)
if remaining_time < 0:
raise socket.timeout

# lock or wait for signal
if self._sync_lock.acquire(False):
try:
self._sync_event.clear()
self.serve(remaining_time)
finally:
self._sync_lock.release()
self._sync_event.set()
else:
self._sync_event.wait(remaining_time)
self.sync_recv_and_dispatch(timeout, True)

isexc, obj = self._sync_replies.pop(seq)
if isexc:
Expand Down

0 comments on commit 3f1cc1e

Please sign in to comment.