Skip to content

Commit

Permalink
Merge pull request #255 from andrewwhitehead/feature/uvloop
Browse files Browse the repository at this point in the history
Add optional support for uvloop
  • Loading branch information
swcurran authored Nov 6, 2019
2 parents da9d143 + c1b5fc1 commit 9d6ec5c
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 17 deletions.
15 changes: 8 additions & 7 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class AdminModulesSchema(Schema):
"""Schema for the modules endpoint."""

result = fields.List(
fields.Str(description="admin module"),
description="List of admin modules",
fields.Str(description="admin module"), description="List of admin modules",
)


Expand Down Expand Up @@ -382,7 +381,9 @@ async def send_webhook(self, topic: str, payload: dict):
"""Add a webhook to the queue, to send to all registered targets."""
if not self.webhook_queue:
self.webhook_queue = await self.context.inject(BaseOutboundMessageQueue)
self.webhook_task = asyncio.ensure_future(self._process_webhooks())
self.webhook_task = asyncio.get_event_loop().create_task(
self._process_webhooks()
)
await self.webhook_queue.enqueue((topic, payload))

async def _process_webhooks(self):
Expand All @@ -392,16 +393,16 @@ async def _process_webhooks(self):
if collector:
session_args["trace_configs"] = [StatsTracer(collector, "webhook-http:")]
self.webhook_session = ClientSession(**session_args)
self.webhook_processor = TaskProcessor(max_pending=5)
self.webhook_processor = TaskProcessor(max_pending=20)
async for topic, payload in self.webhook_queue:
for queue in self.websocket_queues.values():
await queue.enqueue({"topic": topic, "payload": payload})
if self.webhook_targets:
if topic == "connections_activity":
# filter connections activity by default (only sent to sockets)
continue
targets = self.webhook_targets.copy()
for idx, target in targets.items():
if topic == "connections_activity":
# filter connections activity by default (only sent to sockets)
continue
if not target.topic_filter or topic in target.topic_filter:
retries = (
self.webhook_retries
Expand Down
8 changes: 8 additions & 0 deletions aries_cloudagent/commands/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from argparse import ArgumentParser
from typing import Coroutine, Sequence

try:
import uvloop
except ImportError:
uvloop = None

from ..conductor import Conductor
from ..config import argparse as arg
from ..config.default_context import DefaultContextBuilder
Expand Down Expand Up @@ -54,6 +59,9 @@ def execute(argv: Sequence[str] = None):
conductor = Conductor(context_builder)

# Run the application
if uvloop:
uvloop.install()
print("uvloop installed")
run_loop(start_app(conductor), shutdown_app(conductor))


Expand Down
10 changes: 7 additions & 3 deletions aries_cloudagent/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ async def dispatch(
)

if error_result:
return asyncio.ensure_future(responder.send_reply(error_result))
return asyncio.get_event_loop().create_task(
responder.send_reply(error_result)
)

context.injector.bind_instance(BaseResponder, responder)

Expand All @@ -96,7 +98,9 @@ async def dispatch(
collector: Collector = await context.inject(Collector, required=False)
if collector:
collector.wrap(handler_obj, "handle", ["any-message-handler"])
handler = asyncio.ensure_future(handler_obj.handle(context, responder))
handler = asyncio.get_event_loop().create_task(
handler_obj.handle(context, responder)
)
return handler

async def make_message(self, parsed_msg: dict) -> AgentMessage:
Expand Down Expand Up @@ -190,7 +194,7 @@ async def send_webhook(self, topic: str, payload: dict):
topic: the webhook topic identifier
payload: the webhook payload value
"""
asyncio.ensure_future(self._dispatch_webhook(topic, payload))
asyncio.get_event_loop().create_task(self._dispatch_webhook(topic, payload))

async def _dispatch_webhook(self, topic: str, payload: dict):
"""Perform dispatch of a webhook."""
Expand Down
5 changes: 3 additions & 2 deletions aries_cloudagent/task_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(self, *, max_pending: int = 10):
"""Instantiate the dispatcher."""
self.done_event = asyncio.Event()
self.done_event.set()
self.loop = asyncio.get_event_loop()
self.max_pending = max_pending
self.pending = set()
self.pending_lock = asyncio.Lock()
Expand Down Expand Up @@ -97,11 +98,11 @@ def _enqueue_task(self, task: PendingTask):
task.attempts += 1
task.running = asyncio.ensure_future(awaitable)
task.running.add_done_callback(
lambda fut: asyncio.ensure_future(self._check_task(task))
lambda fut: self.loop.create_task(self._check_task(task))
)
else:
task.future.set_result(None)
asyncio.ensure_future(self._check_task(task))
self.loop.create_task(self._check_task(task))

async def _check_task(self, task: PendingTask):
"""Complete a task."""
Expand Down
5 changes: 3 additions & 2 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ async def start_transport(self, schemes, transport_cls):
async def start(self):
"""Start all transports and feed messages from the queue."""
startup = []
loop = asyncio.get_event_loop()
for schemes, transport_class in self.registered_transports.items():
# Don't block the loop
startup.append(
asyncio.ensure_future(self.start_transport(schemes, transport_class))
loop.create_task(self.start_transport(schemes, transport_class))
)
self.startup_tasks = startup
self.polling_task = asyncio.ensure_future(self.poll())
self.polling_task = loop.create_task(self.poll())

async def stop(self, wait: bool = True):
"""Stop all transports."""
Expand Down
5 changes: 3 additions & 2 deletions aries_cloudagent/transport/outbound/queue/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ async def dequeue(self, *, timeout: int = None):
"""
stop_event, queue = self.stop_event, self.queue
if not stop_event.is_set():
stopped = asyncio.ensure_future(stop_event.wait())
dequeued = asyncio.ensure_future(queue.get())
loop = asyncio.get_event_loop()
stopped = loop.create_task(stop_event.wait())
dequeued = loop.create_task(queue.get())
done, pending = await asyncio.wait(
(stopped, dequeued),
timeout=timeout,
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def parse_requirements(filename):
package_data={"aries_cloudagent": ["requirements.txt"]},
install_requires=parse_requirements("requirements.txt"),
tests_require=parse_requirements("requirements.dev.txt"),
extras_require={"indy": parse_requirements("requirements.indy.txt")},
extras_require={
"indy": parse_requirements("requirements.indy.txt"),
"uvloop": {"uvloop": "^=0.14.0"},
},
python_requires=">=3.6.3",
scripts=["bin/aca-py"],
)

0 comments on commit 9d6ec5c

Please sign in to comment.