Skip to content

Commit

Permalink
Modify, Feature, Refactor Share design
Browse files Browse the repository at this point in the history
  • Loading branch information
so1n committed Aug 15, 2023
1 parent d329e22 commit 76ae98f
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 67 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ venv.bak/
.idea/
.vscode/
.portry.lock
portry.lock
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
repos:
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v1.0.0
hooks:
- id: mypy
- repo: https://github.com/PyCQA/isort
rev: 5.9.3
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 21.7b0
rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 3.9.2
hooks:
- id: flake8
- repo: https://github.com/myint/autoflake
rev: v1.4
rev: v2.0.1
hooks:
- id: autoflake
args: ['--in-place', '--remove-all-unused-imports', '--remove-unused-variable', '--ignore-init-module-imports']
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
rev: v4.4.0
hooks:
- id: check-ast
- id: check-byte-order-marker
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# fast-tools
`fast-tools` is a `FastApi/Starlette` toolset, Most of the tools can be used in FastApi/Starlette, a few tools only support `FastApi` which is divided into the lack of compatibility with `FastApi`

Note: this is alpha quality code still, the API may change, and things may fall apart while you try it.
Note:
- this is alpha quality code still, the API may change, and things may fall apart while you try it.
- The current branch is under development and the documentation may not be unified

```python
# origin of name
Expand Down
2 changes: 2 additions & 0 deletions README_CH.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# fast-tools
`fast-tools`是一个`FastApi/Starlette`的库合集, 大部分库都可用于FastApi/Starlette, 少部库只支持`FastApi`是为了兼容`FastApi`的不足

> Note: 当前主分支正在开发中, 文档可能不统一
```python
# 名字由来
project_name = ('FastApi'[:2] + 'Starlette'[:2]).lower() + '-tools'
Expand Down
56 changes: 42 additions & 14 deletions example/share.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async def delay_print(duration: int) -> int:


async def run_do(share: "Share") -> None:
task_list: "List[Coroutine]" = [share.do("test_do", delay_print, args=[i]) for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]]
task_list = [share.do("test_do", delay_print, args=[i]) for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]]
print("run do start", time.time())
done, _ = await asyncio.wait(task_list)
print("run do end", time.time())
Expand All @@ -26,18 +26,43 @@ async def cancel_in_aio(share: "Share") -> None:
share.cancel()


async def run_cancel(share: "Share") -> None:
async def run_cancel_in_aio(share: "Share") -> None:
task_list: "List[Coroutine]" = [
share.do("test_cancel", delay_print, args=[i]) for i in [11, 12, 13, 14, 15, 16, 17, 18, 19]
share.do("test_cancel_in_aio", delay_print, args=[i]) for i in [11, 12, 13, 14, 15, 16, 17, 18, 19]
]
task_list.append(cancel_in_aio(share))
print("run cancel in aio start", time.time())
t_list = [asyncio.Task(t) for t in task_list]
await asyncio.sleep(1)

result = []
for t in t_list:
try:
await t
result.append(1)
except asyncio.CancelledError:
result.append(0)
print("run cancel in aio end", result)


async def must_cancel_coro() -> None:
await asyncio.sleep(0)
raise asyncio.CancelledError("must cancel")


async def run_cancel(share: "Share") -> None:
task_list = [share.do("test_cancel", must_cancel_coro) for _ in range(10)]
print("run cancel start", time.time())
try:
done, _ = await asyncio.wait(task_list)
print("run cancel result", [future.result() for future in done])
except asyncio.CancelledError:
print("run cancel error: asyncio.CancelledError")
print("run cancel end", time.time())
t_list = [asyncio.Task(t) for t in task_list]
await asyncio.sleep(1)
result = []
for t in t_list:
try:
await t
result.append(1)
except asyncio.CancelledError:
result.append(0)
print("run cancel end", result)


async def run_wapper_do(share: "Share") -> None:
Expand All @@ -46,7 +71,7 @@ async def test_wapper_do(num: int) -> int:
print(f"call wapper :{num}")
return await delay_print(num)

