Skip to content

Commit

Permalink
Bump
Browse files Browse the repository at this point in the history
  • Loading branch information
rattrayalex committed May 18, 2022
1 parent 7bdd1e7 commit 6416022
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 138 deletions.
227 changes: 158 additions & 69 deletions lithic/_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations
import time
from typing import Any, TypeVar, Union, Optional, Type, Generic, Generator, Iterator, AsyncIterator
from typing import Any, TypeVar, Union, Optional, Type, Generic, Generator, Iterator, AsyncIterator, Mapping, cast
from ._types import Query
from ._core import RequestOptions, FinalRequestOptions

Expand All @@ -18,89 +18,145 @@
FinalRequestOptions,
Rsp,
)
from ._types import ModelT
from .exceptions import make_status_error, APITimeoutError, APIConnectionError, APIResponseValidationError


Item = TypeVar("Item")
PageParams = TypeVar("PageParams")
TPage = TypeVar("TPage", bound="SyncPage")
TAsyncPage = TypeVar("TAsyncPage", bound="AsyncPage")


class SyncPage(BasePage[Item, PageParams], Generic[Item, TPage, PageParams]):
# TODO: make base page type vars covariant
SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")


class BaseSyncPage(BasePage[ModelT, Mapping[str, object]], Generic[ModelT]):
_client: SyncAPIClient = pydantic.PrivateAttr()
_tmodel: Type[Item] = pydantic.PrivateAttr()
_tpage: Type[TPage] = pydantic.PrivateAttr()

def _init_for_iter(self, client: SyncAPIClient, model: Type[Item], page: Type[TPage], options: FinalRequestOptions):
def _set_private_attributes(
self,
client: SyncAPIClient,
model: Type[ModelT],
options: FinalRequestOptions,
) -> None:
self._model = model
self._client = client
self._tmodel = model
self._tpage = page
self._options = options

def __iter__(self) -> Iterator[Item]:
for page in self._iter_pages():
# Pydantic uses a custom `__iter__` method to support casting BaseModels
# to dictionaries. e.g. dict(model).
# As we want to support `for item in page`, this is inherently incompatible
# with the default pydantic behaviour. It is not possible to support both
# use cases at once. Fortunately, this is not a big deal as all other pydantic
# methods should continue to work as expected as there is an alternative method
# to cast a model to a dictionary, model.dict(), which is used internally
# by pydantic.
def __iter__(self) -> Iterator[ModelT]: # type: ignore
for page in self.iter_pages():
for item in page._get_page_items():
yield item

def _iter_pages(self) -> Iterator[TPage]:
def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
page = self
while True:
yield self # type: ignore
if self.has_next_page():
self = self.get_next_page()
yield page
if page.has_next_page():
page = page.get_next_page()
else:
return

def get_next_page(self) -> TPage:
def get_next_page(self: SyncPageT) -> SyncPageT:
page_params = self._next_page_params()
params = self._options.get("params", {}) or {}
options = {**self._options, "params": {**params, **page_params}}
return self._client.request_api_list(self._tmodel, self._tpage, options)

# this explicit type hint is required as the FinalRequestOptions
# TypedDict is incompatible with dict which is what mypy infers options
# to be.
options: Mapping[str, object] = {**self._options, "params": {**params, **page_params}}
return self._client.request_api_list(
self._model,
page=self.__class__,
# TODO: validate that what we pass is actually valid at runtime
options=options, # type: ignore
)

class AsyncPage(BasePage[Item, PageParams], Generic[Item, TAsyncPage, PageParams]):
_client: AsyncAPIClient = pydantic.PrivateAttr()
_tmodel: Type[Item] = pydantic.PrivateAttr()
_tpage: Type[TAsyncPage] = pydantic.PrivateAttr()
_awaited: bool = pydantic.PrivateAttr()

