Skip to content

Commit

Permalink
Merge pull request #207 from minrk/greenfix
Browse files Browse the repository at this point in the history
more conservative about events in zmq.green

Principal changes:

* `__state_changed()` is triggered after all send/recv/get(EVENTS) calls
  - should not triggered by intermediate send/recv inside multiparts.
* add timeout in `_wait_send/recv`, as crude workaround for apparent libzmq bug that may never
  notify of initial send event (comes up sometimes on slow VMs).
* removes Cython implementation of green Socket.

closes #199
  • Loading branch information
minrk committed May 24, 2012
2 parents 7ab9b10 + 667b4d7 commit a02bb89
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 171 deletions.
3 changes: 0 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,6 @@ def dotc(subdir, name):
'initthreads':[libzmq],
'rebuffer':[buffers],
},
green = {
'core' : [libzmq, context, socket],
}
)

try:
Expand Down
156 changes: 124 additions & 32 deletions zmq/green/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,27 @@
"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
"""

from __future__ import print_function

import sys

import zmq
from zmq import *

# imported with different names as to not have the star import try to to clobber (when building with cython)
from zmq.core.context import Context as _original_Context
from zmq.core.socket import Socket as _original_Socket
from zmq import Context as _original_Context
from zmq import Socket as _original_Socket

import gevent
from gevent.event import AsyncResult
from gevent.hub import get_hub


class _Context(_original_Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket below is used in calls to `socket`.
"""
_socket_class = _Socket

def _stop(evt):
"""simple wrapper for stopping an Event, allowing for method rename in gevent 1.0"""
try:
evt.stop()
except AttributeError as e:
# gevent<1.0 compat
evt.cancel()

class _Socket(_original_Socket):
"""Green version of :class:`zmq.core.socket.Socket`
Expand All @@ -48,40 +51,42 @@ class _Socket(_original_Socket):
Some double underscore prefixes are used to minimize pollution of
:class:`zmq.core.socket.Socket`'s namespace.
"""

def __init__(self, context, socket_type):
self.__in_send_multipart = False
self.__in_recv_multipart = False
self.__setup_events()

def close(self, linger=None):
# close the _state_event event, keeps the number of active file descriptors down
if not self._closed and getattr(self, '_state_event', None):
try:
self._state_event.stop()
except AttributeError as e:
# gevent<1.0 compat
self._state_event.cancel()
_stop(self._state_event)
super(_Socket, self).close(linger)

def __setup_events(self):
self.__readable = AsyncResult()
self.__writable = AsyncResult()
self.__readable.set()
self.__writable.set()

try:
self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher
self._state_event.start(self.__state_changed)
except AttributeError:
# for gevent<1.0 compatibility
from gevent.core import read_event
self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True)
self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True)

def __state_changed(self, event=None, _evtype=None):
if self.closed:
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
return
try:
if self.closed:
# if the socket has entered a close state resume any waiting greenlets
self.__writable.set()
self.__readable.set()
return
events = self.getsockopt(zmq.EVENTS)
except ZMQError as exc:
# avoid triggering __state_changed from inside __state_changed
events = super(_Socket, self).getsockopt(zmq.EVENTS)
except zmq.ZMQError as exc:
self.__writable.set_exception(exc)
self.__readable.set_exception(exc)
else:
Expand All @@ -91,38 +96,125 @@ def __state_changed(self, event=None, _evtype=None):
self.__readable.set()

def _wait_write(self):
assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
self.__writable = AsyncResult()
self.__writable.get()
# timeout is because libzmq cannot be trusted to properly signal a new send event:
# this is effectively a maximum poll interval of 1s
try:
self.__writable.get(timeout=1)
except gevent.Timeout:
if super(_Socket, self).getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent missed a libzmq send event!", file=sys.stderr)
self.__writable.set()

def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
self.__readable = AsyncResult()
self.__readable.get()
# timeout is because libzmq cannot always be trusted to play nice with libevent.
# I can only confirm that this actually happens for send, but lets be symmetrical
# with our dirty hacks.
# this is effectively a maximum poll interval of 1s
try:
self.__readable.get(timeout=1)
except gevent.Timeout:
if super(_Socket, self).getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent missed a libzmq recv event!", file=sys.stderr)
self.__readable.set()

def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""

# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
return super(_Socket, self).send(data, flags, copy, track)
try:
msg = super(_Socket, self).send(data, flags, copy, track)
finally:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
return super(_Socket, self).send(data, flags, copy, track)
msg = super(_Socket, self).send(data, flags, copy, track)
except zmq.ZMQError as e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
if not self.__in_send_multipart:
self.__state_changed()
raise
else:
if not self.__in_send_multipart:
self.__state_changed()
return msg
# defer to the event loop until we're notified the socket is writable
self._wait_write()

def recv(self, flags=0, copy=True, track=False):
"""recv, which will only block current greenlet
state_changed always fires exactly once (success or fail) at the
end of this method.
"""
if flags & zmq.NOBLOCK:
return super(_Socket, self).recv(flags, copy, track)
try:
msg = super(_Socket, self).recv(flags, copy, track)
finally:
if not self.__in_recv_multipart:
self.__state_changed()
return msg

flags |= zmq.NOBLOCK
while True:
try:
return super(_Socket, self).recv(flags, copy, track)
msg = super(_Socket, self).recv(flags, copy, track)
except zmq.ZMQError as e:
if e.errno != zmq.EAGAIN:
if not self.__in_recv_multipart:
self.__state_changed()
raise
else:
if not self.__in_recv_multipart:
self.__state_changed()
return msg
self._wait_read()

def send_multipart(self, *args, **kwargs):
"""wrap send_multipart to prevent state_changed on each partial send"""
self.__in_send_multipart = True
try:
msg = super(_Socket, self).send_multipart(*args, **kwargs)
finally:
self.__in_send_multipart = False
self.__state_changed()
return msg

def recv_multipart(self, *args, **kwargs):
"""wrap recv_multipart to prevent state_changed on each partial recv"""
self.__in_recv_multipart = True
try:
msg = super(_Socket, self).recv_multipart(*args, **kwargs)
finally:
self.__in_recv_multipart = False
self.__state_changed()
return msg

def getsockopt(self, opt):
"""trigger state_changed on getsockopt(EVENTS)"""
optval = super(_Socket, self).getsockopt(opt)
if opt == zmq.EVENTS:
self.__state_changed()
return optval


class _Context(_original_Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket above is used in calls to `socket`.
"""
_socket_class = _Socket
136 changes: 0 additions & 136 deletions zmq/green/core.pyx

This file was deleted.

1 change: 1 addition & 0 deletions zmq/tests/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,5 +333,6 @@ def test_ipc_path_max_length_msg(self):
if have_gevent:
class TestSocketGreen(GreenTest, TestSocket):
test_bad_attr = GreenTest.skip_green
test_close_after_destroy = GreenTest.skip_green


0 comments on commit a02bb89

Please sign in to comment.