-
Notifications
You must be signed in to change notification settings - Fork 9
/
webhooks.py
61 lines (45 loc) · 2.06 KB
/
webhooks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from typing import List
from fastapi import APIRouter, Depends
from app.dependencies.auth import AcaPyAuthVerified, acapy_auth_verified
from app.services.webhooks import get_hooks_for_wallet, get_hooks_for_wallet_by_topic
from shared.log_config import get_logger
from shared.models.webhook_events import CloudApiTopics, CloudApiWebhookEventGeneric
# Module-level logger, created via the shared logging helper and named after this module.
logger = get_logger(__name__)

# Router for the webhook endpoints defined below: every route registered on it is
# served under the "/v1/webhooks" prefix and tagged "webhooks".
router = APIRouter(prefix="/v1/webhooks", tags=["webhooks"])
@router.get("", deprecated=True)
async def get_webhooks_for_wallet(
    auth: AcaPyAuthVerified = Depends(acapy_auth_verified),
) -> List[CloudApiWebhookEventGeneric]:
    """
    **Deprecated**: Fetching bulk webhook events is set to be removed.
    We recommend monitoring webhook events live, using the SSE endpoint instead,
    or websockets if preferred.

    Returns the 100 most recent webhooks for this wallet.

    The wallet ID is implicitly extracted from the caller's authentication, and
    only items belonging to the caller's wallet are returned.

    Returns:
    ---------
    List of webhooks belonging to the wallet
    """
    # The wallet ID comes from the verified auth dependency, never from user input.
    wallet_id = auth.wallet_id

    request_logger = logger.bind(body={"wallet_id": wallet_id})
    request_logger.debug("GET request received: Get webhooks for wallet")

    wallet_hooks = await get_hooks_for_wallet(wallet_id=wallet_id)
    return wallet_hooks
@router.get("/{topic}", deprecated=True)
async def get_webhooks_for_wallet_by_topic(
    topic: CloudApiTopics,
    auth: AcaPyAuthVerified = Depends(acapy_auth_verified),
) -> List[CloudApiWebhookEventGeneric]:
    """
    **Deprecated**: Fetching bulk webhook events is set to be removed.
    We recommend monitoring webhook events live, using the SSE endpoint instead,
    or websockets if preferred.

    Returns the 100 most recent webhooks for this wallet / topic pair.

    The wallet ID is implicitly extracted from the caller's authentication, and
    only items belonging to the caller's wallet are returned.

    Returns:
    ---------
    List of webhooks belonging to the wallet
    """
    # The wallet ID comes from the verified auth dependency; the topic is the path parameter.
    wallet_id = auth.wallet_id

    log_context = {"wallet_id": wallet_id, "topic": topic}
    logger.bind(body=log_context).debug(
        "GET request received: Get webhooks for wallet by topic"
    )

    topic_hooks = await get_hooks_for_wallet_by_topic(wallet_id=wallet_id, topic=topic)
    return topic_hooks