Commit

pythonGH-98137, make some changes, add benchmarking script, trying to get it working
itamaro committed Feb 1, 2023

1 parent 95fb0e0 commit 77f852c
Showing 4 changed files with 243 additions and 15 deletions.
31 changes: 30 additions & 1 deletion Lib/asyncio/__init__.py
@@ -36,7 +36,36 @@
tasks.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)
transports.__all__ + (
'create_eager_task_factory',
'eager_task_factory',
))

# throwing things here temporarily to defer premature dir layout bikeshedding

def create_eager_task_factory(custom_task_constructor):

def factory(loop, coro, *, name=None, context=None):
loop._check_closed()
try:
result = coro.send(None)
except StopIteration as si:
print("XXX")
fut = loop.create_future()
fut.set_result(si.value)
return fut
except Exception as ex:
print("YYY")
fut = loop.create_future()
fut.set_exception(ex)
return fut
else:
return custom_task_constructor(
coro, loop=loop, name=name, context=context, yield_result=result)

return factory

eager_task_factory = create_eager_task_factory(Task)

if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
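
How the new factory is meant to be used is visible in the benchmark script added below: it is installed on a loop with loop.set_task_factory(asyncio.eager_task_factory). A minimal sketch of that usage follows; it assumes a CPython build containing this commit (the factory does not exist in released asyncio), and the debug prints left in the prototype ("XXX"/"YYY") will also fire.

# Sketch only: requires a build of this branch, where asyncio.eager_task_factory
# exists and asyncio.Task is the Python implementation.
import asyncio

async def immediate():
    return 42          # finishes without ever suspending

async def main():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(asyncio.eager_task_factory)
    fut = loop.create_task(immediate())
    # The factory advanced the coroutine eagerly: coro.send(None) raised
    # StopIteration, so create_task() handed back an already-completed Future
    # rather than scheduling a Task.
    print(fut.done())  # True
    print(await fut)   # 42

asyncio.run(main())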
7 changes: 5 additions & 2 deletions Lib/asyncio/taskgroups.py
@@ -141,11 +141,14 @@ def create_task(self, coro, *, name=None, context=None):
raise RuntimeError(f"TaskGroup {self!r} is finished")
if self._aborting:
raise RuntimeError(f"TaskGroup {self!r} is shutting down")
if context is None:
if hasattr(self._loop, "eager_task_factory"):
task = self._loop.eager_task_factory(coro, name=name, context=context)
elif context is None:
task = self._loop.create_task(coro)
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
if not task.done(): # If it's done already, it's a future
tasks._set_task_name(task, name)
task.add_done_callback(self._on_task_done)
self._tasks.add(task)
return task
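
The hasattr check above looks for an eager_task_factory attribute directly on the loop and calls it as factory(coro, name=..., context=...). The diff itself does not set that attribute anywhere, so the sketch below binds the module-level factory to the loop with functools.partial purely as an assumption to make the signatures line up; it also needs a build containing this commit.

# Sketch only: loop.eager_task_factory is not set by this commit; the
# functools.partial binding is an assumption made for illustration.
import asyncio
import functools

async def quick():
    return "finished eagerly"   # never awaits anything

async def main():
    loop = asyncio.get_running_loop()
    loop.eager_task_factory = functools.partial(asyncio.eager_task_factory, loop)
    async with asyncio.TaskGroup() as tg:
        t = tg.create_task(quick())
    # quick() completed inside create_task(), so t came back as an already-done
    # Future and the name-setting branch above was skipped.
    print(t.result())

asyncio.run(main())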
33 changes: 21 additions & 12 deletions Lib/asyncio/tasks.py
@@ -75,6 +75,8 @@ def _set_task_name(task, name):
set_name(name)


_NOT_SET = object()

class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.

@@ -93,7 +95,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True

def __init__(self, coro, *, loop=None, name=None, context=None):
def __init__(self, coro, *, loop=None, name=None, context=None,
yield_result=_NOT_SET):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -117,7 +120,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None):
else:
self._context = context

self._loop.call_soon(self.__step, context=self._context)
if yield_result is _NOT_SET:
self._loop.call_soon(self.__step, context=self._context)
else:
self.__step2(yield_result)
_register_task(self)

def __del__(self):
@@ -287,6 +293,12 @@ def __step(self, exc=None):
except BaseException as exc:
super().set_exception(exc)
else:
self.__step2(result)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __step2(self, result):
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
@@ -333,9 +345,6 @@ def __step(self, exc=None):
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.

def __wakeup(self, future):
try:
@@ -357,13 +366,13 @@ def __wakeup(self, future):
_PyTask = Task


try:
import _asyncio
except ImportError:
pass
else:
# _CTask is needed for tests.
Task = _CTask = _asyncio.Task
# try:
# import _asyncio
# except ImportError:
# pass
# else:
# # _CTask is needed for tests.
# Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None, context=None):
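
Splitting __step into __step and __step2 is what lets a Task pick up a coroutine that has already been advanced to its first suspension point: the factory passes the first yielded value in via the new yield_result keyword, and __init__ resumes from __step2 instead of scheduling __step. A rough illustration, assuming a build with this commit (where asyncio.Task is the Python Task that accepts yield_result):

# Sketch only: the yield_result keyword exists only in this branch's Python Task.
import asyncio

async def fetch():
    await asyncio.sleep(0)         # the first send(None) stops at this yield
    return "payload"

async def main():
    loop = asyncio.get_running_loop()
    coro = fetch()
    first_yield = coro.send(None)  # run eagerly up to the first suspension
    task = asyncio.Task(coro, loop=loop, yield_result=first_yield)
    print(await task)              # "payload"

asyncio.run(main())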
187 changes: 187 additions & 0 deletions async_tree.py
@@ -0,0 +1,187 @@
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
"""
Benchmark script for recursive async tree workloads. This script includes the
following microbenchmark scenarios:
1) "no_suspension": No suspension in the async tree.
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
un-memoized IO calls will result in suspensions.
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
memoization) at the leaf nodes.
Use the commandline flag or choose the corresponding <Scenario>AsyncTree class
to run the desired microbenchmark scenario.
"""


import asyncio
import math
import random
import time
from argparse import ArgumentParser


NUM_RECURSE_LEVELS = 6
NUM_RECURSE_BRANCHES = 6
IO_SLEEP_TIME = 0.05
DEFAULT_MEMOIZABLE_PERCENTAGE = 90
DEFAULT_CPU_PROBABILITY = 0.5
FACTORIAL_N = 500


def parse_args():
parser = ArgumentParser(
description="""\
Benchmark script for recursive async tree workloads. It can be run as a standalone
script, in which case you can specify the microbenchmark scenario to run and whether
to print the results.
"""
)
parser.add_argument(
"-s",
"--scenario",
choices=["no_suspension", "suspense_all", "memoization", "cpu_io_mixed"],
default="no_suspension",
help="""\
Determines which microbenchmark scenario to run. Defaults to no_suspension. Options:
1) "no_suspension": No suspension in the async tree.
2) "suspense_all": Suspension (simulating IO) at all leaf nodes in the async tree.
3) "memoization": Simulated IO calls at all leaf nodes, but with memoization. Only
un-memoized IO calls will result in suspensions.
4) "cpu_io_mixed": A mix of CPU-bound workload and IO-bound workload (with
memoization) at the leaf nodes.
""",
)
parser.add_argument(
"-m",
"--memoizable-percentage",
type=int,
default=DEFAULT_MEMOIZABLE_PERCENTAGE,
help="""\
Sets the percentage (0-100) of the data that should be memoized, defaults to 90. For
example, at the default 90 percent, data 1-90 will be memoized and data 91-100 will not.
""",
)
parser.add_argument(
"-c",
"--cpu-probability",
type=float,
default=DEFAULT_CPU_PROBABILITY,
help="""\
Sets the probability (0-1) that a leaf node will execute a cpu-bound workload instead
of an io-bound workload. Defaults to 0.5. Only applies to the "cpu_io_mixed"
microbenchmark scenario.
""",
)
parser.add_argument(
"-p",
"--print",
action="store_true",
default=False,
help="Print the results (runtime and number of Tasks created).",
)
return parser.parse_args()


class AsyncTree:
def __init__(
self,
memoizable_percentage=DEFAULT_MEMOIZABLE_PERCENTAGE,
cpu_probability=DEFAULT_CPU_PROBABILITY,
):
self.suspense_count = 0
self.task_count = 0
self.memoizable_percentage = memoizable_percentage
self.cpu_probability = cpu_probability
self.cache = {}
# set to deterministic random, so that the results are reproducible
random.seed(0)

async def mock_io_call(self):
self.suspense_count += 1
await asyncio.sleep(IO_SLEEP_TIME)

def create_task(self, loop, coro):
self.task_count += 1
return asyncio.Task(coro, loop=loop)

async def suspense_func(self):
raise NotImplementedError(
"To be implemented by each microbenchmark's derived class."
)

async def recurse(self, recurse_level):
if recurse_level == 0:
await self.suspense_func()
return

await asyncio.gather(
*[self.recurse(recurse_level - 1) for _ in range(NUM_RECURSE_BRANCHES)]
)

def run(self):
loop = asyncio.new_event_loop()
# eager_factory = asyncio.create_eager_task_factory(self.create_task)
# loop.set_task_factory(eager_factory)
loop.set_task_factory(asyncio.eager_task_factory)
loop.run_until_complete(self.recurse(NUM_RECURSE_LEVELS))
loop.close()


class NoSuspensionAsyncTree(AsyncTree):
async def suspense_func(self):
return


class SuspenseAllAsyncTree(AsyncTree):
async def suspense_func(self):
await self.mock_io_call()


class MemoizationAsyncTree(AsyncTree):
async def suspense_func(self):
# deterministic random (seed preset)
data = random.randint(1, 100)

if data <= self.memoizable_percentage:
if self.cache.get(data):
return data

self.cache[data] = True

await self.mock_io_call()
return data


class CpuIoMixedAsyncTree(MemoizationAsyncTree):
async def suspense_func(self):
if random.random() < self.cpu_probability:
# mock cpu-bound call
return math.factorial(FACTORIAL_N)
else:
return await MemoizationAsyncTree.suspense_func(self)


if __name__ == "__main__":
args = parse_args()
scenario = args.scenario

trees = {
"no_suspension": NoSuspensionAsyncTree,
"suspense_all": SuspenseAllAsyncTree,
"memoization": MemoizationAsyncTree,
"cpu_io_mixed": CpuIoMixedAsyncTree,
}
async_tree_class = trees[scenario]
async_tree = async_tree_class(args.memoizable_percentage, args.cpu_probability)

start_time = time.perf_counter()
async_tree.run()
end_time = time.perf_counter()

if args.print:
print(f"Scenario: {scenario}")
print(f"Time: {end_time - start_time} s")
print(f"Tasks created: {async_tree.task_count}")
print(f"Suspense called: {async_tree.suspense_count}")