def _init_for_iter(
self, client: AsyncAPIClient, model: Type[Item], page: Type[TAsyncPage], options: FinalRequestOptions
):
class AsyncPaginator(Generic[ModelT, AsyncPageT]):
def __init__(
self,
client: AsyncAPIClient,
options: FinalRequestOptions,
page_cls: Type[AsyncPageT],
model: Type[ModelT],
) -> None:
self._model = model
self._client = client
self._tmodel = model
self._tpage = page
self._options = options
self._awaited = False
self._page_cls = page_cls

def __await__(self) -> Generator[Any, None, TAsyncPage]:
async def awaitable() -> TAsyncPage:
resp: TAsyncPage = await self._client.request(self._tpage, self._options)
resp._init_for_iter(self._client, self._tmodel, self._tpage, self._options)
self._awaited = True
return resp
def __await__(self) -> Generator[Any, None, AsyncPageT]:
return self._get_page().__await__()

return awaitable().__await__()
async def _get_page(self) -> AsyncPageT:
page = await self._client.request(self._page_cls, self._options)
page._set_private_attributes( # pyright: ignore[reportPrivateUsage]
model=self._model,
options=self._options,
client=self._client,
)
return page

async def _iter_pages(self) -> AsyncIterator[TAsyncPage]:
while True:
yield self
if self.has_next_page():
self = await self.get_next_page()
else:
return
async def __aiter__(self) -> AsyncIterator[ModelT]:
# https://github.com/microsoft/pyright/issues/3464
page = cast(
AsyncPageT,
await self, # type: ignore
)
async for item in page:
yield item


class BaseAsyncPage(BasePage[ModelT, Mapping[str, object]], Generic[ModelT]):
_client: AsyncAPIClient = pydantic.PrivateAttr()

def _set_private_attributes(
self,
model: Type[ModelT],
client: AsyncAPIClient,
options: FinalRequestOptions,
) -> None:
self._model = model
self._client = client
self._options = options

async def __aiter__(self) -> AsyncIterator[Item]:
if not self._awaited:
self = await self
async for page in self._iter_pages():
async def __aiter__(self) -> AsyncIterator[ModelT]:
async for page in self.iter_pages():
for item in page._get_page_items():
yield item

async def get_next_page(self) -> TAsyncPage:
async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
page = self
while True:
yield page
if page.has_next_page():
page = await page.get_next_page()
else:
return

async def get_next_page(self: AsyncPageT) -> AsyncPageT:
page_params = self._next_page_params()
params = self._options.get("params", {}) or {}
options = {**self._options, "params": {**params, **page_params}}
return await self._client.request_api_list(self._tmodel, self._tpage, options)

# this explicit type hint is required as the FinalRequestOptions
# TypedDict is incompatible with dict which is what mypy infers options
# to be.
options: Mapping[str, object] = {**self._options, "params": {**params, **page_params}}
return await self._client.request_api_list(
self._model,
page=self.__class__,
# TODO: validate that what we pass is actually valid at runtime
options=options, # type: ignore
)


