From 00a115b1b56e2def8231ae44fbef9b87a6a4e656 Mon Sep 17 00:00:00 2001
From: Daniel Bluhm <dbluhm@pm.me>
Date: Fri, 22 Apr 2022 13:18:38 -0400
Subject: [PATCH 1/5] feat: add webhook capture

Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
---
 echo_agent/app.py           |  57 +++++++++++++++++++
 echo_agent/client.py        |  65 ++++++++++++++++++++-
 echo_agent/models.py        |   7 +++
 echo_agent/webhook_queue.py | 109 ++++++++++++++++++++++++++++++++++++
 tests/test_client.py        |  85 +++++++++++++++++++++++++++-
 5 files changed, 320 insertions(+), 3 deletions(-)
 create mode 100644 echo_agent/webhook_queue.py

diff --git a/echo_agent/app.py b/echo_agent/app.py
index b05d099..afad1ca 100644
--- a/echo_agent/app.py
+++ b/echo_agent/app.py
@@ -28,11 +28,14 @@
 from fastapi import Body, FastAPI, HTTPException, Request
 from pydantic import dataclasses
 
+from .webhook_queue import Queue
+
 from .session import Session, SessionMessage
 from .models import (
     NewConnection,
     ConnectionInfo as ConnectionInfoDataclass,
     SessionInfo,
+    Webhook,
 )
 
 # Logging
@@ -43,6 +46,7 @@
 sessions: Dict[str, Session] = {}
 recip_key_to_connection_id: Dict[str, str] = {}
 messages: Dict[str, MsgQueue] = {}
+webhooks: Queue[Webhook] = Queue()
 
 
 app = FastAPI(title="Echo Agent", version="0.1.0")
@@ -266,4 +270,57 @@ async def send_message_to_session(session_id: str, message: dict = Body(...)):
     await session.send(message)
 
 
+@app.post("/webhook/topic/{topic}", response_model=Webhook)
+async def receive_webhook(topic: str, payload: dict = Body(...)):
+    """Receive a webhook."""
+    LOGGER.debug("Received webhook: topic %s, payload %s", topic, payload)
+    await webhooks.put(Webhook(topic, payload))
+
+
+@app.get(
+    "/webhooks",
+    response_model=List[Webhook],
+    operation_id="",
+)
+async def get_webhooks(topic: Optional[str] = None):
+    """Retrieve all received messages for recipient key."""
+    if not topic:
+        LOGGER.debug("Retrieving webhooks")
+        return webhooks.get_all()
+
+    return webhooks.get_all(lambda entry: entry.topic == topic)
+
+
+@app.get("/webhook", response_model=Webhook, operation_id="wait_for_webhook")
+async def get_webhook(
+    topic: Optional[str] = None,
+    wait: Optional[bool] = True,
+    timeout: int = 5,
+):
+    """Wait for a message matching criteria."""
+
+    def _condition(entry: Webhook):
+        return entry.topic == topic if topic else True
+
+    if wait:
+        try:
+            webhook = await webhooks.get(condition=_condition, timeout=timeout)
+        except asyncio.TimeoutError:
+            raise HTTPException(
+                status_code=408,
+                detail=("No webhook found before timeout"),
+            )
+    else:
+        webhook = webhooks.get_nowait(condition=_condition)
+
+    if not webhook:
+        raise HTTPException(
+            status_code=404,
+            detail="No webhook found",
+        )
+
+    LOGGER.debug("Received webhook, returning to waiting client: %s", webhook)
+    return webhook
+
+
 __all__ = ["app"]
diff --git a/echo_agent/client.py b/echo_agent/client.py
index 2785508..6e8b77e 100644
--- a/echo_agent/client.py
+++ b/echo_agent/client.py
@@ -1,11 +1,11 @@
 """Client to Echo Agent."""
 from contextlib import AbstractAsyncContextManager, asynccontextmanager
 from dataclasses import asdict
-from typing import Any, List, Mapping, Optional, Union
+from typing import Any, Dict, List, Mapping, Optional, Union
 
 from httpx import AsyncClient
 
-from .models import ConnectionInfo, NewConnection, SessionInfo
+from .models import ConnectionInfo, NewConnection, SessionInfo, Webhook
 
 
 class EchoClientError(Exception):
