From 63d8e31aca985fef784b7d4519c791cc2feedd06 Mon Sep 17 00:00:00 2001 From: "John \"Preston\" Mille" Date: Tue, 2 Apr 2024 08:06:24 +0100 Subject: [PATCH] Using async HTTPx client --- .isort.cfg | 2 +- kafka_schema_registry_admin/__init__.py | 33 +- .../async_schema_registry.py | 503 ++++++++++++++++++ .../client_wrapper/__init__.py | 2 +- .../client_wrapper/async_client.py | 102 ++++ .../client_wrapper/errors.py | 2 +- .../kafka_schema_registry_admin.py | 27 +- poetry.lock | 109 +++- pyproject.toml | 2 + tests/conftest.py | 42 ++ tests/test_async_registry_config.py | 42 ++ tests/test_async_subjects_schemas.py | 113 ++++ tests/test_registry_config.py | 3 +- tests/test_subjects_schemas.py | 29 +- tests/test_z_subjects_recovery.py | 2 +- 15 files changed, 952 insertions(+), 61 deletions(-) create mode 100644 kafka_schema_registry_admin/async_schema_registry.py create mode 100644 kafka_schema_registry_admin/client_wrapper/async_client.py create mode 100644 tests/test_async_registry_config.py create mode 100644 tests/test_async_subjects_schemas.py diff --git a/.isort.cfg b/.isort.cfg index b76bbbf..4ed7d00 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,2 +1,2 @@ [settings] -known_third_party = pytest,requests,testcontainers +known_third_party = httpx,pytest,requests,testcontainers diff --git a/kafka_schema_registry_admin/__init__.py b/kafka_schema_registry_admin/__init__.py index e51adea..0f741c3 100644 --- a/kafka_schema_registry_admin/__init__.py +++ b/kafka_schema_registry_admin/__init__.py @@ -1,12 +1,39 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2021 John Mille +# Copyright 2024 John Mille """Top-level package for Kafka schema registry admin.""" +from __future__ import annotations + __author__ = """JohnPreston""" __email__ = "john@ews-network.net" __version__ = "0.4.1" -from .kafka_schema_registry_admin import CompatibilityMode, RegistryMode, SchemaRegistry +from enum import Enum + + +class RegistryMode(Enum): + IMPORT = "IMPORT" + READONLY = "READONLY" + READWRITE = "READWRITE" + + +class CompatibilityMode(Enum): + BACKWARD = "BACKWARD" + BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE" + FORWARD = "FORWARD" + FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE" + FULL = "FULL" + FULL_TRANSITIVE = "FULL_TRANSITIVE" + NONE = "NONE" + + +class Type(Enum): + AVRO = "AVRO" + JSON = "JSON" + PROTOBUFF = "PROTOBUF" + + +from .kafka_schema_registry_admin import SchemaRegistry -__all__ = ["SchemaRegistry", "RegistryMode", "CompatibilityMode"] +__all__ = ["SchemaRegistry"] diff --git a/kafka_schema_registry_admin/async_schema_registry.py b/kafka_schema_registry_admin/async_schema_registry.py new file mode 100644 index 0000000..6cd0df2 --- /dev/null +++ b/kafka_schema_registry_admin/async_schema_registry.py @@ -0,0 +1,503 @@ +# SPDX-License-Identifier: Apache License 2.0 +# Copyright 2024 John Mille + +""" +Main module for schema_registry_admin +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import httpx + +from . import CompatibilityMode, RegistryMode, Type + +if TYPE_CHECKING: + from httpx import Response + +import json +from logging import getLogger +from urllib.parse import urlencode + +from .client_wrapper.async_client import Client +from .client_wrapper.errors import NotFoundException + +LOG = getLogger() +LOG.setLevel("WARN") + + +class SchemaRegistry: + + def __init__( + self, base_url: str, async_client: httpx.AsyncClient = None, *args, **kwargs + ): + username = kwargs.get("basic_auth.username", None) + password = kwargs.get("basic_auth.password", None) + basic_auth: dict = {} + if username and password: + basic_auth: dict = { + "basic_auth.username": username, + "basic_auth.password": password, + } + self.client: Client = Client(str(base_url), basic_auth) + self.async_client = async_client if async_client else httpx.AsyncClient() + + def get_async_client( + self, async_client: httpx.AsyncClient = None + ) -> httpx.AsyncClient | None: + if self.async_client: + return self.async_client + if async_client: + return async_client + return None + + async def get_all_subjects( + self, + subject_prefix: str = None, + deleted: bool = False, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + Method to get the list of subjects in the schema registry + https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--subjects + + :raises: requests.exceptions.HTTPError + """ + url_path: str = "/subjects" + if subject_prefix and deleted: + url_path += "?" + urlencode( + {"subjectPrefix": subject_prefix, "deleted": "true"} + ) + elif subject_prefix: + url_path += f"?subjectPrefix={subject_prefix}" + elif deleted: + url_path += f"?deleted=true" + print(url_path) + return await self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + + async def get_subject_versions( + self, subject_name: str, async_client: httpx.AsyncClient = None + ) -> Response: + """ + Method to get the list of subjects in the schema registry + https://docs.confluent.io/platform/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions + """ + return await self.client.get( + f"/subjects/{subject_name}/versions", + async_client=self.get_async_client(async_client), + ) + + async def get_subject_version_id( + self, + subject_name: str, + version_id: str | int = "latest", + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + Method to get the list of subjects in the schema registry + `API doc `__ + """ + url_path: str = f"/subjects/{subject_name}/versions/{version_id}" + LOG.debug(url_path) + return await self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + + async def get_subject_version_id_schema( + self, + subject_name: str, + version_id: str | int = "latest", + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + Method to get the list of subjects in the schema registry + `API Doc `__ + """ + url_path: str = f"/subjects/{subject_name}/versions/{version_id}/schema" + LOG.debug(url_path) + return await self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + + async def get_subject_versions_referencedby( + self, subject_name, version_id, async_client: httpx.AsyncClient = None + ) -> Response: + """ + `API Doc `__ + """ + req = self.client.get( + f"/subjects/{subject_name}/versions/{version_id}/referencedby", + async_client=self.get_async_client(async_client), + ) + return await req + + async def post_subject_schema( + self, + subject_name, + definition, + schema_type=None, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + Checks if the schema definition has already been registered with the subject. + Succeeds only if both the schema and the subject are registered. Returns 404 otherwise. + 40401 - Subject not found + 40403 - Schema not found + `API Doc `__ + """ + if isinstance(definition, dict): + definition = str(json.dumps(definition)) + if schema_type is None: + schema_type = Type["AVRO"].value + else: + schema_type = Type[schema_type].value + + payload = {"schema": definition, "schemaType": schema_type} + url = f"/subjects/{subject_name}" + req = self.client.post( + url, async_client=self.get_async_client(async_client), json=payload + ) + + return await req + + async def post_subject_schema_version( + self, + subject_name, + definition, + normalize: bool = False, + schema_type=None, + version_id: int = None, + schema_id: int = None, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + Creates a new schema version for the given subject (and registers the subject if did not exist). + Returns the schema ID. + """ + exists_r = await self.post_subject_schema( + subject_name, + definition, + schema_type, + async_client=self.get_async_client(async_client), + ) + if exists_r.status_code == 404: + if isinstance(definition, dict): + definition = str(json.dumps(definition)) + if schema_type is None: + schema_type = Type["AVRO"].value + else: + schema_type = Type[schema_type].value + + payload = {"schema": definition, "schemaType": schema_type} + url = f"/subjects/{subject_name}/versions" + if normalize: + url = f"{url}?normalize=true" + """When trying to do recovery, SR must be in import mode either globally or for the subject itself.""" + if version_id and schema_id: + payload["version"] = version_id + payload["id"] = schema_id + + req = self.client.post( + url, async_client=self.get_async_client(async_client), json=payload + ) + return await req + + async def delete_subject( + self, + subject_name, + version_id=None, + permanent=False, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + Method to delete a subject entirely or a specific version + https://docs.confluent.io/platform/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject) + https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions + """ + url = f"/subjects/{subject_name}" + if version_id: + url = f"{url}/versions/{version_id}" + try: + return await self.client.delete( + url, async_client=self.get_async_client(async_client) + ) + except NotFoundException: + subjects = ( + await self.get_all_subjects(subject_prefix=subject_name, deleted=True) + ).json() + if subject_name in subjects and permanent: + return await self.client.delete( + f"{url}?permanent=true", + async_client=self.get_async_client(async_client), + ) + if permanent: + permanent_url = f"{url}?permanent=true" + return await self.client.delete( + permanent_url, async_client=self.get_async_client(async_client) + ) + + async def get_schema_types( + self, async_client: httpx.AsyncClient = None + ) -> Response: + """ + Method to get the list of schema types and return the request object + """ + url = f"/schemas/types" + req = self.client.get(url, async_client=self.get_async_client(async_client)) + return await req + + async def get_schema_from_id( + self, schema_id, async_client: httpx.AsyncClient = None + ) -> Response: + url = f"/schemas/ids/{schema_id}" + LOG.debug(url) + req = self.client.get(url, async_client=self.get_async_client(async_client)) + return await req + + async def get_schema_versions_from_id(self, schema_id): + """ + Retrieve the versions for a given schema by its ID + """ + url = f"/schemas/ids/{schema_id}/versions" + req = self.client.get(url) + return await req + + async def post_compatibility_subject_versions( + self, + subject_name, + definition, + verbose: bool = False, + schema_type: str | Type = None, + references: list = None, + async_client: httpx.AsyncClient = None, + ) -> Response: + url = f"/compatibility/subjects/{subject_name}/versions" + payload = self.set_subject_validity_payload( + url, definition, schema_type, verbose=verbose, references=references + ) + return await self.client.post( + url, async_client=self.get_async_client(async_client), json=payload + ) + + async def post_compatibility_subject_version_id( + self, + subject_name, + version_id, + definition, + verbose: bool = False, + schema_type: str | Type = None, + references: list = None, + async_client: httpx.AsyncClient = None, + ) -> Response: + url = f"/compatibility/subjects/{subject_name}/versions/{version_id}" + payload = self.set_subject_validity_payload( + url, definition, schema_type, verbose=verbose, references=references + ) + return await self.client.post( + url, async_client=self.get_async_client(async_client), json=payload + ) + + @staticmethod + def set_subject_validity_payload( + url: str, + definition, + schema_type, + verbose: bool = False, + references: list = None, + ) -> dict: + if verbose: + url = f"{url}?verbose=true" + LOG.debug(url) + if isinstance(definition, dict): + definition = str(json.dumps(definition)) + if schema_type is None: + schema_type = Type["AVRO"].value + else: + schema_type = Type[schema_type].value + + payload = {"schema": definition, "schemaType": schema_type} + if references and isinstance(references, list): + payload["references"] = references + return payload + + async def get_compatibility_subject_config( + self, subject_name, async_client: httpx.AsyncClient = None + ) -> Response: + url = f"/config/{subject_name}/" + req = self.client.get(url, async_client=self.get_async_client(async_client)) + return await req + + async def put_compatibility_subject_config( + self, subject_name, compatibility, async_client: httpx.AsyncClient = None + ) -> Response: + url = f"/config/{subject_name}/" + payload = {"compatibility": compatibility} + req = self.client.put( + url, async_client=self.get_async_client(async_client), json=payload + ) + return await req + + async def get_mode( + self, as_str: bool = False, async_client: httpx.AsyncClient = None + ) -> Response | str: + """ + `API Doc `__ + """ + url_path: str = "/mode" + req = self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + if as_str: + return RegistryMode[(await req).json().get("mode")].value + return await req + + async def put_mode( + self, + mode: str | RegistryMode, + force: bool = False, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = "/mode" + if force: + url_path += "?force=true" + req = self.client.put( + url_path, + async_client=self.get_async_client(async_client), + json={"mode": mode}, + ) + return await req + + async def get_subject_mode( + self, subject_name: str, async_client: httpx.AsyncClient = None + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = f"/mode/{subject_name}" + req = self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + return await req + + async def put_subject_mode( + self, + subject_name: str, + mode: str | RegistryMode, + force: bool = False, + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = f"/mode/{subject_name}" + if force: + url_path += "?force=true" + req = self.client.put( + url_path, + async_client=self.get_async_client(async_client), + json={"mode": mode}, + ) + return await req + + async def get_config(self, async_client: httpx.AsyncClient = None): + """ + `API Doc `__ + """ + url_path: str = "/config" + req = self.client.get( + url_path, async_client=self.get_async_client(async_client) + ) + return await req + + async def put_config( + self, + alias: str = None, + normalize: bool = False, + compatibility: str | CompatibilityMode = "NONE", + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = "/config" + payload: dict = {} + if compatibility: + payload["compatibility"] = compatibility + if alias: + payload["alias"] = alias + if normalize: + payload["normalize"] = normalize + req = self.client.put( + url_path, async_client=self.get_async_client(async_client), json=payload + ) + return await req + + async def get_subject_config( + self, + subject: str, + default: bool = False, + alias: str = None, + normalize: bool = False, + compatibility: str | CompatibilityMode = "NONE", + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = f"/config/{subject}" + if default: + url_path += "?defaultToGlobal=true" + payload: dict = {} + if compatibility: + payload["compatibility"] = compatibility + if alias: + payload["alias"] = alias + if normalize: + payload["normalize"] = normalize + req = self.client.get( + url_path, async_client=self.get_async_client(async_client), json=payload + ) + return await req + + async def put_subject_config( + self, + subject: str, + alias: str = None, + normalize: bool = False, + compatibility: str | CompatibilityMode = "NONE", + async_client: httpx.AsyncClient = None, + ) -> Response: + """ + `API Doc `__ + """ + url_path: str = f"/config/{subject}" + payload: dict = {} + if compatibility: + payload["compatibility"] = compatibility + if alias: + payload["alias"] = alias + if normalize: + payload["normalize"] = normalize + req = self.client.put( + url_path, async_client=self.get_async_client(async_client), json=payload + ) + return await req + + async def delete_subject_config( + self, subject: str, async_client: httpx.AsyncClient = None + ): + """ + `API Doc `__ + """ + url_path: str = f"/config/{subject}" + req = self.client.delete( + url_path, async_client=self.get_async_client(async_client) + ) + return req diff --git a/kafka_schema_registry_admin/client_wrapper/__init__.py b/kafka_schema_registry_admin/client_wrapper/__init__.py index b6f5328..bf31f0f 100644 --- a/kafka_schema_registry_admin/client_wrapper/__init__.py +++ b/kafka_schema_registry_admin/client_wrapper/__init__.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2021 John Mille +# Copyright 2024 John Mille from __future__ import annotations diff --git a/kafka_schema_registry_admin/client_wrapper/async_client.py b/kafka_schema_registry_admin/client_wrapper/async_client.py new file mode 100644 index 0000000..c18f6d0 --- /dev/null +++ b/kafka_schema_registry_admin/client_wrapper/async_client.py @@ -0,0 +1,102 @@ +# SPDX-License-Identifier: Apache License 2.0 +# Copyright 2024 John Mille + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from httpx import Response as Response + +from urllib.parse import urlparse + +import httpx + + +class Client: + """API Client wrapper around the httpx library""" + + def __init__(self, base_url: str, basic_auth: dict = None): + self._base_url = base_url + + self._default_headers: dict = { + "Accept": "application/json", + } + self._post_headers: dict = { + "Accept": "application/json", + "Content-Type": "application/vnd.schemaregistry.v1+json", + } + self.auth = None + if basic_auth: + self.auth = ( + basic_auth["basic_auth.username"], + basic_auth["basic_auth.password"], + ) + + async def get( + self, api_path: str, async_client: httpx.AsyncClient = None, *args, **kwargs + ) -> Response: + """Get the data from the api_path""" + headers = kwargs.get("headers", {}) + if not headers: + kwargs["headers"] = headers + headers.update(self._default_headers) + url: str = urlparse(self._base_url + api_path).geturl() + if not async_client: + async with httpx.AsyncClient() as async_client: + response = await async_client.get(url, auth=self.auth, *args, **kwargs) + else: + response = await async_client.get(url, auth=self.auth, *args, **kwargs) + return response + + async def post( + self, api_path: str, async_client: httpx.AsyncClient = None, *args, **kwargs + ) -> Response: + """POST the data from the api_path""" + headers = kwargs.get("headers", {}) + if not headers: + kwargs["headers"] = headers + headers.update(self._default_headers) + headers.update(self._post_headers) + url: str = urlparse(self._base_url + api_path).geturl() + if not async_client: + async with httpx.AsyncClient() as async_client: + response = await async_client.post(url, auth=self.auth, *args, **kwargs) + else: + response = await async_client.post(url, auth=self.auth, *args, **kwargs) + return response + + async def put( + self, api_path: str, async_client: httpx.AsyncClient = None, *args, **kwargs + ) -> Response: + """PUT the data from the api_path""" + headers = kwargs.get("headers", {}) + if not headers: + kwargs["headers"] = headers + headers.update(self._default_headers) + url: str = urlparse(self._base_url + api_path).geturl() + if not async_client: + async with httpx.AsyncClient() as async_client: + response = await async_client.put(url, auth=self.auth, *args, **kwargs) + else: + response = await async_client.put(url, auth=self.auth, *args, **kwargs) + return response + + async def delete( + self, api_path: str, async_client: httpx.AsyncClient = None, *args, **kwargs + ) -> Response: + """DELETE the data from the api_path""" + headers = kwargs.get("headers", {}) + if not headers: + kwargs["headers"] = headers + headers.update(self._default_headers) + + url: str = urlparse(self._base_url + api_path).geturl() + if not async_client: + async with httpx.AsyncClient() as async_client: + response = await async_client.delete( + url, auth=self.auth, *args, **kwargs + ) + else: + response = await async_client.delete(url, auth=self.auth, *args, **kwargs) + return response diff --git a/kafka_schema_registry_admin/client_wrapper/errors.py b/kafka_schema_registry_admin/client_wrapper/errors.py index ed77bf7..3efb66c 100644 --- a/kafka_schema_registry_admin/client_wrapper/errors.py +++ b/kafka_schema_registry_admin/client_wrapper/errors.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2021 John Mille +# Copyright 2024 John Mille from __future__ import annotations diff --git a/kafka_schema_registry_admin/kafka_schema_registry_admin.py b/kafka_schema_registry_admin/kafka_schema_registry_admin.py index 7476a31..72f8a94 100644 --- a/kafka_schema_registry_admin/kafka_schema_registry_admin.py +++ b/kafka_schema_registry_admin/kafka_schema_registry_admin.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2021 John Mille +# Copyright 2024 John Mille """ Main module for schema_registry_admin @@ -9,11 +9,12 @@ from typing import TYPE_CHECKING +from . import CompatibilityMode, RegistryMode, Type + if TYPE_CHECKING: from requests import Response import json -from enum import Enum from logging import getLogger from urllib.parse import urlencode @@ -24,28 +25,6 @@ LOG.setLevel("WARN") -class Type(Enum): - AVRO = "AVRO" - JSON = "JSON" - PROTOBUFF = "PROTOBUF" - - -class RegistryMode(Enum): - IMPORT = "IMPORT" - READONLY = "READONLY" - READWRITE = "READWRITE" - - -class CompatibilityMode(Enum): - BACKWARD = "BACKWARD" - BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE" - FORWARD = "FORWARD" - FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE" - FULL = "FULL" - FULL_TRANSITIVE = "FULL_TRANSITIVE" - NONE = "NONE" - - class SchemaRegistry: def __init__(self, base_url: str, *args, **kwargs): diff --git a/poetry.lock b/poetry.lock index 7f517d2..20bdba7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -11,6 +11,28 @@ files = [ {file = "alabaster-0.7.16.tar.gz", hash = "sha256:75a8b99c28a5dad50dd7f8ccdd447a121ddb3892da9e53d1ca5cca3106d58d65"}, ] +[[package]] +name = "anyio" +version = "4.3.0" +description = "High level compatibility layer for multiple asynchronous event loop implementations" +optional = false +python-versions = ">=3.8" +files = [ + {file = "anyio-4.3.0-py3-none-any.whl", hash = "sha256:048e05d0f6caeed70d731f3db756d35dcc1f35747c8c403364a8332c630441b8"}, + {file = "anyio-4.3.0.tar.gz", hash = "sha256:f75253795a87df48568485fd18cdd2a3fa5c4f7c5be8e5e36637733fce06fed6"}, +] + +[package.dependencies] +exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""} +idna = ">=2.8" +sniffio = ">=1.1" +typing-extensions = {version = ">=4.1", markers = "python_version < \"3.11\""} + +[package.extras] +doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] +trio = ["trio (>=0.23)"] + [[package]] name = "babel" version = "2.14.0" @@ -391,6 +413,62 @@ docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1 testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] typing = ["typing-extensions (>=4.8)"] +[[package]] +name = "h11" +version = "0.14.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.7" +files = [ + {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, + {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, +] + +[[package]] +name = "httpcore" +version = "1.0.5" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"}, + {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.13,<0.15" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<0.26.0)"] + +[[package]] +name = "httpx" +version = "0.27.0" +description = "The next generation HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"}, + {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" +sniffio = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] + [[package]] name = "identify" version = "2.5.35" @@ -689,6 +767,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "0.23.6" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pytest-asyncio-0.23.6.tar.gz", hash = "sha256:ffe523a89c1c222598c76856e76852b787504ddb72dd5d9b6617ffa8aa2cde5f"}, + {file = "pytest_asyncio-0.23.6-py3-none-any.whl", hash = "sha256:68516fdd1018ac57b846c9846b954f0393b26f094764a28c955eabb0536a4e8a"}, +] + +[package.dependencies] +pytest = ">=7.0.0,<9" + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "pywin32" version = "306" @@ -823,6 +919,17 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + [[package]] name = "snowballstemmer" version = "2.2.0" @@ -1209,4 +1316,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "aab36f29d9e37eca28e6abfd6891589315d5c968c61a9eefb4e2aa118d78ab0f" +content-hash = "581da2dd04671604d1bb7cea6584d7faad8447027c6cf137d2da9f36515f8374" diff --git a/pyproject.toml b/pyproject.toml index ec528db..52b9ee9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ readme = "README.rst" [tool.poetry.dependencies] python = "^3.9" requests = "^2.28.0" +httpx = "^0.27.0" [tool.poetry.group.dev.dependencies] testcontainers = "^4.0.0" @@ -30,6 +31,7 @@ Sphinx = "^7.2" pre-commit = "^3.6" pytest = "^8.1" tbump = "^6.9.0" +pytest-asyncio = "^0.23.6" [build-system] requires = ["poetry-core"] diff --git a/tests/conftest.py b/tests/conftest.py index d56b354..8ab27a6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,10 +2,12 @@ from os import path +import httpx import pytest from testcontainers.compose import DockerCompose from kafka_schema_registry_admin import SchemaRegistry +from kafka_schema_registry_admin.async_schema_registry import SchemaRegistry as AsyncSR HERE = path.abspath(path.dirname(__file__)) @@ -24,6 +26,11 @@ docker_compose.wait_for(f"{base_url}/subjects") +@pytest.fixture(scope="session") +def async_client(): + return httpx.AsyncClient() + + @pytest.fixture(scope="session") def local_registry(): return SchemaRegistry(base_url) @@ -37,7 +44,42 @@ def authed_local_registry(): ) +@pytest.fixture(scope="session") +def async_local_registry(async_client): + return AsyncSR( + base_url, + **{"basic_auth.username": "confluent", "basic_auth.password": "confluent"}, + ) + + def pytest_sessionfinish(session, exitstatus): docker_compose.stop() print("Testing session has finished") print(f"Exit status: {exitstatus}") + + +@pytest.fixture(scope="session") +def schema_sample(): + return { + "type": "record", + "namespace": "com.mycorp.mynamespace", + "name": "value_test_subject", + "doc": "Sample schema to help you get started.", + "fields": [ + { + "name": "myField1", + "type": "int", + "doc": "The int type is a 32-bit signed integer.", + }, + { + "name": "myField2", + "type": "double", + "doc": "The double type is a double precision (64-bit) IEEE 754 floating-point number.", + }, + { + "name": "myField3", + "type": "string", + "doc": "The string is a unicode character sequence.", + }, + ], + } diff --git a/tests/test_async_registry_config.py b/tests/test_async_registry_config.py new file mode 100644 index 0000000..775f2db --- /dev/null +++ b/tests/test_async_registry_config.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: Apache License 2.0 +# Copyright 2024 John Mille + +import json +from copy import deepcopy +from os import path + +import pytest + +from kafka_schema_registry_admin.async_schema_registry import ( + CompatibilityMode, + RegistryMode, +) + + +@pytest.mark.asyncio(scope="session") +async def test_changing_compatibility(async_local_registry): + config = (await async_local_registry.get_config()).json() + print("CONFIG 1?", config) + await async_local_registry.put_config( + compatibility=CompatibilityMode.FULL.value, normalize=True + ) + config = (await async_local_registry.get_config()).json() + print("CONFIG 2?", config) + + +@pytest.mark.asyncio(scope="session") +async def test_changing_mode(async_local_registry, async_client): + mode = await async_local_registry.get_mode(async_client=async_client, as_str=True) + assert mode == RegistryMode.READWRITE.value + + await async_local_registry.put_mode(RegistryMode.READONLY.value) + assert (await async_local_registry.get_mode()).json()[ + "mode" + ] == RegistryMode.READONLY.value + + await async_local_registry.put_mode(RegistryMode.IMPORT.value) + assert (await async_local_registry.get_mode()).json()[ + "mode" + ] == RegistryMode.IMPORT.value + + await async_local_registry.put_mode(RegistryMode.READWRITE.value) diff --git a/tests/test_async_subjects_schemas.py b/tests/test_async_subjects_schemas.py new file mode 100644 index 0000000..7438dbc --- /dev/null +++ b/tests/test_async_subjects_schemas.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: Apache License 2.0 +# Copyright 2024 John Mille +import json +from copy import deepcopy + +import pytest + +from kafka_schema_registry_admin.client_wrapper.errors import ( + IncompatibleSchema, + NotFoundException, +) + + +@pytest.mark.asyncio(scope="session") +async def test_reset_sr_mode(async_local_registry): + await async_local_registry.put_mode("READWRITE") + + +@pytest.mark.asyncio(scope="session") +async def test_register_new_definition(async_local_registry, schema_sample): + c = await async_local_registry.post_subject_schema_version( + "test-subject4", schema_sample, normalize=True + ) + r = await async_local_registry.get_schema_from_id(c.json()["id"]) + assert "test-subject4" in (await async_local_registry.get_all_subjects()).json() + r = await async_local_registry.get_subject_versions("test-subject4") + assert 1 in r.json() + full_details = ( + await async_local_registry.get_subject_version_id("test-subject4", 1) + ).json() + schema = ( + await async_local_registry.get_subject_version_id_schema("test-subject4", 1) + ).json() + assert len(schema["fields"]) == len(json.loads(full_details["schema"])["fields"]) + + +@pytest.mark.asyncio(scope="session") +async def test_subject_existing_schema_definition(async_local_registry, schema_sample): + r = await async_local_registry.post_subject_schema( + "test-subject4", schema_sample, "AVRO" + ) + r = await async_local_registry.get_schema_versions_from_id(r.json()["id"]) + + +@pytest.mark.asyncio(scope="session") +async def test_register_new_definition_updated(async_local_registry, schema_sample): + new_version = deepcopy(schema_sample) + new_version["fields"].append( + { + "doc": "The string is a unicode character sequence.", + "name": "myField4", + "type": "string", + } + ) + test = await async_local_registry.post_subject_schema( + "test-subject4", schema_sample + ) + latest = await async_local_registry.get_subject_versions_referencedby( + "test-subject4", test.json()["version"] + ) + + new_compat = await async_local_registry.put_compatibility_subject_config( + "test-subject4", "BACKWARD" + ) + assert new_compat.json()["compatibility"] == "BACKWARD" + compat = await async_local_registry.post_compatibility_subject_version_id( + "test-subject4", + test.json()["version"], + new_version, + verbose=True, + ) + assert isinstance(compat.json()["is_compatible"], bool) + is_compatible = compat.json()["is_compatible"] + if is_compatible: + r = await async_local_registry.post_subject_schema_version( + "test-subject4", new_version, schema_type="AVRO" + ) + + await async_local_registry.put_compatibility_subject_config( + "test-subject4", "FORWARD" + ) + new_version["fields"].pop(0) + + +@pytest.mark.asyncio(scope="session") +async def test_get_all_subjects(async_local_registry): + r = (await async_local_registry.get_all_subjects()).json() + assert isinstance(r, list) and r + r = await async_local_registry.get_all_subjects(subject_prefix="test-subject4") + assert "test-subject4" in r.json() + + +@pytest.mark.asyncio(scope="session") +async def test_get_all_schema_types(async_local_registry): + r = (await async_local_registry.get_schema_types()).json() + assert isinstance(r, list) and r + + +@pytest.mark.asyncio(scope="session") +async def test_delete_subject(async_local_registry): + versions = (await async_local_registry.get_subject_versions("test-subject4")).json() + await async_local_registry.delete_subject( + "test-subject4", permanent=False, version_id=versions[-1] + ) + await async_local_registry.delete_subject("test-subject4", permanent=False) + r = (await async_local_registry.get_all_subjects(deleted=True)).json() + print(r) + r = await async_local_registry.get_all_subjects( + deleted=True, subject_prefix="test-subject4" + ) + print("R2", r.json()) + await async_local_registry.delete_subject("test-subject4", permanent=True) + r = (await async_local_registry.get_all_subjects(deleted=True)).json() diff --git a/tests/test_registry_config.py b/tests/test_registry_config.py index d518be4..f41dc27 100644 --- a/tests/test_registry_config.py +++ b/tests/test_registry_config.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2020-2021 John Mille +# Copyright 2024 John Mille import json from copy import deepcopy from os import path @@ -21,6 +21,7 @@ def test_changing_compatibility(authed_local_registry): def test_changing_mode(authed_local_registry): mode = authed_local_registry.get_mode(as_str=True) + print("MODE?", mode) assert mode == RegistryMode.READWRITE.value authed_local_registry.put_mode(RegistryMode.READONLY.value) diff --git a/tests/test_subjects_schemas.py b/tests/test_subjects_schemas.py index 2106354..116c3e9 100644 --- a/tests/test_subjects_schemas.py +++ b/tests/test_subjects_schemas.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2020-2021 John Mille +# Copyright 2024 John Mille import json from copy import deepcopy @@ -11,33 +11,6 @@ ) -@pytest.fixture() -def schema_sample(): - return { - "type": "record", - "namespace": "com.mycorp.mynamespace", - "name": "value_test_subject", - "doc": "Sample schema to help you get started.", - "fields": [ - { - "name": "myField1", - "type": "int", - "doc": "The int type is a 32-bit signed integer.", - }, - { - "name": "myField2", - "type": "double", - "doc": "The double type is a double precision (64-bit) IEEE 754 floating-point number.", - }, - { - "name": "myField3", - "type": "string", - "doc": "The string is a unicode character sequence.", - }, - ], - } - - def test_reset_sr_mode(local_registry): local_registry.put_mode("READWRITE") diff --git a/tests/test_z_subjects_recovery.py b/tests/test_z_subjects_recovery.py index 205b2c2..4788540 100644 --- a/tests/test_z_subjects_recovery.py +++ b/tests/test_z_subjects_recovery.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: Apache License 2.0 -# Copyright 2020-2021 John Mille +# Copyright 2024 John Mille import json from copy import deepcopy