class SyncAPIClient(BaseClient):
Expand All @@ -122,15 +178,17 @@ def __init__(
self._client = httpx.Client(
base_url=base_url,
timeout=timeout,
proxies=proxies,
transport=transport,
proxies=proxies, # type: ignore
transport=transport, # type: ignore
headers={"Accept": "application/json"},
)

def request(self, model: Type[Rsp], options: FinalRequestOptions, remaining_retries: Optional[int] = None) -> Rsp:
retries = self.remaining_retries(remaining_retries, options)
req_args = self.prepare_request_args(options)
request = self._client.build_request(**req_args)

# https://github.com/encode/httpx/discussions/2181
request = self._client.build_request(**req_args) # pyright: ignore[reportUnknownMemberType]

try:
response = self._client.send(request)
Expand Down Expand Up @@ -174,9 +232,18 @@ def retry_request(
remaining_retries=remaining,
)

def request_api_list(self, model: Type[Item], page: Type[TPage], options: FinalRequestOptions) -> TPage:
resp: TPage = self.request(page, options)
resp._init_for_iter(self, model, page, options)
def request_api_list(
self,
model: Type[ModelT],
page: Type[SyncPageT],
options: FinalRequestOptions,
) -> SyncPageT:
resp = self.request(page, options)
resp._set_private_attributes( # pyright: ignore[reportPrivateUsage]
client=self,
model=model,
options=options,
)
return resp

def get(self, path: str, *, query: Query | None = None, model: Type[Rsp], options: RequestOptions) -> Rsp:
Expand All @@ -200,8 +267,14 @@ def delete(self, path: str, *, body: Query | None = None, model: Type[Rsp], opti
return self.request(model, opts)

def get_api_list(
self, path: str, *, query: Query | None = None, model: Type[Item], page: Type[TPage], options: RequestOptions
) -> TPage:
self,
path: str,
*,
query: Query | None = None,
model: Type[ModelT],
page: Type[SyncPageT],
options: RequestOptions,
) -> SyncPageT:
opts = FinalRequestOptions(method="get", url=path, params=query, **options) # type: ignore[misc]
return self.request_api_list(model, page, opts)

Expand All @@ -225,8 +298,8 @@ def __init__(
self._client = httpx.AsyncClient(
base_url=base_url,
timeout=timeout,
proxies=proxies,
transport=transport,
proxies=proxies, # type: ignore
transport=transport, # type: ignore
headers={"Accept": "application/json"},
)

Expand All @@ -235,14 +308,27 @@ async def request(
) -> Rsp:
retries = self.remaining_retries(remaining_retries, options)
req_args = self.prepare_request_args(options)
request = self._client.build_request(**req_args)

# https://github.com/encode/httpx/discussions/2181
request = self._client.build_request(**req_args) # pyright: ignore[reportUnknownMemberType]

try:
response = await self._client.send(request)
response.raise_for_status()
except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code
if retries > 0 and self.should_retry(err.response):
return await self.retry_request(options, model, retries, err.response.headers)
raise make_status_error(request, err.response)
except httpx.ConnectTimeout as err:
if retries > 0:
return await self.retry_request(options, model, retries)
raise APITimeoutError(request=request) from err
except httpx.ReadTimeout as err:
# We explicitly do not retry on ReadTimeout errors as this means
# that the server processing the request has taken 60 seconds
# (our default timeout). This likely indicates that something
# is not working as expected on the server side.
raise
except httpx.TimeoutException as err:
if retries > 0:
return await self.retry_request(options, model, retries)
Expand Down Expand Up @@ -276,10 +362,13 @@ async def retry_request(
remaining_retries=remaining,
)

def request_api_list(self, model: Type[Item], page: Type[TAsyncPage], options: FinalRequestOptions) -> TAsyncPage:
p = page.construct() # construct() here is necessary to instanciate without triggering pydantic validation
p._init_for_iter(self, model, page, options)
return p
def request_api_list(
self,
model: Type[ModelT],
page: Type[AsyncPageT],
options: FinalRequestOptions,
) -> AsyncPaginator[ModelT, AsyncPageT]:
return AsyncPaginator(client=self, options=options, page_cls=page, model=model)

async def get(self, path: str, *, query: Query | None = None, model: Type[Rsp], options: RequestOptions) -> Rsp:
opts = FinalRequestOptions(method="get", url=path, params=query, **options) # type: ignore[misc]
Expand All @@ -306,9 +395,9 @@ def get_api_list(
path: str,
*,
query: Query | None = None,
model: Type[Item],
page: Type[TAsyncPage],
model: Type[ModelT],
page: Type[AsyncPageT],
options: RequestOptions,
) -> TAsyncPage:
) -> AsyncPaginator[ModelT, AsyncPageT]:
opts = FinalRequestOptions(method="get", url=path, params=query, **options) # type: ignore[misc]
return self.request_api_list(model, page, opts)
Loading

0 comments on commit 6416022

Please sign in to comment.