-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
667a19d
commit fe019bf
Showing
19 changed files
with
4,232 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
2.0.5 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
"""Home Assistant Intent Language parser""" | ||
|
||
from .expression import ( | ||
ListReference, | ||
RuleReference, | ||
Sentence, | ||
Sequence, | ||
SequenceType, | ||
TextChunk, | ||
) | ||
from .intents import Intents | ||
from .parse_expression import parse_sentence | ||
from .recognize import is_match, recognize, recognize_all, recognize_best |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
"""Command-line interface to hassil.""" | ||
|
||
import argparse | ||
import logging | ||
import os | ||
import sys | ||
from pathlib import Path | ||
|
||
import yaml | ||
|
||
from .intents import Intents, TextSlotList | ||
from .recognize import recognize | ||
from .util import merge_dict | ||
|
||
_LOGGER = logging.getLogger("hassil") | ||
|
||
|
||
def main(): | ||
"""Main entry point""" | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument("yaml", nargs="+", help="YAML files or directories") | ||
parser.add_argument( | ||
"--areas", | ||
nargs="+", | ||
help="Area names", | ||
default=[], | ||
) | ||
parser.add_argument("--names", nargs="+", default=[], help="Device/entity names") | ||
parser.add_argument( | ||
"--debug", action="store_true", help="Print DEBUG messages to the console" | ||
) | ||
args = parser.parse_args() | ||
|
||
level = logging.DEBUG if args.debug else logging.INFO | ||
logging.basicConfig(level=level) | ||
_LOGGER.debug(args) | ||
|
||
slot_lists = { | ||
"area": TextSlotList.from_strings(args.areas), | ||
"name": TextSlotList.from_strings(args.names), | ||
} | ||
|
||
input_dict = {"intents": {}} | ||
for yaml_path_str in args.yaml: | ||
yaml_path = Path(yaml_path_str) | ||
if yaml_path.is_dir(): | ||
yaml_file_paths = yaml_path.glob("*.yaml") | ||
else: | ||
yaml_file_paths = [yaml_path] | ||
|
||
for yaml_file_path in yaml_file_paths: | ||
_LOGGER.debug("Loading file: %s", yaml_file_path) | ||
with open(yaml_file_path, "r", encoding="utf-8") as yaml_file: | ||
merge_dict(input_dict, yaml.safe_load(yaml_file)) | ||
|
||
assert input_dict, "No intent YAML files loaded" | ||
intents = Intents.from_dict(input_dict) | ||
|
||
_LOGGER.info("Area names: %s", args.areas) | ||
_LOGGER.info("Device/Entity names: %s", args.names) | ||
|
||
if os.isatty(sys.stdout.fileno()): | ||
print("Reading sentences from stdin...", file=sys.stderr) | ||
|
||
try: | ||
for line in sys.stdin: | ||
line = line.strip() | ||
if not line: | ||
continue | ||
|
||
try: | ||
result = recognize(line, intents, slot_lists=slot_lists) | ||
if result is not None: | ||
print( | ||
{ | ||
"intent": result.intent.name, | ||
**{e.name: e.value for e in result.entities_list}, | ||
} | ||
) | ||
else: | ||
print("<no match>") | ||
except Exception: | ||
_LOGGER.exception(line) | ||
except KeyboardInterrupt: | ||
pass | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
"""Shared access to package resources""" | ||
|
||
import os | ||
import typing | ||
from pathlib import Path | ||
|
||
try: | ||
import importlib.resources | ||
|
||
files = importlib.resources.files # type: ignore | ||
except (ImportError, AttributeError): | ||
# Backport for Python < 3.9 | ||
import importlib_resources # type: ignore | ||
|
||
files = importlib_resources.files | ||
|
||
_PACKAGE = "hassil" | ||
_DIR = Path(typing.cast(os.PathLike, files(_PACKAGE))) | ||
|
||
__version__ = (_DIR / "VERSION").read_text(encoding="utf-8").strip() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
"""Errors for hassil.""" | ||
|
||
|
||
class HassilError(Exception): | ||
"""Base class for hassil errors""" | ||
|
||
|
||
class MissingListError(HassilError): | ||
"""Error when a {slot_list} is missing.""" | ||
|
||
|
||
class MissingRuleError(HassilError): | ||
"""Error when an <expansion_rule> is missing.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
"""Classes for representing sentence templates.""" | ||
|
||
import re | ||
from abc import ABC | ||
from dataclasses import dataclass, field | ||
from enum import Enum | ||
from typing import Dict, Iterator, List, Optional | ||
|
||
|
||
@dataclass | ||
class Expression(ABC): | ||
"""Base class for expressions.""" | ||
|
||
|
||
@dataclass | ||
class TextChunk(Expression): | ||
"""Contiguous chunk of text (with whitespace).""" | ||
|
||
# Text with casing/whitespace normalized | ||
text: str = "" | ||
|
||
# Set in __post_init__ | ||
original_text: str = None # type: ignore | ||
|
||
parent: "Optional[Sequence]" = None | ||
|
||
def __post_init__(self): | ||
if self.original_text is None: | ||
self.original_text = self.text | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
"""True if the chunk is empty""" | ||
return self.text == "" | ||
|
||
@staticmethod | ||
def empty() -> "TextChunk": | ||
"""Returns an empty text chunk""" | ||
return TextChunk() | ||
|
||
|
||
class SequenceType(str, Enum): | ||
"""Type of a sequence. Optionals are alternatives with an empty option.""" | ||
|
||
# Sequence of expressions | ||
GROUP = "group" | ||
|
||
# Expressions where only one will be recognized | ||
ALTERNATIVE = "alternative" | ||
|
||
# Permutations of a set of expressions | ||
PERMUTATION = "permutation" | ||
|
||
|
||
@dataclass | ||
class Sequence(Expression): | ||
"""Ordered sequence of expressions. Supports groups, optionals, and alternatives.""" | ||
|
||
# Items in the sequence | ||
items: List[Expression] = field(default_factory=list) | ||
|
||
# Group or alternative | ||
type: SequenceType = SequenceType.GROUP | ||
|
||
is_optional: bool = False | ||
|
||
def text_chunk_count(self) -> int: | ||
"""Return the number of TextChunk expressions in this sequence (recursive).""" | ||
num_text_chunks = 0 | ||
for item in self.items: | ||
if isinstance(item, TextChunk): | ||
num_text_chunks += 1 | ||
elif isinstance(item, Sequence): | ||
seq: Sequence = item | ||
num_text_chunks += seq.text_chunk_count() | ||
|
||
return num_text_chunks | ||
|
||
def list_names( | ||
self, | ||
expansion_rules: Optional[Dict[str, "Sentence"]] = None, | ||
) -> Iterator[str]: | ||
"""Return names of list references (recursive).""" | ||
for item in self.items: | ||
yield from self._list_names(item, expansion_rules) | ||
|
||
def _list_names( | ||
self, | ||
item: Expression, | ||
expansion_rules: Optional[Dict[str, "Sentence"]] = None, | ||
) -> Iterator[str]: | ||
"""Return names of list references (recursive).""" | ||
if isinstance(item, ListReference): | ||
list_ref: ListReference = item | ||
yield list_ref.list_name | ||
elif isinstance(item, Sequence): | ||
seq: Sequence = item | ||
yield from seq.list_names(expansion_rules) | ||
elif isinstance(item, RuleReference): | ||
rule_ref: RuleReference = item | ||
if expansion_rules and (rule_ref.rule_name in expansion_rules): | ||
rule_body = expansion_rules[rule_ref.rule_name] | ||
yield from self._list_names(rule_body, expansion_rules) | ||
|
||
|
||
@dataclass | ||
class RuleReference(Expression): | ||
"""Reference to an expansion rule by <name>.""" | ||
|
||
# Name of referenced rule | ||
rule_name: str = "" | ||
|
||
|
||
@dataclass | ||
class ListReference(Expression): | ||
"""Reference to a list by {name}.""" | ||
|
||
list_name: str = "" | ||
prefix: Optional[str] = None | ||
suffix: Optional[str] = None | ||
_slot_name: Optional[str] = None | ||
|
||
def __post_init__(self): | ||
if ":" in self.list_name: | ||
# list_name:slot_name | ||
self.list_name, self._slot_name = self.list_name.split(":", maxsplit=1) | ||
else: | ||
self._slot_name = self.list_name | ||
|
||
@property | ||
def slot_name(self) -> str: | ||
"""Name of slot to put list value into.""" | ||
assert self._slot_name is not None | ||
return self._slot_name | ||
|
||
|
||
@dataclass | ||
class Sentence(Sequence): | ||
"""Sequence representing a complete sentence template.""" | ||
|
||
text: Optional[str] = None | ||
pattern: Optional[re.Pattern] = None | ||
|
||
def compile(self, expansion_rules: Dict[str, "Sentence"]) -> None: | ||
if self.pattern is not None: | ||
# Already compiled | ||
return | ||
|
||
pattern_chunks: List[str] = [] | ||
self._compile_expression(self, pattern_chunks, expansion_rules) | ||
|
||
pattern_str = "".join(pattern_chunks).replace(r"\ ", r"[ ]*") | ||
self.pattern = re.compile(f"^{pattern_str}$", re.IGNORECASE) | ||
|
||
def _compile_expression( | ||
self, exp: Expression, pattern_chunks: List[str], rules: Dict[str, "Sentence"] | ||
): | ||
if isinstance(exp, TextChunk): | ||
# Literal text | ||
chunk: TextChunk = exp | ||
if chunk.text: | ||
escaped_text = re.escape(chunk.text) | ||
pattern_chunks.append(escaped_text) | ||
elif isinstance(exp, Sequence): | ||
# Linear sequence or alternative choices | ||
seq: Sequence = exp | ||
if seq.type == SequenceType.GROUP: | ||
# Linear sequence | ||
for item in seq.items: | ||
self._compile_expression(item, pattern_chunks, rules) | ||
elif seq.type == SequenceType.ALTERNATIVE: | ||
# Alternative choices | ||
if seq.items: | ||
pattern_chunks.append("(?:") | ||
for item in seq.items: | ||
self._compile_expression(item, pattern_chunks, rules) | ||
pattern_chunks.append("|") | ||
pattern_chunks[-1] = ")" | ||
else: | ||
raise ValueError(seq) | ||
elif isinstance(exp, ListReference): | ||
# Slot list | ||
pattern_chunks.append("(?:.+)") | ||
|
||
elif isinstance(exp, RuleReference): | ||
# Expansion rule | ||
rule_ref: RuleReference = exp | ||
if rule_ref.rule_name not in rules: | ||
raise ValueError(rule_ref) | ||
|
||
e_rule = rules[rule_ref.rule_name] | ||
self._compile_expression(e_rule, pattern_chunks, rules) | ||
else: | ||
raise ValueError(exp) |
Oops, something went wrong.