From 561b9953f76980d200d989c5946262b5efff0810 Mon Sep 17 00:00:00 2001
From: masklinn
Date: Tue, 2 May 2023 09:14:08 +0200
Subject: [PATCH] Typed API & parsers API

New API with full typing
========================

Seems pretty self-explanatory: rather than returning somewhat ad-hoc
dicts, this API works off of dataclasses. It should be compatible with
the legacy version through the magic of ~~buying two of them~~
`dataclasses.asdict`.

Parser API
==========

The legacy version had "parsers" which really represent individual
parsing rules. In the new API, the job of a parser is what the
top-level functions did: it wraps around the entire job of parsing a
user-agent string.

The core API is just `__call__`, with a selection flag for the domains
("domain" seems like the least bad term for what "user agent", "os",
and "device" are; other alternatives I considered are "component" and
"category", but I'm still ambivalent). Overridable helpers are
provided which match the old API's methods (with PEP8 conventions), as
well as the same style of helpers at the package toplevel.

This resolves a number of limitations:

Concurrency
-----------

While the library should be thread-safe (and I need to find a way to
test that), the ability to instantiate parsers provides the
opportunity for things like thread-local parsers, or actual
parallelism if we start using native extensions (regex, re2).

It also allows running multiple *parser configurations* concurrently,
including e.g. multiple independent custom yaml sets. Not sure there's
a use for it, but why not? At the very least it should make using
custom YAML datasets much easier than having to set envvars.

The caching parser being stateful, protecting it with an optional lock
seems like the best way to make caching thread-safe. When only using a
single thread, or using thread-local parsers, caching can be disabled
by passing a `contextlib.nullcontext` as the lock.
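The lock scheme above can be sketched in miniature. This is an
illustrative stand-in (the `CachingWrapper` class and its names are
hypothetical, not this patch's actual `CachingParser`), showing a
pluggable lock *factory* defaulting to `threading.Lock`, with
`contextlib.nullcontext` substituted when locking is unwanted:

```python
# Hypothetical sketch of a cache guarded by an optional lock; not the
# real uap-python API, just the pattern described above.
import contextlib
import threading
from typing import Callable, ContextManager, Dict


class CachingWrapper:
    def __init__(
        self,
        parse: Callable[[str], str],
        lock: Callable[[], ContextManager] = threading.Lock,
    ) -> None:
        self._parse = parse
        self._cache: Dict[str, str] = {}
        # The lock is built from a factory so callers can swap in
        # contextlib.nullcontext for single-threaded / thread-local use.
        self._lock = lock()

    def __call__(self, ua: str) -> str:
        # Only the stateful cache is guarded; the delegated parse
        # itself runs outside the lock.
        with self._lock:
            hit = self._cache.get(ua)
        if hit is not None:
            return hit
        result = self._parse(ua)
        with self._lock:
            self._cache[ua] = result
        return result


# Thread-safe by default (str.upper stands in for a real parser):
shared = CachingWrapper(str.upper)
# Lock-free variant for thread-local parsers:
local = CachingWrapper(str.upper, lock=contextlib.nullcontext)
```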
Customization
-------------

Public APIs are provided both to instantiate and tune parsers, and to
set the global parser. Hopefully this makes evaluating proposed
parsers as well as evaluating & tuning caches (algorithm & size)
easier. Even more so as we should provide some sort of evaluation CLI
in #163.

Caches
------

In the old API, the package-provided cache could only be global and
have a single implementation, as it had to integrate with the
toplevel parsing functions. By reifying the parsing job, a cache is
just a parser which delegates the parse if it doesn't have a hit.
This allows more easily providing, testing, and evolving alternative
cache strategies.

Bulk APIs
---------

The current parser checks rules (regexes) one at a time on the input,
but there are advanced regex APIs which can check a regex *set* and
return which one(s) matched, allowing much more efficient bulk
matching, e.g. google's re2, rust's regex.

With the old scheme, this would be a pretty significant change in use
/ behaviour, obviating the use of the "parsers" with no recourse.
Under the new parsing scheme, these can just be different "base"
parsers: they can be the default, they can be cached, and users can
instantiate their own parser instead.

Misc
----

The new API's UA extractor pipeline supports `patch_minor`, though
that requires excluding that bit from the tests as there are
apparently broken test cases around that item (ua-parser/uap-core#562).

Init Helpers
============

Having proper parsers is the opportunity to allow setting parsers at
runtime more easily (instead of load-time envvars); however, optional
constructors (classmethods) turn out to be iffy from both an API and a
typing perspective. Instead, have the "base" parsers (the ones doing
the actual parsing of the UAs) just take a uniform parsed data set,
and have utility loaders provide that from various data sources
(precompiled, preformatted, or data files).
This avoids redundancy and the need for mixins / inheritance, and
mypy is *much* happier.

Legacy Parsers -> New Matchers
==============================

The bridging of the legacy parsers and the new results turned out to
be pretty mid. Instead, the new API relies on similar but better
typed matcher classes, with a slightly different API: they return
`None` on a match failure instead of a triplet, which makes them
compose better in iteration (e.g. one can just `filter` them out).

Add a `Matchers` alias to carry them around (a tuple of lists of
matchers) for convenience, as well as a base parser parameter.

Also clarify the replacer rules, and hopefully implement the thing
more clearly.

Fixes #93, fixes #142, closes #116
---
 README.rst                   | 158 +++++++--------
 pyproject.toml               |   2 +-
 setup.py                     |  77 ++++++--
 src/ua_parser/__init__.py    |  99 +++++++++-
 src/ua_parser/_matchers.pyi  |   3 +
 src/ua_parser/_regexes.pyi   |   6 +
 src/ua_parser/basic.py       |  64 ++++++
 src/ua_parser/caching.py     | 128 ++++++++++++
 src/ua_parser/core.py        | 363 +++++++++++++++++++++++++++++++++++
 src/ua_parser/loaders.py     | 139 ++++++++++++++
 tests/test_caches.py         |  98 ++++++++++
 tests/test_core.py           | 144 ++++++++++++++
 tests/test_legacy.py         |   6 +-
 tests/test_parsers_basics.py |  78 ++++++++
 tox.ini                      |  13 +-
 15 files changed, 1278 insertions(+), 100 deletions(-)
 create mode 100644 src/ua_parser/_matchers.pyi
 create mode 100644 src/ua_parser/_regexes.pyi
 create mode 100644 src/ua_parser/basic.py
 create mode 100644 src/ua_parser/caching.py
 create mode 100644 src/ua_parser/core.py
 create mode 100644 src/ua_parser/loaders.py
 create mode 100644 tests/test_caches.py
 create mode 100644 tests/test_core.py
 create mode 100644 tests/test_parsers_basics.py

diff --git a/README.rst b/README.rst
index 05139388..251e7726 100644
--- a/README.rst
+++ b/README.rst
@@ -1,8 +1,8 @@
 uap-python
 ==========
 
-A python implementation of the UA Parser (https://github.com/ua-parser,
-formerly https://github.com/tobie/ua-parser)
+Official
python implementation of the `User Agent String +Parser `_ project. Build Status ------------ @@ -10,110 +10,118 @@ Build Status .. image:: https://github.com/ua-parser/uap-python/actions/workflows/ci.yml/badge.svg :alt: CI on the master branch - Installing ---------- -Install via pip -~~~~~~~~~~~~~~~ - -Just run: +Just add ``ua-parser`` to your project's dependencies, or run .. code-block:: sh $ pip install ua-parser -Manual install -~~~~~~~~~~~~~~ - -In the top-level directory run: - -.. code-block:: sh - - $ python setup.py install - -Change Log ---------------- -Because this repo is mostly a python wrapper for the User Agent String Parser repo (https://github.com/ua-parser/uap-core), the changes made to this repo are best described by the update diffs in that project. Please see the diffs for this submodule (https://github.com/ua-parser/uap-core/releases) for a list of what has changed between versions of this package. +to install in the current environment. Getting Started --------------- -Retrieve data on a user-agent string -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Retrieve all data on a user-agent string +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.Parse(ua_string) - >>> pp.pprint(parsed_string) - { 'device': {'brand': 'Apple', 'family': 'Mac', 'model': 'Mac'}, - 'os': { 'family': 'Mac OS X', - 'major': '10', - 'minor': '9', - 'patch': '4', - 'patch_minor': None}, - 'string': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 ' - 'Safari/537.36', - 'user_agent': { 'family': 'Chrome', - 'major': '41', - 'minor': '0', - 'patch': '2272'}} - -Extract browser data from user-agent string -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + >>> parse(ua_string) # doctest: +NORMALIZE_WHITESPACE, +ELLIPSIS + ParseResult(user_agent=UserAgent(family='Chrome', + major='41', + minor='0', + patch='2272', + patch_minor='104'), + os=OS(family='Mac OS X', + major='10', + minor='9', + patch='4', + patch_minor=None), + device=Device(family='Mac', + brand='Apple', + model='Mac'), + string='Mozilla/5.0 (Macintosh; Intel Mac OS... + +Any datum not found in the user agent string is set to ``None``:: + + >>> parse("") + ParseResult(user_agent=None, os=None, device=None, string='') + +Extract only browser data from user-agent string +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_user_agent >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseUserAgent(ua_string) - >>> pp.pprint(parsed_string) - {'family': 'Chrome', 'major': '41', 'minor': '0', 'patch': '2272'} + >>> parse_user_agent(ua_string) + UserAgent(family='Chrome', major='41', minor='0', patch='2272', patch_minor='104') -.. +For specific domains, a match failure just returns ``None``:: - ⚠️Before 0.15, the convenience parsers (``ParseUserAgent``, - ``ParseOs``, and ``ParseDevice``) were not cached, which could - result in degraded performances when parsing large amounts of - identical user-agents (which might occur for real-world datasets). - - For these versions (up to 0.10 included), prefer using ``Parse`` - and extracting the sub-component you need from the resulting - dictionary. + >>> parse_user_agent("") Extract OS information from user-agent string ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_os >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseOS(ua_string) - >>> pp.pprint(parsed_string) - { 'family': 'Mac OS X', - 'major': '10', - 'minor': '9', - 'patch': '4', - 'patch_minor': None} - -Extract Device information from user-agent string + >>> parse_os(ua_string) + OS(family='Mac OS X', major='10', minor='9', patch='4', patch_minor=None) + +Extract device information from user-agent string ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. 
code-block:: python - >>> from ua_parser import user_agent_parser - >>> import pprint - >>> pp = pprint.PrettyPrinter(indent=4) + >>> from ua_parser import parse_device >>> ua_string = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.104 Safari/537.36' - >>> parsed_string = user_agent_parser.ParseDevice(ua_string) - >>> pp.pprint(parsed_string) - {'brand': 'Apple', 'family': 'Mac', 'model': 'Mac'} + >>> parse_device(ua_string) + Device(family='Mac', brand='Apple', model='Mac') + +Parser +~~~~~~ + +Parsers expose the same functions (``parse``, ``parse_user_agent``, +``parse_os``, and ``parse_device``) as the top-level of the package; +however, these are all *utility* methods. + +The actual protocol of parsers, and the one method which must be +implemented / overridden, is:: + + def __call__(self, str, Domain, /) -> PartialParseResult: + +It's similar to but more flexible than ``parse``: + +- The ``str`` is the user agent string. +- The ``Domain`` is a hint, through which the caller requests the + domain (component) they are looking for, any combination of + ``Domain.USER_AGENT``, ``Domain.OS``, and + ``Domain.DEVICE``. ``Domain.ALL`` exists as a convenience alias + for the combination of all three. + + The parser *must* return at least the requested information, but if + that's more convenient or no more expensive it *can* return more. +- The ``PartialParseResult`` is similar to ``ParseResult``, except + all the attributes are ``Optional`` and it has a ``domains: + Domain`` attribute which specifies whether a domain was never + requested (its value for the user agent string is unknown) or it has + been requested but could not be resolved (no match was found for the + user agent). + + ``PartialParseResult.complete()`` converts to a ``ParseResult`` if + all the domains are set, and raises an exception otherwise.
If + some of the domains are set to ``None``, they'll be swapped for a + default value. + +Calling the parser directly is part of the public API. One of the +advantages is that it does not return default values; as such, it allows +more easily differentiating between a non-match (= ``None``) and a +default fallback (``family = "Other"``). diff --git a/pyproject.toml b/pyproject.toml index 48b6aa91..b42d432e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "ua-parser" description = "Python port of Browserscope's user agent parser" -version = "1.0.0a" +version = "1.0.0a1" readme = "README.rst" requires-python = ">=3.8" dependencies = [] diff --git a/setup.py b/setup.py index 0e14118c..d33bc820 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # flake8: noqa +import io from contextlib import suppress from os import fspath from pathlib import Path @@ -51,6 +52,13 @@ def run(self) -> None: f"Unable to find regexes.yaml, should be at {yaml_src!r}" ) + def write_matcher(f, typ: str, fields: List[Optional[object]]): + f.write(f" {typ}(".encode()) + while len(fields) > 1 and fields[-1] is None: + fields = fields[:-1] + f.write(", ".join(map(repr, fields)).encode()) + f.write(b"),\n") + def write_params(fields): # strip trailing None values while len(fields) > 1 and fields[-1] is None: @@ -70,10 +78,20 @@ def write_params(fields): outdir = dist_dir / self.pkg_name outdir.mkdir(parents=True, exist_ok=True) - dest = outdir / "_regexes.py" + dest = outdir / "_matchers.py" + dest_legacy = outdir / "_regexes.py" - with dest.open("wb") as fp: + with dest.open("wb") as f, dest_legacy.open("wb") as fp: # fmt: off + f.write(b"""\ +######################################################## +# NOTICE: this file is autogenerated from regexes.yaml # +######################################################## + +from .core import Matchers, UserAgentMatcher, OSMatcher, DeviceMatcher + +MATCHERS: 
Matchers = ([ +""") fp.write(b"# -*- coding: utf-8 -*-\n") fp.write(b"########################################################\n") fp.write(b"# NOTICE: This file is autogenerated from regexes.yaml #\n") @@ -87,31 +105,35 @@ def write_params(fields): fp.write(b"\n") fp.write(b"USER_AGENT_PARSERS = [\n") for device_parser in regexes["user_agent_parsers"]: - fp.write(b" UserAgentParser(\n") - write_params([ + write_matcher(f, "UserAgentMatcher", [ device_parser["regex"], device_parser.get("family_replacement"), device_parser.get("v1_replacement"), device_parser.get("v2_replacement"), ]) - fp.write(b" ),\n") - fp.write(b"]\n") - fp.write(b"\n") - fp.write(b"DEVICE_PARSERS = [\n") - for device_parser in regexes["device_parsers"]: - fp.write(b" DeviceParser(\n") + + fp.write(b" UserAgentParser(\n") write_params([ device_parser["regex"], - device_parser.get("regex_flag"), - device_parser.get("device_replacement"), - device_parser.get("brand_replacement"), - device_parser.get("model_replacement"), + device_parser.get("family_replacement"), + device_parser.get("v1_replacement"), + device_parser.get("v2_replacement"), ]) fp.write(b" ),\n") - fp.write(b"]\n") - fp.write(b"\n") + f.write(b" ], [\n") + fp.write(b"]\n\n") + fp.write(b"OS_PARSERS = [\n") for device_parser in regexes["os_parsers"]: + write_matcher(f, "OSMatcher", [ + device_parser["regex"], + device_parser.get("os_replacement"), + device_parser.get("os_v1_replacement"), + device_parser.get("os_v2_replacement"), + device_parser.get("os_v3_replacement"), + device_parser.get("os_v4_replacement"), + ]) + fp.write(b" OSParser(\n") write_params([ device_parser["regex"], @@ -122,6 +144,29 @@ def write_params(fields): device_parser.get("os_v4_replacement"), ]) fp.write(b" ),\n") + f.write(b" ], [\n") + fp.write(b"]\n\n") + + fp.write(b"DEVICE_PARSERS = [\n") + for device_parser in regexes["device_parsers"]: + write_matcher(f, "DeviceMatcher", [ + device_parser["regex"], + device_parser.get("regex_flag"), + 
device_parser.get("device_replacement"), + device_parser.get("brand_replacement"), + device_parser.get("model_replacement"), + ]) + + fp.write(b" DeviceParser(\n") + write_params([ + device_parser["regex"], + device_parser.get("regex_flag"), + device_parser.get("device_replacement"), + device_parser.get("brand_replacement"), + device_parser.get("model_replacement"), + ]) + fp.write(b" ),\n") + f.write(b"])\n") fp.write(b"]\n") # fmt: on diff --git a/src/ua_parser/__init__.py b/src/ua_parser/__init__.py index f1c0a2a2..734d384d 100644 --- a/src/ua_parser/__init__.py +++ b/src/ua_parser/__init__.py @@ -1 +1,98 @@ -VERSION = (0, 16, 1) +"""The package provides top-level helpers which use a lazily initialised +default parser. These are convenience functions, for more control it +is perfectly acceptable to instantiate and call parsers directly. + +The default parser does use a cache keyed on the user-agent string, +but its exact behaviour is unspecified, if you require a consistent +behaviour or specific algorithm, set up your own parser (global or +not). + +For convenience, direct aliases are also provided for: + +- :mod:`core types <.types>` +- :mod:`caching utilities <.caching>` +- :mod:`ua_parser.basic.Parser` as :class:`BasicParser` + +This way importing anything but the top-level package should not be +necessary unless you want to *implement* a parser. 
+""" +VERSION = (1, 0, 0) + +from typing import Optional +from .core import ( + DefaultedParseResult, + Device, + DeviceMatcher, + Domain, + Matchers, + OS, + OSMatcher, + ParseResult, + Parser, + PartialParseResult, + UserAgent, + UserAgentMatcher, +) +from .basic import Parser as BasicParser +from .caching import CachingParser, Clearing, LRU +from .loaders import load_builtins, load_data, load_yaml + + +parser: Parser + + +def __getattr__(name): + global parser + if name == "parser": + parser = CachingParser( + BasicParser(load_builtins()), + LRU(200), + ) + return parser + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +def parse(ua: str) -> ParseResult: + """Parses the :class:`.UserAgent`, :class:`.OS`, and :class:`.Device` + information using the :func:`global parser `. + + Equivalent to calling each of :func:`parse_user_agent`, + :func:`parse_os`, and :func:`parse_device` but *may* be more + efficient than calling them separately depending on the underlying + parser. + + Even in the best case, prefer the domain-specific helpers if + you're not going to use *all* of them. + """ + # import required to trigger __getattr__ and initialise the + # parser, a `global` access fails to and we get a NameError + from . import parser + + return parser.parse(ua) + + +def parse_user_agent(ua: str) -> Optional[UserAgent]: + """Parses the :class:`browser <.UserAgent>` information using the + :func:`global parser `. + """ + from . import parser + + return parser.parse_user_agent(ua) + + +def parse_os(ua: str) -> Optional[OS]: + """Parses the :class:`.OS` information using the :func:`global parser + `. + """ + from . import parser + + return parser.parse_os(ua) + + +def parse_device(ua: str) -> Optional[Device]: + """Parses the :class:`.Device` information using the :func:`global + parser `. + """ + from . 
import parser return parser.parse_device(ua) diff --git a/src/ua_parser/_matchers.pyi b/src/ua_parser/_matchers.pyi new file mode 100644 index 00000000..a27227fd --- /dev/null +++ b/src/ua_parser/_matchers.pyi @@ -0,0 +1,3 @@ +from .core import Matchers + +MATCHERS: Matchers diff --git a/src/ua_parser/_regexes.pyi b/src/ua_parser/_regexes.pyi new file mode 100644 index 00000000..9050f8b0 --- /dev/null +++ b/src/ua_parser/_regexes.pyi @@ -0,0 +1,6 @@ +from typing import List +from .user_agent_parser import UserAgentParser, OSParser, DeviceParser + +USER_AGENT_PARSERS: List[UserAgentParser] +OS_PARSERS: List[OSParser] +DEVICE_PARSERS: List[DeviceParser] diff --git a/src/ua_parser/basic.py b/src/ua_parser/basic.py new file mode 100644 index 00000000..f46f303f --- /dev/null +++ b/src/ua_parser/basic.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import io +import os +from itertools import starmap +from operator import methodcaller +from typing import List + +from .core import ( + Parser as AbstractParser, + PartialParseResult, + Domain, + UserAgent, + OS, + Device, + Matchers, + UserAgentMatcher, + OSMatcher, + DeviceMatcher, +) + + +class Parser(AbstractParser): + """A simple pure-python parser based around trying a number of regular + expressions in sequence for each domain, and returning a result + when one matches. 
+ """ + + user_agent_parsers: List[UserAgentMatcher] + os_parsers: List[OSMatcher] + device_parsers: List[DeviceMatcher] + + def __init__( + self, + matchers: Matchers, + ) -> None: + self.user_agent_parsers = matchers[0] + self.os_parsers = matchers[1] + self.device_parsers = matchers[2] + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + parse = methodcaller("__call__", ua) + return PartialParseResult( + domains=domains, + string=ua, + user_agent=next( + filter(None, map(parse, self.user_agent_parsers)), + None, + ) + if Domain.USER_AGENT in domains + else None, + os=next( + filter(None, map(parse, self.os_parsers)), + None, + ) + if Domain.OS in domains + else None, + device=next( + filter(None, map(parse, self.device_parsers)), + None, + ) + if Domain.DEVICE in domains + else None, + ) diff --git a/src/ua_parser/caching.py b/src/ua_parser/caching.py new file mode 100644 index 00000000..4f34b540 --- /dev/null +++ b/src/ua_parser/caching.py @@ -0,0 +1,128 @@ +import abc +from collections import OrderedDict +import threading +from typing import Callable, ContextManager, Dict, Optional, MutableMapping + +from .core import Parser, Domain, PartialParseResult + + +__all__ = [ + "CachingParser", + "Cache", + "Clearing", + "LRU", +] + + +class Cache(abc.ABC): + """Cache abstract protocol. The :class:`CachingParser` will look + values up, merge what was returned (possibly nothing) with what it + got from its actual parser, and *re-set the result*. + + A :class:`Cache` is responsible for its own replacement policy. + """ + + @abc.abstractmethod + def __setitem__(self, key: str, value: PartialParseResult): + """Adds or replace ``value`` to the cache at key ``key``.""" + ... + + @abc.abstractmethod + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + """Returns a partial result for ``key`` if there is any.""" + ... 
+ + +class Clearing(Cache): + """A clearing cache, if the cache is full, just remove all the entries + and re-fill from scratch. + + This can also be used as a cheap permanent cache by setting the + ``maxsize`` to infinity (or at least some very large value), + however this is probably a bad idea as it *will* lead to an + ever-growing memory allocation, until every possible user agent + string has been seen. + """ + + def __init__(self, maxsize: int): + self.maxsize = maxsize + self.cache: Dict[str, PartialParseResult] = {} + + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + return self.cache.get(key) + + def __setitem__(self, key: str, value: PartialParseResult): + if key not in self.cache and len(self.cache) >= self.maxsize: + self.cache.clear() + + self.cache[key] = value + + +class LRU(Cache): + """Cache following a least-recently used replacement policy: when + there is no more room in the cache, whichever entry was last seen + the least recently is removed. + + Note that the cache size is adjusted after inserting the new + entry, so the cache will temporarily contain ``maxsize + 1`` + items. + """ + + def __init__(self, maxsize: int): + self.maxsize = maxsize + self.cache: OrderedDict[str, PartialParseResult] = OrderedDict() + + def __getitem__(self, key: str) -> Optional[PartialParseResult]: + e = self.cache.get(key) + if e: + self.cache.move_to_end(key) + return e + + def __setitem__(self, key: str, value: PartialParseResult): + self.cache[key] = value + self.cache.move_to_end(key) + while len(self.cache) > self.maxsize: + self.cache.popitem(last=False) + + +Lock = Callable[[], ContextManager] + + +class CachingParser(Parser): + """A wrapping parser which takes an underlying concrete :class:`Cache` + for the actual caching and cache strategy. + + The :class:`CachingParser` only interacts with the :class:`Cache` + and delegates to the wrapped parser in case of lookup failure. 
+ + :class:`CachingParser` will set entries back in the cache when + filling them up, it does not update results in place (and can't + really, they're immutable). + """ + + def __init__(self, parser: Parser, cache: Cache, lock: Lock = threading.Lock): + self.parser: Parser = parser + self.cache: Cache = cache + self.lock: ContextManager = lock() + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + with self.lock: + entry = self.cache[ua] + if entry: + if domains in entry.domains: + return entry + + domains &= ~entry.domains + + r = self.parser(ua, domains) + if entry: + r = PartialParseResult( + string=ua, + domains=entry.domains | r.domains, + user_agent=entry.user_agent or r.user_agent, + os=entry.os or r.os, + device=entry.device or r.device, + ) + with self.lock: + self.cache[ua] = r + return r diff --git a/src/ua_parser/core.py b/src/ua_parser/core.py new file mode 100644 index 00000000..7b34dfd0 --- /dev/null +++ b/src/ua_parser/core.py @@ -0,0 +1,363 @@ +import abc +import re +from dataclasses import dataclass, fields +from enum import Flag, auto +from typing import Literal, Optional, Tuple, List + +__all__ = [ + "DefaultedParseResult", + "Device", + "DeviceMatcher", + "Domain", + "Matchers", + "OS", + "OSMatcher", + "ParseResult", + "Parser", + "PartialParseResult", + "UserAgent", + "UserAgentMatcher", +] + + +@dataclass(frozen=True) +class UserAgent: + """Browser ("user agent" aka the software responsible for the request) + information parsed from the user agent string. 
+ """ + + family: str = "Other" + major: Optional[str] = None + minor: Optional[str] = None + patch: Optional[str] = None + patch_minor: Optional[str] = None + + +@dataclass(frozen=True) +class OS: + """OS information parsed from the user agent string.""" + + family: str = "Other" + major: Optional[str] = None + minor: Optional[str] = None + patch: Optional[str] = None + patch_minor: Optional[str] = None + + +@dataclass(frozen=True) +class Device: + """Device information parsed from the user agent string.""" + + family: str = "Other" + brand: Optional[str] = None + model: Optional[str] = None + + +class Domain(Flag): + """Hint for selecting which domains are requested when asking for a + :class:`ParseResult`. + """ + + #: browser (user agent) domain + USER_AGENT = auto() + #: os domain + OS = auto() + #: device domain + DEVICE = auto() + #: shortcut for all three domains + ALL = USER_AGENT | OS | DEVICE + + +@dataclass(frozen=True) +class DefaultedParseResult: + """Variant of :class:`.ParseResult` where attributes are set + to a default value if the parse fails. + + For all domains, the default value has ``family`` set to + ``"Other"`` and every other attribute set to ``None``. + """ + + user_agent: UserAgent + os: OS + device: Device + string: str + + +@dataclass(frozen=True) +class ParseResult: + """Complete parser result. + + For each attribute (and domain), either the parse was a success (a + match was found) and the corresponding data is set, or it was a + failure and the value is `None`. + """ + + user_agent: Optional[UserAgent] + os: Optional[OS] + device: Optional[Device] + string: str + + def with_defaults(self): + return DefaultedParseResult( + user_agent=self.user_agent or UserAgent(), + os=self.os or OS(), + device=self.device or Device(), + string=self.string, + ) + + +@dataclass(frozen=True) +class PartialParseResult: + """Potentially partial (incomplete) parser result. 
+ + Domain fields (``user_agent``, ``os``, and ``device``) can be: + + - unset if not parsed yet + - set to a parsing failure + - set to a parsing success + + The `domains` flags specify which is which: if a `Domain` + flag is set, the corresponding attribute was looked up and is + either ``None`` for a parsing failure (no match was found) or a + value for a parsing success. + + If the flag is unset, the field has not been looked up yet. + """ + + domains: Domain + user_agent: Optional[UserAgent] + os: Optional[OS] + device: Optional[Device] + string: str + + def complete(self) -> ParseResult: + """Requires that the result be fully resolved (every attribute is set, + even if to a lookup failure). + + Replaces lookup failures by default values. + """ + if self.domains != Domain.ALL: + raise ValueError("Only a result with all attributes set can be completed") + + return ParseResult( + user_agent=self.user_agent, + os=self.os, + device=self.device, + string=self.string, + ) + + +class Parser(abc.ABC): + @abc.abstractmethod + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + """Parses the ``ua`` string, returning a parse result with *at least* + the requested :class:`domains ` resolved (whether to success or + failure). + + A parser may resolve more :class:`domains ` than + requested, but it *must not* resolve less. + """ + ... + + def parse(self, ua: str) -> ParseResult: + """Convenience method for parsing all domains, and falling back to + default values for all failures. + """ + return self(ua, Domain.ALL).complete() + + def parse_user_agent(self, ua: str) -> Optional[UserAgent]: + """Convenience method for parsing the :class:`UserAgent` domain, + falling back to the default value in case of failure. + """ + return self(ua, Domain.USER_AGENT).user_agent + + def parse_os(self, ua: str) -> Optional[OS]: + """Convenience method for parsing the :class:`OS` domain, falling back + to the default value in case of failure. 
+ """ + return self(ua, Domain.OS).os + + def parse_device(self, ua: str) -> Optional[Device]: + """Convenience method for parsing the :class:`Device` domain, falling + back to the default value in case of failure. + """ + return self(ua, Domain.DEVICE).device + + +def _get(m: re.Match, idx: int) -> Optional[str]: + return (m[idx] or None) if 0 < idx <= m.re.groups else None + + +def _replacer(repl: str, m: re.Match) -> Optional[str]: + """The replacement rules are frustratingly subtle and innimical to + standard fallbacks: + + - if there is a non-null replacement pattern, then it must be used with + match groups as template parameters (at indices 1+) + - the result is stripped + - if it is an empty string, then it's replaced by a null + - otherwise fallback to a (possibly optional) match group + - or null (device brand has no fallback) + + Replacement rules only apply to OS and Device matchers, the UA + matcher has bespoke replacement semantics for the family (just + $1), and no replacement for the other fields, either there is a + static replacement or it falls back to the corresponding + (optional) match group. 
+ + """ + if not repl: + return None + + return re.sub(r"\$(\d)", lambda n: _get(m, int(n[1])) or "", repl).strip() or None + + +class UserAgentMatcher: + regex: re.Pattern + family: str + major: Optional[str] + minor: Optional[str] + patch: Optional[str] + patch_minor: Optional[str] + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major + self.minor = minor + self.patch = patch + self.patch_minor = patch_minor + + def __call__(self, ua: str) -> Optional[UserAgent]: + if m := self.regex.search(ua): + return UserAgent( + family=self.family.replace("$1", m[1]) + if "$1" in self.family + else self.family, + major=self.major or _get(m, 2), + minor=self.minor or _get(m, 3), + patch=self.patch or _get(m, 4), + patch_minor=self.patch_minor or _get(m, 5), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major), + ("minor", self.minor), + ("patch", self.patch), + ("patch_minor", self.patch_minor), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"UserAgentMatcher({self.regex.pattern!r}{args})" + + +class OSMatcher: + regex: re.Pattern + family: str + major: str + minor: str + patch: str + patch_minor: str + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major or "$2" + self.minor = minor or "$3" + self.patch = patch or "$4" + self.patch_minor = patch_minor or "$5" + + def __call__(self, ua: str) -> Optional[OS]: + if m := self.regex.search(ua): + family = 
_replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find OS family in {ua}") + return OS( + family=family, + major=_replacer(self.major, m), + minor=_replacer(self.minor, m), + patch=_replacer(self.patch, m), + patch_minor=_replacer(self.patch_minor, m), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major if self.major != "$2" else None), + ("minor", self.minor if self.minor != "$3" else None), + ("patch", self.patch if self.patch != "$4" else None), + ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"OSMatcher({self.regex.pattern!r}{args})" + + +class DeviceMatcher: + regex: re.Pattern + family: str + brand: str + model: str + + def __init__( + self, + regex: str, + regex_flag: Optional[Literal["i"]] = None, + family: Optional[str] = None, + brand: Optional[str] = None, + model: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) + self.family = family or "$1" + self.brand = brand or "" + self.model = model or "$1" + + def __call__(self, ua: str) -> Optional[Device]: + if m := self.regex.search(ua): + family = _replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find device family in {ua}") + return Device( + family=family, + brand=_replacer(self.brand, m), + model=_replacer(self.model, m), + ) + return None + + def __repr__(self): + fields = [ + ("family", self.family if self.family != "$1" else None), + ("brand", self.brand or None), + ("model", self.model if self.model != "$1" else None), + ] + iflag = ', "i"' if self.regex.flags & re.IGNORECASE else "" + args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"DeviceMatcher({self.regex.pattern!r}{args})" + + +Matchers = Tuple[ + List[UserAgentMatcher], + List[OSMatcher], + 
List[DeviceMatcher], +] diff --git a/src/ua_parser/loaders.py b/src/ua_parser/loaders.py new file mode 100644 index 00000000..23510eeb --- /dev/null +++ b/src/ua_parser/loaders.py @@ -0,0 +1,140 @@ +__all__ = [ + "load_builtins", + "load_data", + "load_json", + "load_yaml", + "MatchersData", + "UserAgentDict", + "OSDict", + "DeviceDict", +] + +import io +import json +import os +from typing import Callable, List, Optional, Tuple, Type, Union, TypedDict, Literal + +from .core import Matchers, UserAgentMatcher, OSMatcher, DeviceMatcher + +PathOrFile = Union[str, os.PathLike, io.IOBase] +load: Optional[Callable] +SafeLoader: Optional[Type] +try: + from yaml import load, CSafeLoader as SafeLoader +except ImportError: + try: + from yaml import load, SafeLoader + except ImportError: + load = SafeLoader = None + + +def load_builtins() -> Matchers: + from ._matchers import MATCHERS + + return MATCHERS + + +# superclass needed to mix required & optional typed dict entries +# before 3.11 (and Required/NotRequired) +class _RegexDict(TypedDict): + regex: str + + +class UserAgentDict(_RegexDict, total=False): + family_replacement: str + v1_replacement: str + v2_replacement: str + v3_replacement: str + v4_replacement: str + + +class OSDict(_RegexDict, total=False): + os_replacement: str + os_v1_replacement: str + os_v2_replacement: str + os_v3_replacement: str + os_v4_replacement: str + + +class DeviceDict(_RegexDict, total=False): + regex_flag: Literal["i"] + device_replacement: str + brand_replacement: str + model_replacement: str + + +MatchersData = Tuple[List[UserAgentDict], List[OSDict], List[DeviceDict]] + + +def load_data(d: MatchersData) -> Matchers: + return ( + [ + UserAgentMatcher( + p["regex"], + p.get("family_replacement"), + p.get("v1_replacement"), + p.get("v2_replacement"), + p.get("v3_replacement"), + p.get("v4_replacement"), + ) + for p in d[0] + ], + [ + OSMatcher( + p["regex"], + p.get("os_replacement"), + p.get("os_v1_replacement"), + p.get("os_v2_replacement"), 
p.get("os_v3_replacement"), + p.get("os_v4_replacement"), + ) + for p in d[1] + ], + [ + DeviceMatcher( + p["regex"], + p.get("regex_flag"), + p.get("device_replacement"), + p.get("brand_replacement"), + p.get("model_replacement"), + ) + for p in d[2] + ], + ) + + +def load_json(f: PathOrFile) -> Matchers: + if isinstance(f, (str, os.PathLike)): + with open(f) as fp: + regexes = json.load(fp) + else: + regexes = json.load(f) + + return load_data( + ( + regexes["user_agent_parsers"], + regexes["os_parsers"], + regexes["device_parsers"], + ) + ) + + +load_yaml: Optional[Callable[[PathOrFile], Matchers]] +if load is None: + load_yaml = None +else: + + def load_yaml(path: PathOrFile) -> Matchers: + if isinstance(path, (str, os.PathLike)): + with open(path) as fp: + regexes = load(fp, Loader=SafeLoader) # type: ignore + else: + regexes = load(path, Loader=SafeLoader) # type: ignore + + return load_data( + ( + regexes["user_agent_parsers"], + regexes["os_parsers"], + regexes["device_parsers"], + ) + ) diff --git a/tests/test_caches.py b/tests/test_caches.py new file mode 100644 index 00000000..3fb078c9 --- /dev/null +++ b/tests/test_caches.py @@ -0,0 +1,98 @@ +from collections import OrderedDict + +from ua_parser import ( + BasicParser, + PartialParseResult, + Domain, + UserAgent, + OS, + Device, + CachingParser, + Clearing, + LRU, + UserAgentMatcher, + OSMatcher, + DeviceMatcher, +) + + +def test_clearing(): + """Tests that the cache correctly gets cleared to make room for new + entries. + """ + cache = Clearing(2) + p = CachingParser(BasicParser(([], [], [])), cache) + + p.parse("a") + p.parse("b") + + assert cache.cache == { + "a": PartialParseResult(Domain.ALL, None, None, None, "a"), + "b": PartialParseResult(Domain.ALL, None, None, None, "b"), + } + + p.parse("c") + assert cache.cache == { + "c": PartialParseResult(Domain.ALL, None, None, None, "c"), + } + + +def test_lru(): + """Tests that the cache entries do get moved when accessed, and are + popped LRU-first. 
+ """ + cache = LRU(2) + p = CachingParser(BasicParser(([], [], [])), cache) + + p.parse("a") + p.parse("b") + + assert cache.cache == OrderedDict( + [ + ("a", PartialParseResult(Domain.ALL, None, None, None, "a")), + ("b", PartialParseResult(Domain.ALL, None, None, None, "b")), + ] + ) + + p.parse("a") + p.parse("c") + assert cache.cache == OrderedDict( + [ + ("a", PartialParseResult(Domain.ALL, None, None, None, "a")), + ("c", PartialParseResult(Domain.ALL, None, None, None, "c")), + ] + ) + + +def test_backfill(): + """Tests that caches handle partial parsing correctly, by updating the + existing entry when new parts get parsed. + """ + cache = Clearing(2) + p = CachingParser( + BasicParser( + ( + [UserAgentMatcher("(a)")], + [OSMatcher("(a)")], + [DeviceMatcher("(a)")], + ) + ), + cache, + ) + + p.parse_user_agent("a") + assert cache.cache == { + "a": PartialParseResult(Domain.USER_AGENT, UserAgent("a"), None, None, "a"), + } + p("a", Domain.OS) + assert cache.cache == { + "a": PartialParseResult( + Domain.USER_AGENT | Domain.OS, UserAgent("a"), OS("a"), None, "a" + ), + } + p.parse("a") + assert cache.cache == { + "a": PartialParseResult( + Domain.ALL, UserAgent("a"), OS("a"), Device("a", None, "a"), "a" + ), + } diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 00000000..9a8e644a --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,144 @@ +"""Tests UAP-Python using the UAP-core test suite +""" +import dataclasses +import logging +import pathlib +import platform +from operator import attrgetter + +import pytest # type: ignore + +if platform.python_implementation() == "PyPy": + from yaml import load, SafeLoader +else: + try: + from yaml import load, CSafeLoader as SafeLoader # type: ignore + except ImportError: + logging.getLogger(__name__).warning( + "PyYaml C extension not available to run tests, this will result " + "in dramatic tests slowdown." 
+ ) + from yaml import load, SafeLoader + +from ua_parser import ( + BasicParser, + UserAgent, + OS, + Device, + ParseResult, + UserAgentMatcher, + load_builtins, + caching, +) + +CORE_DIR = (pathlib.Path(__name__).parent.parent / "uap-core").resolve() + + +PARSERS = [ + pytest.param(BasicParser(load_builtins()), id="basic"), + pytest.param( + caching.CachingParser( + BasicParser(load_builtins()), + caching.Clearing(10), + ), + id="clearing", + ), + pytest.param( + caching.CachingParser( + BasicParser(load_builtins()), + caching.LRU(10), + lock=contextlib.nullcontext, + ), + id="lru", + ), +] + + +UA_FIELDS = {f.name for f in dataclasses.fields(UserAgent)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_ua.yaml", + CORE_DIR / "test_resources" / "firefox_user_agent_strings.yaml", + CORE_DIR / "test_resources" / "pgts_browser_list.yaml", + ], + ids=attrgetter("name"), +) +def test_ua(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in UA_FIELDS} + # there seems to be broken test cases which have a patch_minor + # of null where it's not, as well as the reverse, so we can't + # test patch_minor (ua-parser/uap-core#562) + res.pop("patch_minor", None) + r = parser.parse_user_agent(test_case["user_agent_string"]) or UserAgent() + assert dataclasses.asdict(r).items() >= res.items() + + +OS_FIELDS = {f.name for f in dataclasses.fields(OS)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_os.yaml", + CORE_DIR / "test_resources" / "additional_os_tests.yaml", + ], + ids=attrgetter("name"), +) +def test_os(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in 
OS_FIELDS} + r = parser.parse_os(test_case["user_agent_string"]) or OS() + assert dataclasses.asdict(r) == res + + +DEVICE_FIELDS = {f.name for f in dataclasses.fields(Device)} + + +@pytest.mark.parametrize("parser", PARSERS) +@pytest.mark.parametrize( + "test_file", + [ + CORE_DIR / "tests" / "test_device.yaml", + ], + ids=attrgetter("name"), +) +def test_devices(parser, test_file): + with test_file.open("rb") as f: + contents = load(f, Loader=SafeLoader) + + for test_case in contents["test_cases"]: + res = {k: v for k, v in test_case.items() if k in DEVICE_FIELDS} + r = parser.parse_device(test_case["user_agent_string"]) or Device() + assert dataclasses.asdict(r) == res + + +def test_results(): + p = BasicParser(([UserAgentMatcher("(x)")], [], [])) + + assert p.parse_user_agent("x") == UserAgent("x") + assert p.parse_user_agent("y") is None + + assert p.parse("x") == ParseResult( + user_agent=UserAgent("x"), + os=None, + device=None, + string="x", + ) + assert p.parse("y") == ParseResult( + user_agent=None, + os=None, + device=None, + string="y", + ) diff --git a/tests/test_legacy.py b/tests/test_legacy.py index 03feeda3..7ada17c5 100644 --- a/tests/test_legacy.py +++ b/tests/test_legacy.py @@ -1,17 +1,15 @@ import logging import os import platform -import sys -import warnings -import pytest +import pytest # type: ignore import yaml if platform.python_implementation() == "PyPy": from yaml import SafeLoader else: try: - from yaml import CSafeLoader as SafeLoader + from yaml import CSafeLoader as SafeLoader # type: ignore except ImportError: logging.getLogger(__name__).warning( "PyYaml C extension not available to run tests, this will result " diff --git a/tests/test_parsers_basics.py b/tests/test_parsers_basics.py new file mode 100644 index 00000000..af38c7e7 --- /dev/null +++ b/tests/test_parsers_basics.py @@ -0,0 +1,78 @@ +import io +from ua_parser import ( + BasicParser, + PartialParseResult, + Domain, + UserAgent, + load_yaml, + UserAgentMatcher, +) + + +def 
test_trivial_matching(): + p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + + assert p("x", Domain.ALL) == PartialParseResult( + string="x", + domains=Domain.ALL, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.ALL) == PartialParseResult( + string="a", + domains=Domain.ALL, + user_agent=UserAgent("a"), + os=None, + device=None, + ) + + +def test_partial(): + p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + + assert p("x", Domain.USER_AGENT) == PartialParseResult( + string="x", + domains=Domain.USER_AGENT, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.USER_AGENT) == PartialParseResult( + string="a", + domains=Domain.USER_AGENT, + user_agent=UserAgent("a"), + os=None, + device=None, + ) + + +def test_init_yaml(): + assert load_yaml + f = io.BytesIO( + b"""\ +user_agent_parsers: +- regex: (a) +os_parsers: [] +device_parsers: [] +""" + ) + p = BasicParser(load_yaml(f)) + + assert p("x", Domain.USER_AGENT) == PartialParseResult( + string="x", + domains=Domain.USER_AGENT, + user_agent=None, + os=None, + device=None, + ) + + assert p("a", Domain.USER_AGENT) == PartialParseResult( + string="a", + domains=Domain.USER_AGENT, + user_agent=UserAgent("a"), + os=None, + device=None, + ) diff --git a/tox.ini b/tox.ini index 57b17f90..36ac52da 100644 --- a/tox.ini +++ b/tox.ini @@ -2,12 +2,12 @@ min_version = 4.0 env_list = py3{8,9,10,11,12} pypy3.{8,9,10} - flake8, black + flake8, black, typecheck labels = test = py3{8,9,10,11,12},pypy3.{8,9,10} cpy = py3{8,9,10,11,12} pypy = pypy3.{8,9,10} - check = flake8, black + check = flake8, black, typecheck [testenv] # wheel install @@ -30,7 +30,14 @@ commands = flake8 {posargs} [testenv:black] package = skip deps = black -commands = black --check --diff . 
+commands = black --check --diff {posargs:.} + +[testenv:typecheck] +package = skip +deps = + mypy + types-PyYAML +commands = mypy --check-untyped-defs --no-implicit-optional {posargs:src tests} [flake8] max_line_length = 88
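The fallback rules documented on `_replacer` are easiest to verify in isolation. The sketch below copies `_get` and `_replacer` verbatim from the patch and exercises the three interesting cases: a `$n` template expansion, the empty pattern, and a template referencing a nonexistent group (the sample regex and inputs are illustrative, not from the test suite):

```python
import re
from typing import Optional


def _get(m: "re.Match", idx: int) -> Optional[str]:
    # out-of-range indices and empty captures both collapse to None
    return (m[idx] or None) if 0 < idx <= m.re.groups else None


def _replacer(repl: str, m: "re.Match") -> Optional[str]:
    # a null/empty replacement pattern yields a null result
    if not repl:
        return None
    # expand $1..$9 against the match groups, strip, map "" back to null
    return re.sub(r"\$(\d)", lambda n: _get(m, int(n[1])) or "", repl).strip() or None


m = re.search(r"(Foo)/(\d+)", "Foo/12 bar")
assert m is not None
print(_replacer("$1 Browser", m))  # Foo Browser
print(_replacer("", m))            # None: no replacement pattern at all
print(_replacer("$3", m))          # None: group 3 is absent, "" strips to null
```

Note how the nonexistent group does not raise: `_get` bounds-checks against `m.re.groups`, so a bad template degrades to the same null as an empty capture.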
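The commit message's claim that "a cache is just a parser which delegates the parse if it doesn't have a hit" can be illustrated standalone. This is a hypothetical sketch, not the library's `CachingParser`: the parser protocol is simplified to `str -> str` (the real one takes a `Domain` selection and merges partial results), and the eviction mirrors the `Clearing` strategy of dropping everything when full:

```python
from typing import Callable, Dict, List

# stand-in for the real parser protocol, which also takes a Domain flag
Parser = Callable[[str], str]


class ClearingCacheParser:
    """A cache that is itself a parser: on a miss it delegates to the
    wrapped parser; when full, it clears wholesale (simplest eviction)."""

    def __init__(self, inner: Parser, maxsize: int) -> None:
        self.inner = inner
        self.maxsize = maxsize
        self.cache: Dict[str, str] = {}

    def __call__(self, ua: str) -> str:
        if ua in self.cache:
            return self.cache[ua]
        if len(self.cache) >= self.maxsize:
            self.cache.clear()
        result = self.cache[ua] = self.inner(ua)
        return result


calls: List[str] = []


def base(ua: str) -> str:
    # records every string that actually reaches the "real" parser
    calls.append(ua)
    return ua.upper()


p = ClearingCacheParser(base, maxsize=2)
p("a")
p("a")  # hit: never reaches base
p("b")
print(calls)  # ['a', 'b']
```

Because the cache and the wrapped parser share one interface, strategies compose freely: an LRU, a thread-safe wrapper, or a bulk-regex base parser can all be swapped in without the caller noticing, which is exactly the flexibility the "Caches" section of the commit message is after.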