Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement opening a websocket session to connection #3

Merged
merged 10 commits into from
Nov 17, 2021
Prev Previous commit
Next Next commit
fix: wait for socket to open if expected soon on send
Signed-off-by: Daniel Bluhm <[email protected]>
  • Loading branch information
dbluhm committed Nov 17, 2021
commit a887ff5f80f95ffd386a2e628a4b58a367cd152b
11 changes: 10 additions & 1 deletion echo_agent/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, connection: Connection, endpoint: Optional[str] = None):

self._task: Optional[asyncio.Future] = None
self.socket: Optional[aiohttp.ClientWebSocketResponse] = None
self._opened: asyncio.Event = asyncio.Event()

async def _open(self):
LOGGER.debug("Starting session to %s", self.endpoint)
Expand All @@ -51,6 +52,7 @@ async def _open(self):
async with session.ws_connect(self.endpoint) as socket:
LOGGER.debug("Socket connected to %s", self.endpoint)
self.socket = socket
self._opened.set()
async for msg in socket:
LOGGER.debug("Received ws message: %s", msg)
if msg.type == aiohttp.WSMsgType.BINARY:
Expand Down Expand Up @@ -84,9 +86,16 @@ async def close(self):
with suppress(asyncio.CancelledError):
await self._task
self._task = None
self._opened.clear()

async def send(self, msg: Union[dict, Message]):
if not self.socket:
raise SocketClosed("No open socket to send message")
if self._task:
await self._opened.wait()
else:
raise SocketClosed("No open socket to send message")
if not self.socket:
raise SocketClosed("No open socket even after waiting for open")

packed = self.connection.pack(msg)
await self.socket.send_bytes(packed)