task_list: "List[Coroutine]" = [test_wapper_do(i) for i in [21, 22, 23, 24, 25, 26, 27, 28, 29]]
task_list = [test_wapper_do(i) for i in [21, 22, 23, 24, 25, 26, 27, 28, 29]]
print("run wapper do start", time.time())
done, _ = await asyncio.wait(task_list)
print("run wapper do end", time.time())
Expand All @@ -61,8 +86,9 @@ async def delay_print(self, num: int) -> int:
return await delay_print(num)

test_class_wapper_do: "TestClassWapperDo" = TestClassWapperDo()
print(test_class_wapper_do.delay_print.__qualname__)

task_list: "List[Coroutine]" = [test_class_wapper_do.delay_print(i) for i in [21, 22, 23, 24, 25, 26, 27, 28, 29]]
task_list = [test_class_wapper_do.delay_print(i) for i in [21, 22, 23, 24, 25, 26, 27, 28, 29]]
print("run class wapper do start", time.time())
done, _ = await asyncio.wait(task_list)
print("run class wapper do end", time.time())
Expand All @@ -73,9 +99,11 @@ def main() -> None:
share: "Share" = Share()

loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
loop.run_until_complete(run_do(share))
loop.run_until_complete(run_cancel(share))
loop.run_until_complete(run_wapper_do(share))
# loop.run_until_complete(run_wapper_do(share))
# loop.run_until_complete(run_do(share))
# loop.run_until_complete(run_cancel(share))
# loop.run_until_complete(run_cancel_in_aio(share))
# loop.run_until_complete(run_wapper_do(share))
loop.run_until_complete(run_class_wapper_do(share))


Expand Down
1 change: 0 additions & 1 deletion fast_tools/base/redis_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def __init__(
block_timeout: Optional[int] = None,
sleep_time: float = 0.1,
):

self._redis: "RedisHelper" = redis_helper
self._lock_key: str = f"{self._redis.namespace}:lock:{lock_key}"
self._timeout: int = timeout
Expand Down
19 changes: 15 additions & 4 deletions fast_tools/base/route_trie.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Dict, List, Optional, Union

from starlette.routing import Match, Route
from starlette.routing import BaseRoute, Host, Match, Mount, Route
from starlette.types import ASGIApp, Scope


Expand All @@ -21,15 +21,26 @@ def __init__(self) -> None:
self.root: Dict[str, Union["RouteTrie", dict, Route, List[Route]]] = {}
self.route_dict: Dict["RouteTrie", List[Route]] = {}

def insert_by_route(self, route: BaseRoute, path: str = ""):
if isinstance(route, Route):
url: str = path + route.path
self.insert(url, route)
elif isinstance(route, (Mount, Host)):
path = path + route.path
for r in route.routes:
self.insert_by_route(r, path=path)
else:
raise TypeError(f"Not support class:{route.__class__}")

def insert_by_app(self, app: ASGIApp) -> None:
while True:
sub_app: ASGIApp = getattr(app, "app", None)
if not sub_app:
break
app = sub_app
for route in app.routes:
url: str = route.path
self.insert(url, route)

for route in app.routes: # type: ignore
self.insert_by_route(route)

def insert(self, url_path: str, route: Route) -> None:
cur_node: "RouteNode" = self.root_node
Expand Down
86 changes: 54 additions & 32 deletions fast_tools/share.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, Optional, TypeVar, Union
from typing import Any, Callable, Coroutine, Dict, Generic, Optional, Tuple, TypeVar, Union

from typing_extensions import ParamSpec

__all__ = ("Share", "Token")
_Tp = TypeVar("_Tp")


class Token(object):
class Token(Generic[_Tp]):
"""Result and status of managed actions"""

def __init__(self, key: str):
self._key: str = key
self._future: Optional[asyncio.Future] = None
def __init__(self, key: Any):
self._key: Any = key
self._future: Optional[asyncio.Future[_Tp]] = None

def can_do(self) -> bool:
"""Determine whether there is a future, if not, create a new future and return true, otherwise return false"""
Expand All @@ -34,15 +36,15 @@ def cancel(self) -> bool:
return True
return False

async def await_done(self) -> Optional[Any]:
async def await_done(self) -> _Tp:
"""Wait for execution to end and return data"""
if not self._future:
raise RuntimeError(f"You should use Token<{self._key}>.can_do() before Token<{self._key}>.await_done()")
if not self._future.done():
await self._future
return self._future.result()