@@ -228,3 +228,64 @@ async def send_message_to_session(
 
         if response.is_error:
             raise EchoClientError(f"Failed to send message: {response.content}")
+
+    async def new_webhook(self, topic: str, payload: Dict[str, Any]):
+        if not self.client:
+            raise NoOpenClient(
+                "No client has been opened; use `async with echo_client`"
+            )
+
+        response = await self.client.post(f"/webhook/topic/{topic}", json=payload)
+
+        if response.is_error:
+            raise EchoClientError("Failed to receive webhook")
+
+    async def get_webhooks(
+        self,
+        *,
+        topic: Optional[str] = None,
+    ) -> List[Webhook]:
+        if not self.client:
+            raise NoOpenClient(
+                "No client has been opened; use `async with echo_client`"
+            )
+
+        response = await self.client.get(
+            "/webhooks",
+            params={"topic": topic} if topic else {},
+        )
+
+        if response.is_error:
+            raise EchoClientError(f"Failed to retrieve webhooks: {response.content}")
+
+        return response.json()
+
+    async def get_webhook(
+        self,
+        *,
+        topic: Optional[str] = None,
+        wait: Optional[bool] = True,
+        timeout: Optional[int] = 5,
+    ) -> Mapping[str, Any]:
+        if not self.client:
+            raise NoOpenClient(
+                "No client has been opened; use `async with echo_client`"
+            )
+
+        response = await self.client.get(
+            "/webhook",
+            params={
+                k: v
+                for k, v in {
+                    "topic": topic,
+                    "wait": wait,
+                    "timeout": timeout,
+                }.items()
+                if v is not None
+            },
+        )
+
+        if response.is_error:
+            raise EchoClientError(f"Failed to wait for webhook: {response.content}")
+
+        return response.json()
diff --git a/echo_agent/models.py b/echo_agent/models.py
index 8672d88..aae2e2e 100644
--- a/echo_agent/models.py
+++ b/echo_agent/models.py
@@ -1,4 +1,5 @@
 from dataclasses import dataclass, field
+from typing import Any, Dict
 
 
 @dataclass
@@ -21,3 +22,9 @@ class ConnectionInfo:
 class SessionInfo:
     session_id: str
     connection_id: str
+
+
+@dataclass
+class Webhook:
+    topic: str
+    payload: Dict[str, Any]
diff --git a/echo_agent/webhook_queue.py b/echo_agent/webhook_queue.py
new file mode 100644
index 0000000..b2a9605
--- /dev/null
+++ b/echo_agent/webhook_queue.py
@@ -0,0 +1,109 @@
+import asyncio
+from typing import Any, Callable, Generic, List, Optional, Sequence, TypeVar
+
+
+QueueEntry = TypeVar("QueueEntry")
+
+
+class Queue(Generic[QueueEntry]):
+    def __init__(
+        self,
+        *,
+        condition: Optional[Callable[[QueueEntry], bool]] = None,
+    ):
+        self._queue: List[Any] = []
+        self._cond = asyncio.Condition()
+        self.condition = condition
+
+    def _first_matching_index(self, condition: Callable[[QueueEntry], bool]):
+        for index, entry in enumerate(self._queue):
+            if condition(entry):
+                return index
+        return None
+
+    async def _get(
+        self, condition: Optional[Callable[[QueueEntry], bool]] = None
+    ) -> QueueEntry:
+        """Retrieve a message from the queue."""
+        while True:
+            async with self._cond:
+                # Lock acquired
+                if not self._queue:
+                    # No items on queue yet so we need to wait for items to show up
+                    await self._cond.wait()
+
+                if not self._queue:
+                    # Another task grabbed the value before we got to it
+                    continue
+
+                if not condition:
+                    # Just get the first message
+                    return self._queue.pop()
+
+                # Return first matching item, if present
+                match_idx = self._first_matching_index(condition)
+                if match_idx is not None:
+                    return self._queue.pop(match_idx)
+
+    async def get(
+        self,
+        condition: Optional[Callable[[QueueEntry], bool]] = None,
+        *,
+        timeout: int = 5,
+    ) -> QueueEntry:
+        """Retrieve a message from the queue."""
+        return await asyncio.wait_for(self._get(condition), timeout)
+
+    def get_all(
+        self, condition: Optional[Callable[[QueueEntry], bool]] = None
+    ) -> Sequence[QueueEntry]:
+        """Return all messages matching a given condition."""
+        messages = []
+        if not self._queue:
+            return messages
+
+        if not condition:
+            messages = [entry for entry in self._queue]
+            self._queue.clear()
+            return messages
+
+        # Store messages that didn't match in the order they are seen
+        filtered: List[QueueEntry] = []
+        for entry in self._queue:
+            if condition(entry):
+                messages.append(entry)
+            else:
+                filtered.append(entry)
+
+        # Queue contents set to messages that didn't match condition
+        self._queue[:] = filtered
+        return messages
+
+    def get_nowait(
+        self, condition: Optional[Callable[[QueueEntry], bool]] = None
+    ) -> Optional[QueueEntry]:
+        """Return a message from the queue without waiting."""
+        if not self._queue:
+            return None
+
+        if not condition:
+            return self._queue.pop()
+
+        match_idx = self._first_matching_index(condition)
+        if match_idx is not None:
+            return self._queue.pop(match_idx)
+
+        return None
+
+    async def put(self, value: QueueEntry):
+        """Push a message onto the queue and notify waiting tasks."""
+        if not self.condition or self.condition(value):
+            async with self._cond:
+                self._queue.append(value)
+                self._cond.notify_all()
+
+    def flush(self) -> Sequence[QueueEntry]:
+        """Clear queue and return final contents of queue at time of clear."""
+        final = self._queue.copy()
+        self._queue.clear()
+        return final
diff --git a/tests/test_client.py b/tests/test_client.py
index 8a86649..7d57bf3 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -6,7 +6,7 @@
 from aries_staticagent.message import Message
 import pytest
 
-from echo_agent.app import connections, messages, recip_key_to_connection_id
+from echo_agent.app import connections, messages, recip_key_to_connection_id, webhooks
 from echo_agent.client import EchoClient, NoOpenClient
 from echo_agent.models import ConnectionInfo
 from echo_agent.session import SessionMessage
@@ -199,3 +199,86 @@ async def test_get_message_no_wait(
         await echo_client.new_message(recip.pack(msg))
         message = await echo_client.get_message(connection_id, wait=False)
     assert message
+
+
+@pytest.mark.asyncio
+async def test_receive_webhook(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+    assert webhooks._queue
+
+
+@pytest.mark.asyncio
+async def test_get_webhooks(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+        webhooks = await echo_client.get_webhooks()
+    assert webhooks
+
+
+@pytest.mark.asyncio
+async def test_get_webhooks_condition(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+        webhooks = await echo_client.get_webhooks(topic="test")
+    assert webhooks
+
+
+@pytest.mark.asyncio
+async def test_get_webhook_post(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+        webhook = await echo_client.get_webhook()
+    assert webhook
+
+
+@pytest.mark.asyncio
+async def test_get_webhook_post_condition(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+        webhook = await echo_client.get_webhook(topic="test")
+    assert webhook
+
+
+@pytest.mark.asyncio
+async def test_get_webhook_pre(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+
+    async def _produce(echo_client):
+        await asyncio.sleep(0.5)
+        await echo_client.new_webhook("test", {"test": "test"})
+
+    async def _consume(echo_client):
+        return await echo_client.get_webhook(topic="test")
+
+    async with echo_client:
+        _, webhook = await asyncio.gather(_produce(echo_client), _consume(echo_client))
+    assert webhook
+
+
+@pytest.mark.asyncio
+async def test_get_webhook_no_wait(
+    echo_client: EchoClient, recip: Connection, conn: Connection, connection_id: str
+):
+    """Test reception of a webhook."""
+    async with echo_client:
+        await echo_client.new_webhook("test", {"test": "test"})
+        webhook = await echo_client.get_webhook(topic="test", wait=False)
+    assert webhook

From a33cd686ea089e068b3c224449b9e8dd39014965 Mon Sep 17 00:00:00 2001
From: Daniel Bluhm <dbluhm@pm.me>
Date: Fri, 22 Apr 2022 13:21:50 -0400
Subject: [PATCH 2/5] feat: allow any webhook path structure

Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
---
 echo_agent/app.py    | 2 +-
 echo_agent/client.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/echo_agent/app.py b/echo_agent/app.py
index afad1ca..87f2b7b 100644
--- a/echo_agent/app.py
+++ b/echo_agent/app.py
@@ -270,7 +270,7 @@ async def send_message_to_session(session_id: str, message: dict = Body(...)):
     await session.send(message)
 
 
-@app.post("/webhook/topic/{topic}", response_model=Webhook)
+@app.post("/webhook/{topic:path}", response_model=Webhook)
 async def receive_webhook(topic: str, payload: dict = Body(...)):
     """Receive a webhook."""
     LOGGER.debug("Received webhook: topic %s, payload %s", topic, payload)
diff --git a/echo_agent/client.py b/echo_agent/client.py
index 6e8b77e..4eb615b 100644
--- a/echo_agent/client.py
+++ b/echo_agent/client.py
@@ -235,7 +235,7 @@ async def new_webhook(self, topic: str, payload: Dict[str, Any]):
                 "No client has been opened; use `async with echo_client`"
             )
 
-        response = await self.client.post(f"/webhook/topic/{topic}", json=payload)
+        response = await self.client.post(f"/webhook/{topic}", json=payload)
 
         if response.is_error:
             raise EchoClientError("Failed to receive webhook")

From 8f44a3cc4b42079416191e0a87564f33ab78d73b Mon Sep 17 00:00:00 2001
From: Daniel Bluhm <dbluhm@pm.me>
Date: Mon, 25 Apr 2022 12:21:00 -0400
Subject: [PATCH 3/5] fix: attempt to fix different loop issues in test

Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
---
 tests/conftest.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index c7e089b..1571f1c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,7 +1,9 @@
 import pytest
-from echo_agent import app, EchoClient
+from echo_agent import EchoClient
 
 
 @pytest.fixture
-def echo_client():
+async def echo_client():
+    from echo_agent import app
+
     yield EchoClient(base_url="http://test", app=app)

From 2d631bb4ef9774c39fb62df90f15a7cc5c748ec2 Mon Sep 17 00:00:00 2001
From: Daniel Bluhm <dbluhm@pm.me>
Date: Mon, 25 Apr 2022 12:25:01 -0400
Subject: [PATCH 4/5] fix: explicit loop task creation

Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
---
 tests/test_client.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/test_client.py b/tests/test_client.py
index 7d57bf3..3963ebd 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -269,7 +269,11 @@ async def _consume(echo_client):
         return await echo_client.get_webhook(topic="test")
 
     async with echo_client:
-        _, webhook = await asyncio.gather(_produce(echo_client), _consume(echo_client))
+        loop = asyncio.get_event_loop()
+        _, webhook = await asyncio.gather(
+            loop.create_task(_produce(echo_client)),
+            loop.create_task(_consume(echo_client)),
+        )
     assert webhook
 
 

From 10031b57dc71738a14dfce5aedbf9b696f3bf8ac Mon Sep 17 00:00:00 2001
From: Daniel Bluhm <dbluhm@pm.me>
Date: Mon, 25 Apr 2022 13:29:58 -0400
Subject: [PATCH 5/5] fix: async setup of webhooks queue

Signed-off-by: Daniel Bluhm <dbluhm@pm.me>
---
 echo_agent/app.py           | 5 +++++
 echo_agent/webhook_queue.py | 5 ++++-
 tests/conftest.py           | 2 ++
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/echo_agent/app.py b/echo_agent/app.py
index 87f2b7b..10d5cce 100644
--- a/echo_agent/app.py
+++ b/echo_agent/app.py
@@ -52,6 +52,11 @@
 app = FastAPI(title="Echo Agent", version="0.1.0")
 
 
+@app.on_event("startup")
+async def setup_webhook_queue():
+    await webhooks.setup()
+
+
 ConnectionInfo = dataclasses.dataclass(ConnectionInfoDataclass)
 
 
diff --git a/echo_agent/webhook_queue.py b/echo_agent/webhook_queue.py
index b2a9605..5cff3ad 100644
--- a/echo_agent/webhook_queue.py
+++ b/echo_agent/webhook_queue.py
@@ -12,9 +12,12 @@ def __init__(
         condition: Optional[Callable[[QueueEntry], bool]] = None,
     ):
         self._queue: List[Any] = []
-        self._cond = asyncio.Condition()
+        self._cond: Optional[asyncio.Condition] = None
         self.condition = condition
 
+    async def setup(self):
+        self._cond = asyncio.Condition()
+
     def _first_matching_index(self, condition: Callable[[QueueEntry], bool]):
         for index, entry in enumerate(self._queue):
             if condition(entry):
diff --git a/tests/conftest.py b/tests/conftest.py
index 1571f1c..986dc46 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,9 +1,11 @@
 import pytest
 from echo_agent import EchoClient
+from echo_agent.app import webhooks
 
 
 @pytest.fixture
 async def echo_client():
     from echo_agent import app
 
+    await webhooks.setup()
     yield EchoClient(base_url="http://test", app=app)