Skip to content

Commit

Permalink
Merge pull request #2990 from OpSecId/pstlouis/no-transport-mode
Browse files Browse the repository at this point in the history
Enable `no-transport` mode as startup parameter
  • Loading branch information
swcurran authored Jun 24, 2024
2 parents 594d642 + 5ca29dc commit 66c9c1d
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 116 deletions.
54 changes: 37 additions & 17 deletions aries_cloudagent/config/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,10 @@ def get_settings(self, args: Namespace) -> dict:
if args.endpoint:
settings["default_endpoint"] = args.endpoint[0]
settings["additional_endpoints"] = args.endpoint[1:]
else:

elif "no_transport" not in args:
raise ArgsParseError("-e/--endpoint is required")

if args.profile_endpoint:
settings["profile_endpoint"] = args.profile_endpoint

Expand Down Expand Up @@ -1273,6 +1275,19 @@ class TransportGroup(ArgumentGroup):

def add_arguments(self, parser: ArgumentParser):
"""Add transport-specific command line arguments to the parser."""
parser.add_argument(
"--no-transport",
dest="no_transport",
action="store_true",
env_var="ACAPY_NO_TRANSPORT",
help=(
"Specifies that aca-py will run with no transport configured. "
"This must be set if running in no-transport mode. Overrides any "
"specified transport or endpoint configurations. "
"Either this parameter or the "
"'--endpoint' parameter MUST be specified. Default: false."
),
)
parser.add_argument(
"-it",
"--inbound-transport",
Expand Down Expand Up @@ -1381,28 +1396,33 @@ def add_arguments(self, parser: ArgumentParser):
def get_settings(self, args: Namespace):
"""Extract transport settings."""
settings = {}
if args.inbound_transports:
settings["transport.inbound_configs"] = args.inbound_transports
else:
raise ArgsParseError("-it/--inbound-transport is required")
if args.outbound_transports:
settings["transport.outbound_configs"] = args.outbound_transports
if args.no_transport:
settings["transport.disabled"] = True
else:
raise ArgsParseError("-ot/--outbound-transport is required")
settings["transport.enable_undelivered_queue"] = args.enable_undelivered_queue
if args.inbound_transports:
settings["transport.inbound_configs"] = args.inbound_transports
else:
raise ArgsParseError("-it/--inbound-transport is required")
if args.outbound_transports:
settings["transport.outbound_configs"] = args.outbound_transports
else:
raise ArgsParseError("-ot/--outbound-transport is required")
settings["transport.enable_undelivered_queue"] = (
args.enable_undelivered_queue
)
if args.max_message_size:
settings["transport.max_message_size"] = args.max_message_size
if args.max_outbound_retry:
settings["transport.max_outbound_retry"] = args.max_outbound_retry
if args.ws_heartbeat_interval:
settings["transport.ws.heartbeat_interval"] = args.ws_heartbeat_interval
if args.ws_timeout_interval:
settings["transport.ws.timeout_interval"] = args.ws_timeout_interval

if args.label:
settings["default_label"] = args.label
if args.image_url:
settings["image_url"] = args.image_url
if args.max_message_size:
settings["transport.max_message_size"] = args.max_message_size
if args.max_outbound_retry:
settings["transport.max_outbound_retry"] = args.max_outbound_retry
if args.ws_heartbeat_interval:
settings["transport.ws.heartbeat_interval"] = args.ws_heartbeat_interval
if args.ws_timeout_interval:
settings["transport.ws.timeout_interval"] = args.ws_timeout_interval

return settings

Expand Down
8 changes: 6 additions & 2 deletions aries_cloudagent/config/default_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ async def load_plugins(self, context: InjectionContext):
context.injector.bind_instance(PluginRegistry, plugin_registry)

# Register standard protocol plugins
plugin_registry.register_package("aries_cloudagent.protocols")
if not self.settings.get("transport.disabled"):
plugin_registry.register_package("aries_cloudagent.protocols")

# Currently providing admin routes only
plugin_registry.register_plugin("aries_cloudagent.holder")
plugin_registry.register_plugin("aries_cloudagent.ledger")

if not self.settings.get("ledger.disabled"):
plugin_registry.register_plugin("aries_cloudagent.ledger")

plugin_registry.register_plugin("aries_cloudagent.messaging.jsonld")
plugin_registry.register_plugin("aries_cloudagent.resolver")
plugin_registry.register_plugin("aries_cloudagent.settings")
Expand Down
126 changes: 67 additions & 59 deletions aries_cloudagent/config/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,45 +324,47 @@ def print_banner(
# Title
banner.title(agent_label or "ACA")
# Inbound transports
banner.subtitle("Inbound Transports")
internal_in_transports = [
f"{transport.scheme}://{transport.host}:{transport.port}"
for transport in inbound_transports.values()
if not transport.is_external
]
if internal_in_transports:
banner.list(internal_in_transports)
external_in_transports = [
f"{transport.scheme}://{transport.host}:{transport.port}"
for transport in inbound_transports.values()
if transport.is_external
]
if external_in_transports:
banner.subtitle(" External Plugin")
banner.list(external_in_transports)
if inbound_transports:
banner.subtitle("Inbound Transports")
internal_in_transports = [
f"{transport.scheme}://{transport.host}:{transport.port}"
for transport in inbound_transports.values()
if not transport.is_external
]
if internal_in_transports:
banner.list(internal_in_transports)
external_in_transports = [
f"{transport.scheme}://{transport.host}:{transport.port}"
for transport in inbound_transports.values()
if transport.is_external
]
if external_in_transports:
banner.subtitle(" External Plugin")
banner.list(external_in_transports)

# Outbound transports
banner.subtitle("Outbound Transports")
internal_schemes = set().union(
*(
transport.schemes
for transport in outbound_transports.values()
if not transport.is_external
if outbound_transports:
banner.subtitle("Outbound Transports")
internal_schemes = set().union(
*(
transport.schemes
for transport in outbound_transports.values()
if not transport.is_external
)
)
)
if internal_schemes:
banner.list([f"{scheme}" for scheme in sorted(internal_schemes)])

external_schemes = set().union(
*(
transport.schemes
for transport in outbound_transports.values()
if transport.is_external
if internal_schemes:
banner.list([f"{scheme}" for scheme in sorted(internal_schemes)])

external_schemes = set().union(
*(
transport.schemes
for transport in outbound_transports.values()
if transport.is_external
)
)
)
if external_schemes:
banner.subtitle(" External Plugin")
banner.list([f"{scheme}" for scheme in sorted(external_schemes)])
if external_schemes:
banner.subtitle(" External Plugin")
banner.list([f"{scheme}" for scheme in sorted(external_schemes)])

# DID info
if public_did:
Expand All @@ -387,36 +389,42 @@ def print_banner(
def print_notices(cls, settings: Settings):
"""Print notices and warnings."""
with Banner(border=":", length=80, file=sys.stderr) as banner:
banner.centered("⚠ DEPRECATION NOTICE: ⚠")
banner.hr()
if settings.get("wallet.type", "in_memory").lower() == "indy":
banner.centered("⚠ DEPRECATION NOTICE: ⚠")
banner.hr()
banner.print(
"The Indy wallet type is deprecated, use Askar instead; see: "
"https://aca-py.org/main/deploying/IndySDKtoAskarMigration/",
)
banner.hr()
banner.print(
"Receiving a core DIDComm protocol with the "
"`did:sov:BzCbsNYhMrjHiqZDTUASHg;spec` prefix is deprecated. All parties "
"sending this prefix should be notified that support for receiving such "
"messages will be removed in a future release. "
"Use https://didcomm.org/ instead."
)
banner.hr()
banner.print(
"Aries RFC 0160: Connection Protocol is deprecated and support will be "
"removed in a future release; use RFC 0023: DID Exchange instead."
)
banner.hr()
banner.print(
"Aries RFC 0036: Issue Credential 1.0 is deprecated and support will be "
"removed in a future release; use RFC 0453: Issue Credential 2.0 instead."
)
banner.hr()
banner.print(
"Aries RFC 0037: Present Proof 1.0 is deprecated and support will be "
"removed in a future release; use RFC 0454: Present Proof 2.0 instead."
)
if not settings.get("transport.disabled"):
banner.centered("⚠ DEPRECATION NOTICE: ⚠")
banner.hr()
banner.print(
"Receiving a core DIDComm protocol with the "
"`did:sov:BzCbsNYhMrjHiqZDTUASHg;spec` prefix is deprecated. "
"All parties sending this prefix should be notified that support "
"for receiving such messages will be removed in a future release. "
"Use https://didcomm.org/ instead."
)
banner.hr()
banner.print(
"Aries RFC 0160: Connection Protocol is deprecated and "
"support will be removed in a future release; "
"use RFC 0023: DID Exchange instead."
)
banner.hr()
banner.print(
"Aries RFC 0036: Issue Credential 1.0 is deprecated "
"and support will be removed in a future release; "
"use RFC 0453: Issue Credential 2.0 instead."
)
banner.hr()
banner.print(
"Aries RFC 0037: Present Proof 1.0 is deprecated "
"and support will be removed in a future release; "
"use RFC 0454: Present Proof 2.0 instead."
)
print()


Expand Down
93 changes: 55 additions & 38 deletions aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,20 +191,22 @@ async def setup(self):
):
LOGGER.warning("No ledger configured")

# Register all inbound transports
self.inbound_transport_manager = InboundTransportManager(
self.root_profile, self.inbound_message_router, self.handle_not_returned
)
await self.inbound_transport_manager.setup()
context.injector.bind_instance(
InboundTransportManager, self.inbound_transport_manager
)
if not context.settings.get("transport.disabled"):
# Register all inbound transports if enabled
self.inbound_transport_manager = InboundTransportManager(
self.root_profile, self.inbound_message_router, self.handle_not_returned
)
await self.inbound_transport_manager.setup()
context.injector.bind_instance(
InboundTransportManager, self.inbound_transport_manager
)

# Register all outbound transports
self.outbound_transport_manager = OutboundTransportManager(
self.root_profile, self.handle_not_delivered
)
await self.outbound_transport_manager.setup()
if not context.settings.get("transport.disabled"):
# Register all outbound transports
self.outbound_transport_manager = OutboundTransportManager(
self.root_profile, self.handle_not_delivered
)
await self.outbound_transport_manager.setup()

# Initialize dispatcher
self.dispatcher = Dispatcher(self.root_profile)
Expand Down Expand Up @@ -286,17 +288,18 @@ async def start(self) -> None:
context = self.root_profile.context
await self.check_for_valid_wallet_type(self.root_profile)

# Start up transports
try:
await self.inbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start inbound transports")
raise
try:
await self.outbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start outbound transports")
raise
if not context.settings.get("transport.disabled"):
# Start up transports if enabled
try:
await self.inbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start inbound transports")
raise
try:
await self.outbound_transport_manager.start()
except Exception:
LOGGER.exception("Unable to start outbound transports")
raise

# Start up Admin server
if self.admin_server:
Expand All @@ -316,14 +319,23 @@ async def start(self) -> None:
# Get agent label
default_label = context.settings.get("default_label")

# Show some details about the configuration to the user
LoggingConfigurator.print_banner(
default_label,
self.inbound_transport_manager.registered_transports,
self.outbound_transport_manager.registered_transports,
self.setup_public_did and self.setup_public_did.did,
self.admin_server,
)
if context.settings.get("transport.disabled"):
LoggingConfigurator.print_banner(
default_label,
None,
None,
self.setup_public_did and self.setup_public_did.did,
self.admin_server,
)
else:
LoggingConfigurator.print_banner(
default_label,
self.inbound_transport_manager.registered_transports,
self.outbound_transport_manager.registered_transports,
self.setup_public_did and self.setup_public_did.did,
self.admin_server,
)

LoggingConfigurator.print_notices(context.settings)

# record ACA-Py version in Wallet, if needed
Expand Down Expand Up @@ -620,19 +632,24 @@ def dispatch_complete(self, message: InboundMessage, completed: CompletedTask):
async def get_stats(self) -> dict:
"""Get the current stats tracked by the conductor."""
stats = {
"in_sessions": len(self.inbound_transport_manager.sessions),
"in_sessions": (
len(self.inbound_transport_manager.sessions)
if self.inbound_transport_manager
else 0
),
"out_encode": 0,
"out_deliver": 0,
"task_active": self.dispatcher.task_queue.current_active,
"task_done": self.dispatcher.task_queue.total_done,
"task_failed": self.dispatcher.task_queue.total_failed,
"task_pending": self.dispatcher.task_queue.current_pending,
}
for m in self.outbound_transport_manager.outbound_buffer:
if m.state == QueuedOutboundMessage.STATE_ENCODE:
stats["out_encode"] += 1
if m.state == QueuedOutboundMessage.STATE_DELIVER:
stats["out_deliver"] += 1
if self.outbound_transport_manager:
for m in self.outbound_transport_manager.outbound_buffer:
if m.state == QueuedOutboundMessage.STATE_ENCODE:
stats["out_encode"] += 1
if m.state == QueuedOutboundMessage.STATE_DELIVER:
stats["out_deliver"] += 1
return stats

async def outbound_message_router(
Expand Down

0 comments on commit 66c9c1d

Please sign in to comment.