Skip to content

Commit

Permalink
Merge pull request #2009 from ianco/mediator-testing
Browse files Browse the repository at this point in the history
Fix for mediator load testing race condition when scaling horizontally
  • Loading branch information
ianco authored Nov 9, 2022
2 parents f857f8c + ab6c64e commit 75c057e
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions aries_cloudagent/protocols/routing/v1_0/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Routing manager classes for tracking and inspecting routing records."""

import asyncio
import logging
from typing import Coroutine, Sequence

from ....core.error import BaseError
Expand All @@ -16,6 +18,12 @@
from .models.route_updated import RouteUpdated


LOGGER = logging.getLogger(__name__)

RECIP_ROUTE_PAUSE = 0.1
RECIP_ROUTE_RETRY = 10


class RoutingManagerError(BaseError):
"""Generic routing error."""

Expand Down Expand Up @@ -54,21 +62,30 @@ async def get_recipient(self, recip_verkey: str) -> RouteRecord:
if not recip_verkey:
raise RoutingManagerError("Must pass non-empty recip_verkey")

try:
async with self._profile.session() as session:
record = await RouteRecord.retrieve_by_recipient_key(
session, recip_verkey
i = 0
record = None
while not record:
try:
LOGGER.info(">>> fetching routing record for verkey: " + recip_verkey)
async with self._profile.session() as session:
record = await RouteRecord.retrieve_by_recipient_key(
session, recip_verkey
)
LOGGER.info(">>> FOUND routing record for verkey: " + recip_verkey)
return record
except StorageDuplicateError:
LOGGER.info(">>> DUPLICATE routing record for verkey: " + recip_verkey)
raise RouteNotFoundError(
f"More than one route record found with recipient key: {recip_verkey}"
)
except StorageDuplicateError:
raise RouteNotFoundError(
f"More than one route record found with recipient key: {recip_verkey}"
)
except StorageNotFoundError:
raise RouteNotFoundError(
f"No route found with recipient key: {recip_verkey}"
)

return record
except StorageNotFoundError:
LOGGER.info(">>> NOT FOUND routing record for verkey: " + recip_verkey)
i += 1
if i > RECIP_ROUTE_RETRY:
raise RouteNotFoundError(
f"No route found with recipient key: {recip_verkey}"
)
await asyncio.sleep(RECIP_ROUTE_PAUSE)

async def get_routes(
self, client_connection_id: str = None, tag_filter: dict = None
Expand Down Expand Up @@ -136,13 +153,15 @@ async def create_route_record(
)
if not recipient_key:
raise RoutingManagerError("Missing recipient_key")
LOGGER.info(">>> creating routing record for verkey: " + recipient_key)
route = RouteRecord(
connection_id=client_connection_id,
wallet_id=internal_wallet_id,
recipient_key=recipient_key,
)
async with self._profile.session() as session:
await route.save(session, reason="Created new route")
LOGGER.info(">>> CREATED routing record for verkey: " + recipient_key)
return route

async def update_routes(
Expand Down

0 comments on commit 75c057e

Please sign in to comment.