Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: 改进 CommandGroupMatcherGroup 的结构 #1240

Merged
merged 7 commits into from
Sep 10, 2022
183 changes: 78 additions & 105 deletions nonebot/plugin/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
"""
import re
import inspect
from functools import wraps
from types import ModuleType
from datetime import datetime, timedelta
from typing import Any, Set, Dict, List, Type, Tuple, Union, Optional
from typing import Any, Set, Dict, List, Type, Tuple, Union, Callable, Optional

from nonebot.adapters import Event
from nonebot.matcher import Matcher
Expand Down Expand Up @@ -463,7 +464,51 @@ def on_type(
return on(rule=is_type(*event_types) & rule, **kwargs, _depth=_depth + 1)


class CommandGroup:
class Group:
def __init__(self, **kwargs):
"""创建一个事件响应器组合,参数为默认值,与 `on` 一致"""
self.matchers: List[Type[Matcher]] = []
"""组内事件响应器列表"""
self.base_kwargs: Dict[str, Any] = kwargs
"""其他传递给 `on` 的参数默认值"""

def __init_subclass__(cls) -> None:
def add_matcher(
func: Callable[..., Type[Matcher]]
) -> Callable[..., Type[Matcher]]:
@wraps(func)
def wrapper(self, *args, **kwargs) -> Type[Matcher]:
matcher = func(self, *args, **kwargs)
self.matchers.append(matcher)
return matcher

return wrapper

cls_attrs = inspect.getmembers(cls, inspect.isfunction)
for key, attr in cls_attrs:
return_annotation = inspect.signature(attr).return_annotation
if return_annotation == Type[Matcher]:
setattr(cls, key, add_matcher(attr))
yanyongyu marked this conversation as resolved.
Show resolved Hide resolved

def _get_final_kwargs(
self, update: Dict[str, Any], *, exclude: Optional[Set[str]] = None
) -> dict[str, Any]:
"""获取最终传递给 `on` 的参数

参数:
update: 更新的关键字参数
exclude: 需要排除的参数
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(update)
if exclude:
for key in exclude:
final_kwargs.pop(key, None)
final_kwargs["_depth"] = 2
return final_kwargs


class CommandGroup(Group):
"""命令组,用于声明一组有相同名称前缀的命令。

参数:
Expand All @@ -479,12 +524,10 @@ class CommandGroup:
"""

def __init__(self, cmd: Union[str, Tuple[str, ...]], **kwargs):
self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd
"""命令前缀"""
if "aliases" in kwargs:
del kwargs["aliases"]
self.base_kwargs: Dict[str, Any] = kwargs
"""其他传递给 `on_command` 的参数默认值"""
super().__init__(**kwargs)
self.basecmd: Tuple[str, ...] = (cmd,) if isinstance(cmd, str) else cmd
self.base_kwargs.pop("aliases", None)

def command(self, cmd: Union[str, Tuple[str, ...]], **kwargs) -> Type[Matcher]:
"""注册一个新的命令。新参数将会覆盖命令组默认值
Expand All @@ -503,10 +546,7 @@ def command(self, cmd: Union[str, Tuple[str, ...]], **kwargs) -> Type[Matcher]:
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd

final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_command(cmd, **final_kwargs, _depth=1)
return on_command(cmd, **self._get_final_kwargs(kwargs))

def shell_command(
self, cmd: Union[str, Tuple[str, ...]], **kwargs
Expand All @@ -528,22 +568,12 @@ def shell_command(
"""
sub_cmd = (cmd,) if isinstance(cmd, str) else cmd
cmd = self.basecmd + sub_cmd

final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
return on_shell_command(cmd, **final_kwargs, _depth=1)
return on_shell_command(cmd, **self._get_final_kwargs(kwargs))


class MatcherGroup:
class MatcherGroup(Group):
"""事件响应器组合,统一管理。为 `Matcher` 创建提供默认属性。"""

def __init__(self, **kwargs):
"""创建一个事件响应器组合,参数为默认值,与 `on` 一致"""
self.matchers: List[Type[Matcher]] = []
"""组内事件响应器列表"""
self.base_kwargs: Dict[str, Any] = kwargs
"""其他传递给 `on` 的参数默认值"""