def set_result(self, result: Any) -> bool:
def set_result(self, result: Union[_Tp, Exception]) -> bool:
"""set data or exception"""
if self._future and not self._future.done():
if isinstance(result, Exception):
Expand All @@ -53,57 +55,77 @@ def set_result(self, result: Any) -> bool:
return False


_ShareKeyType = Union[Tuple[Any, ...], str]
P = ParamSpec("P")
R_T = TypeVar("R_T")


class Share(object):
def __init__(self) -> None:
self._future_dict: Dict[str, Token] = dict()
self._future_dict: Dict[_ShareKeyType, Token] = dict()

def _get_token(self, key: str) -> Token:
def _get_token(self, key: _ShareKeyType) -> Token:
"""Get the token (if not, create a new one and return)"""
if key not in self._future_dict:
token: "Token" = Token(key)
self._future_dict[key] = token
self._future_dict[key] = Token(key)
return self._future_dict[key]

def cancel(self, key: Optional[str] = None) -> None:
def cancel(self, key: Optional[_ShareKeyType] = None) -> None:
"""Cancel the execution of the specified action, if the key is empty, cancel all"""
if not key:
for _, token in self._future_dict.items():
for token in self._future_dict.values():
token.cancel()
else:
self._future_dict[key].cancel()

def forget(self, key: _ShareKeyType) -> None:
if key not in self._future_dict:
raise KeyError(f"Key {key} not found")
self._future_dict.pop(key, None)

async def _token_handle(
self, key: str, func: Callable, args: Optional[Union[list, tuple]], kwargs: Optional[dict]
) -> Any:
args = args or ()
kwargs = kwargs or {}
self, key: _ShareKeyType, func: Callable[P, Coroutine[Any, Any, R_T]], args: P.args, kwargs: P.args
) -> R_T:
token: Token = self._get_token(key)
try:
if token.can_do():
try:
token.set_result(await func(*args, **kwargs))
token.set_result(await func(*(args or ()), **(kwargs or {})))
except Exception as e:
token.set_result(e)
raise e
return await token.await_done()
finally:
self._future_dict.pop(key, None)

async def do(
self, key: str, func: Callable, args: Optional[Union[list, tuple]] = None, kwargs: Optional[dict] = None
) -> Any:
return await self._token_handle(key, func, args, kwargs)

def wrapper_do(self, key: Optional[str] = None) -> Callable:
def wrapper(func: Callable) -> Callable:
key_name: str = func.__name__ + str(id(func)) if key is None else key
def do(
self,
key: _ShareKeyType,
func: Callable[P, Coroutine[Any, Any, R_T]],
args: P.args = None,
kwargs: P.kwargs = None,
) -> Coroutine[Any, Any, R_T]:
# trick mypy
return self._token_handle(key, func, args, kwargs)

def wrapper_do(
self, key: Optional[str] = None, only_include_class_param: bool = True, include_param: bool = False
) -> Callable:
if only_include_class_param and include_param:
raise ValueError("only_include_class_param and include_param can't be True at the same time")

def wrapper(func: Callable[P, Coroutine[Any, Any, R_T]]) -> Callable[P, Coroutine[Any, Any, R_T]]:
key_name: str = func.__qualname__ if key is None else key

@wraps(func)
async def wrapper_func(*args: Any, **kwargs: Any) -> Any:
real_key_name: str = key_name
if args and args[0].__class__.__name__ in func.__qualname__:
real_key_name += f":{id(args[0])}"
return await self._token_handle(real_key_name, func, args, kwargs)
async def wrapper_func(*args: P.args, **kwargs: P.kwargs) -> R_T:
if include_param:
real_key: Tuple[Any, ...] = (key_name, tuple(args), tuple(kwargs.values()))
else:
if only_include_class_param and args and args[0].__class__.__name__ in func.__qualname__:
real_key = (key_name + f":{id(args[0])}",)
else:
real_key = (key_name,)
return await self._token_handle(real_key, func, args, kwargs)

return wrapper_func

Expand Down
1 change: 0 additions & 1 deletion tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

class TestContext:
def test_contest(self, mocker: MockFixture) -> None:

with TestClient(app) as client:
response: Response = client.get("/")
resp_dict: dict = response.json()
Expand Down
Loading

0 comments on commit 76ae98f

Please sign in to comment.