-
Notifications
You must be signed in to change notification settings - Fork 75
/
acapy_handler.py
99 lines (79 loc) · 3.7 KB
/
acapy_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
import structlog
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, Request
from pymongo.database import Database
from ..authSessions.crud import AuthSessionCRUD
from ..authSessions.models import AuthSession, AuthSessionPatch, AuthSessionState
from ..core.acapy.client import AcapyClient
from ..db.session import get_db
from ..core.config import settings
from ..routers.socketio import sio, connections_reload
# Module-level structlog logger, named after this module.
logger = structlog.getLogger(__name__)
# Router on which this module's webhook endpoint is registered.
router = APIRouter()
async def _parse_webhook_body(request: Request):
    """Read the raw request body and deserialize it as JSON.

    The body bytes are handed straight to ``json.loads``, which detects
    UTF-8, UTF-16 and UTF-32 input itself. Decoding as ASCII first would
    reject any otherwise valid payload that contains a non-ASCII character.

    Args:
        request: Incoming request whose body holds the JSON payload.

    Returns:
        The deserialized JSON value.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON.
        UnicodeDecodeError: If the body bytes are not valid in the
            detected encoding.
    """
    raw_body = await request.body()
    return json.loads(raw_body)
@router.post("/topic/{topic}/")
async def post_topic(request: Request, topic: str, db: Database = Depends(get_db)):
"""Called by aca-py agent."""
logger.info(f">>> post_topic : topic={topic}")
client = AcapyClient()
match topic:
case "present_proof_v2_0":
webhook_body = await _parse_webhook_body(request)
logger.info(f">>>> pres_exch_id: {webhook_body['pres_ex_id']}")
auth_session: AuthSession = await AuthSessionCRUD(db).get_by_pres_exch_id(
webhook_body["pres_ex_id"]
)
# Get the saved websocket session
pid = str(auth_session.id)
connections = connections_reload()
sid = connections.get(pid)
if webhook_body["state"] == "presentation-received":
logger.info("presentation-received")
if webhook_body["state"] == "done":
logger.info("VERIFIED")
if webhook_body["verified"] == "true":
auth_session.proof_status = AuthSessionState.VERIFIED
auth_session.presentation_exchange = webhook_body["by_format"]
await sio.emit("status", {"status": "verified"}, to=sid)
else:
auth_session.proof_status = AuthSessionState.FAILED
await sio.emit("status", {"status": "failed"}, to=sid)
await AuthSessionCRUD(db).patch(
str(auth_session.id), AuthSessionPatch(**auth_session.dict())
)
# abandoned state
if webhook_body["state"] == "abandoned":
logger.info("ABANDONED")
logger.info(webhook_body["error_msg"])
auth_session.proof_status = AuthSessionState.ABANDONED
await sio.emit("status", {"status": "abandoned"}, to=sid)
await AuthSessionCRUD(db).patch(
str(auth_session.id), AuthSessionPatch(**auth_session.dict())
)
# Calcuate the expiration time of the proof
now_time = datetime.now()
expired_time = now_time + timedelta(
seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
)
# Update the expiration time of the proof
auth_session.expired_timestamp = expired_time
await AuthSessionCRUD(db).patch(
str(auth_session.id), AuthSessionPatch(**auth_session.dict())
)
# Check if expired. But only if the proof has not been started.
if (
expired_time < now_time
and auth_session.proof_status == AuthSessionState.NOT_STARTED
):
logger.info("EXPIRED")
auth_session.proof_status = AuthSessionState.EXPIRED
await sio.emit("status", {"status": "expired"}, to=sid)
await AuthSessionCRUD(db).patch(
str(auth_session.id), AuthSessionPatch(**auth_session.dict())
)
pass
case _:
logger.debug("skipping webhook")
return {}