def on(self, **kwargs) -> Type[Matcher]:
"""注册一个基础事件响应器,可自定义类型。

Expand All @@ -558,11 +588,7 @@ def on(self, **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
matcher = on(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
return on(**self._get_final_kwargs(kwargs))

def on_metaevent(self, **kwargs) -> Type[Matcher]:
"""注册一个元事件响应器。
Expand All @@ -576,13 +602,8 @@ def on_metaevent(self, **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
final_kwargs.pop("permission", None)
matcher = on_metaevent(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type", "permission"})
return on_metaevent(**final_kwargs)

def on_message(self, **kwargs) -> Type[Matcher]:
"""注册一个消息事件响应器。
Expand All @@ -597,12 +618,8 @@ def on_message(self, **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_message(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_message(**final_kwargs)

def on_notice(self, **kwargs) -> Type[Matcher]:
"""注册一个通知事件响应器。
Expand All @@ -616,13 +633,8 @@ def on_notice(self, **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
final_kwargs.pop("permission", None)
matcher = on_notice(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type", "permission"})
return on_notice(**final_kwargs)

def on_request(self, **kwargs) -> Type[Matcher]:
"""注册一个请求事件响应器。
Expand All @@ -636,13 +648,8 @@ def on_request(self, **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
final_kwargs.pop("permission", None)
matcher = on_request(**final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type", "permission"})
return on_request(**final_kwargs)

def on_startswith(
self, msg: Union[str, Tuple[str, ...]], **kwargs
Expand All @@ -661,12 +668,8 @@ def on_startswith(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_startswith(msg, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_startswith(msg, **final_kwargs)

def on_endswith(self, msg: Union[str, Tuple[str, ...]], **kwargs) -> Type[Matcher]:
"""注册一个消息事件响应器,并且当消息的**文本部分**以指定内容结尾时响应。
Expand All @@ -683,12 +686,8 @@ def on_endswith(self, msg: Union[str, Tuple[str, ...]], **kwargs) -> Type[Matche
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_endswith(msg, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_endswith(msg, **final_kwargs)

def on_fullmatch(self, msg: Union[str, Tuple[str, ...]], **kwargs) -> Type[Matcher]:
"""注册一个消息事件响应器,并且当消息的**文本部分**与指定内容完全一致时响应。
Expand All @@ -705,12 +704,8 @@ def on_fullmatch(self, msg: Union[str, Tuple[str, ...]], **kwargs) -> Type[Match
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_fullmatch(msg, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_fullmatch(msg, **final_kwargs)

def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]:
"""注册一个消息事件响应器,并且当消息纯文本部分包含关键词时响应。
Expand All @@ -726,12 +721,8 @@ def on_keyword(self, keywords: Set[str], **kwargs) -> Type[Matcher]:
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_keyword(keywords, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_keyword(keywords, **final_kwargs)

def on_command(
self,
Expand All @@ -755,12 +746,8 @@ def on_command(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_command(cmd, aliases=aliases, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_command(cmd, aliases=aliases, **final_kwargs)

def on_shell_command(
self,
Expand Down Expand Up @@ -788,14 +775,8 @@ def on_shell_command(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_shell_command(
cmd, aliases=aliases, parser=parser, **final_kwargs, _depth=1
)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_shell_command(cmd, aliases=aliases, parser=parser, **final_kwargs)

def on_regex(
self, pattern: str, flags: Union[int, re.RegexFlag] = 0, **kwargs
Expand All @@ -816,12 +797,8 @@ def on_regex(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_regex(pattern, flags=flags, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_regex(pattern, flags=flags, **final_kwargs)

def on_type(
self, types: Union[Type[Event], Tuple[Type[Event]]], **kwargs
Expand All @@ -839,9 +816,5 @@ def on_type(
block: 是否阻止事件向更低优先级传递
state: 默认 state
"""
final_kwargs = self.base_kwargs.copy()
final_kwargs.update(kwargs)
final_kwargs.pop("type", None)
matcher = on_type(types, **final_kwargs, _depth=1)
self.matchers.append(matcher)
return matcher
final_kwargs = self._get_final_kwargs(kwargs, exclude={"type"})
return on_type(types, **final_kwargs)