-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade nng version please. #103
Comments
Good idea. I'll do this soon. |
@codypiersall do you have a rough outline of how you typically go about upgrading the underlying nng version? Would love to help out if I can. |
Is the pynng library still being actively maintained? |
Any updates on this @codypiersall ? |
Hmmm, turns out back in April when I said
I had no idea what I was talking about. Unfortunately life has been, well, busy for the last several months, and when it seems there will be a reprieve, it turns out that there isn't. I'm traveling for work for the rest of this month and won't have time to work on this project.
Hmmm, in some ways a deep ontological question that is. I do have the intention of getting back to this repo. Unfortunately no clue on a timeline.
From a clean pynng repo, I
|
@codypiersall before your comment, I had done this process through trial and error. You can find the repo at https://github.com/LAMBDA-AUTOMATA/pynng (don't expect me to maintain anything). Some tests fail, but our internal test suite passes, _and our field testing personnel confirm that the segfault that was happening on previous builds, has gone away . UPDATE 17/10/2022: The segfault is still happenning. I am pretty sure this has something to do with this open nng issue (nanomsg/nng#1523) , where problems seem to go away due to a recent internal rewrite (API stays the same). However, our internal systems rely on a small Pub0/Sub0 and Req0 Rep0 functionality. Here are the usage examples that it works for: class Subscriber:
"""
A class that acts as a subscriber on a given socket address. Subscribers connect
to the publisher on that address, to receive data bytes via the `receive` function.
"""
ALL_TOPICS = ""
def __init__(
self, address: str, recv_buffer_size: int = 1, recv_timeout_ms: int = 10
) -> None:
self.address = address
self._socket = None
self.recv_buffer_size = recv_buffer_size
self.recv_timeout_ms = recv_timeout_ms
def __str__(self) -> str:
return f"Subscriber({self.address})"
def __enter__(self):
try:
self._socket = pynng.Sub0(
dial=self.address,
topics=self.ALL_TOPICS,
recv_timeout=self.recv_timeout_ms,
recv_buffer_size=self.recv_buffer_size,
block_on_dial=False,
)
except BaseException as e:
logger.error(e)
raise CommunicationError(f"Failed to setup {self}")
return self
def __exit__(self, type, value, traceback):
try:
self._socket.__exit__(type, value, traceback)
except BaseException as e:
logger.error(e)
async def receive(self) -> Optional[bytes]:
"""
Receive bytes as a self-contained message from the socket.
Returns none upon timeout."""
try:
msg = await self._socket.arecv_msg()
except pynng.exceptions.Timeout:
return None
except BaseException as e:
logger.error(e)
raise CommunicationError(f"{self} failed to receive message")
return msg.bytes class Publisher:
"""
A class that acts as a publisher on a given socket address. Subscribers connect
to it, to receive data bytes sent via the `send` function.
"""
def __init__(self, address: str) -> None:
self.address = address
self._socket = None
def __str__(self) -> str:
return f"Publisher({self.address})"
def __enter__(self):
try:
self._socket = pynng.Pub0(listen=self.address)
except BaseException as e:
logger.error(e)
raise CommunicationError(f"Failed to setup {self}")
return self
def __exit__(self, type, value, traceback):
try:
self._socket.__exit__(type, value, traceback)
except BaseException as e:
logger.error(e)
async def send(self, data: bytes) -> None:
"""
Sends bytes over the socket as a self-contained message.
"""
try:
await self._socket.asend(data)
except pynng.exceptions.Timeout:
return None
except BaseException as e:
logger.error(e)
raise CommunicationError(f"{self} failed to receive message") class RequestClient:
"""
A class that acts as a Req0 on a given socket address. Subscribers connect
to it, to receive data bytes sent via the `send` function.
"""
EMPTY_REQUEST = b""
def __init__(self, address: str, recv_timeout_ms: int = 200) -> None:
self.address = address
self._socket = None
self.recv_timeout_ms = recv_timeout_ms
@staticmethod
async def simple_request(
address: str, payload: bytes = None, recv_timeout_ms: int = 200
) -> Optional[bytes]:
payload = payload or RequestClient.EMPTY_REQUEST
with RequestClient(address=address, recv_timeout_ms=recv_timeout_ms) as req:
resp_bytes = await req.send(payload)
if resp_bytes == RequestServer.EMPTY_RESPONSE:
return None
return resp_bytes
def __str__(self) -> str:
return f"RequestClient({self.address})"
def __enter__(self):
try:
self._socket = pynng.Req0(
dial=self.address,
send_timeout=self.recv_timeout_ms,
recv_timeout=self.recv_timeout_ms,
)
except BaseException as e:
logger.error(e)
raise CommunicationError(f"Failed to connect {self}")
return self
def __exit__(self, type, value, traceback):
try:
self._socket.__exit__(type, value, traceback)
except BaseException as e:
logger.error(e)
async def send(self, data: bytes) -> Optional[bytes]:
try:
await self._socket.asend(data)
response = await self._socket.arecv_msg()
except pynng.exceptions.Timeout:
return None
except BaseException as e:
logger.error(e)
raise CommunicationError(f"{self} failed to communicate")
return response.bytes class RequestServer:
"""
A class that acts as a subscriber on a given socket address. Subscribers connect
to the publisher on that address, to receive data bytes via the `receive` function.
"""
ALL_TOPICS = ""
EMPTY_RESPONSE = b""
def __init__(
self, address: str, recv_timeout_ms: int = 200, recv_buffer_size: int = 1
) -> None:
self.address = address
self._socket = None
self.recv_timeout_ms = recv_timeout_ms
self.recv_buffer_size = recv_buffer_size
def __str__(self) -> str:
return f"RequestServer({self.address})"
def __enter__(self):
try:
self._socket = pynng.Rep0(
listen=self.address,
recv_timeout=self.recv_timeout_ms,
send_timeout=self.recv_timeout_ms,
recv_buffer_size=self.recv_buffer_size,
)
except BaseException as e:
logger.error(e)
raise CommunicationError(f"Failed to set up {self}")
return self
def __exit__(self, type, value, traceback):
try:
self._socket.__exit__(type, value, traceback)
except BaseException as e:
logger.error(e)
async def receive(self, send_empty_response: bool = False) -> Union[bytes, None]:
try:
msg = await self._socket.arecv_msg()
if send_empty_response == True:
await self._socket.asend(self.EMPTY_RESPONSE)
except pynng.exceptions.Timeout:
return None
except BaseException as e:
logger.error(e)
raise CommunicationError(f"{self} failed to receive message")
return msg.bytes
async def reply(self, data: bytes = None) -> bool:
data = data or self.EMPTY_RESPONSE
try:
await self._socket.asend(data)
except pynng.exceptions.Timeout:
return False
except BaseException as e:
logger.error(repr(e))
raise CommunicationError(
f"{self} failed to reply to message. Reason: {repr(e)}"
)
return True |
Any updates on this ? Anyway I can assist ? |
As the author of nng said, the bus protocol should be very much better in recent builds nanomsg/nng#1571 (comment)
But pynng still use the version 1.5 years ago, hope the author can upgrade the dependent nng version.
The text was updated successfully, but these errors were encountered: