From 6039d08774b7bcd0784d8caa075f0aacaf94b8c3 Mon Sep 17 00:00:00 2001 From: fselmo Date: Wed, 26 Jul 2023 18:19:40 -0600 Subject: [PATCH 1/8] Add AsyncIterator functionality for ``_PersistentConnectionWeb3`` - AsyncIterator support: Add ``__aiter__()`` method to ``_PersistentConnectionWeb3`` in order to support infinite websocket connections. --- web3/main.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/web3/main.py b/web3/main.py index 2330c8fc32..7db7f5778e 100644 --- a/web3/main.py +++ b/web3/main.py @@ -1,3 +1,4 @@ +import asyncio import decimal import warnings from types import ( @@ -33,6 +34,7 @@ ) from typing import ( Any, + AsyncIterator, Dict, List, Optional, @@ -538,6 +540,16 @@ def __init__( ) AsyncWeb3.__init__(self, provider, middlewares, modules, external_modules, ens) + # async for w3 in w3.persistent_websocket(provider) + async def __aiter__(self) -> AsyncIterator["_PersistentConnectionWeb3"]: + while True: + try: + yield self + except Exception: + # provider should handle connection / reconnection + continue + + # async with w3.persistent_websocket(provider) as w3 async def __aenter__(self) -> "_PersistentConnectionWeb3": await self.provider.connect() return self From c2794d9a505e89530101df73ef7d054ba24ca43d Mon Sep 17 00:00:00 2001 From: fselmo Date: Thu, 27 Jul 2023 11:04:18 -0600 Subject: [PATCH 2/8] Use unsolicited pong for heartbeat + add retries for connecting wsV2 - If we don't care about latency, we should just use an unsolicited pong for the heartbeat: https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#websockets.client.WebSocketClientProtocol.pong - Add a max for connection retries, set a default, and add an exponential backoff rate when attempting to connect. --- web3/providers/websocket/websocket_v2.py | 39 ++++++++++++++++++------ 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/websocket/websocket_v2.py index fc4ad70e51..8dd62ce412 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/websocket/websocket_v2.py @@ -58,11 +58,14 @@ def __init__( endpoint_uri: Optional[Union[URI, str]] = None, websocket_kwargs: Optional[Dict[str, Any]] = None, call_timeout: Optional[int] = None, + max_connection_retries: Optional[int] = 5, ) -> None: self.endpoint_uri = URI(endpoint_uri) if self.endpoint_uri is None: self.endpoint_uri = get_default_endpoint() + self.max_connection_retries = max_connection_retries + if not any( self.endpoint_uri.startswith(prefix) for prefix in VALID_WEBSOCKET_URI_PREFIXES @@ -94,11 +97,10 @@ async def is_connected(self, show_traceback: bool = False) -> bool: return False try: - pong_waiter = await self.ws.ping() - await asyncio.wait_for(pong_waiter, timeout=self.call_timeout) + await self.ws.pong() return True - except (WebSocketException, asyncio.TimeoutError) as e: + except WebSocketException as e: if show_traceback: raise ProviderConnectionError( f"Error connecting to endpoint: '{self.endpoint_uri}'" @@ -106,12 +108,31 @@ async def is_connected(self, show_traceback: bool = False) -> bool: return False async def connect(self) -> None: - try: - self.ws = await connect(self.endpoint_uri, **self.websocket_kwargs) - except Exception as e: - raise ProviderConnectionError( - f"Could not connect to endpoint: {self.endpoint_uri}" - ) from e + _connection_attempts = 0 + _backoff_rate_change = 1.75 + _backoff_time = 1.75 + + while True: + try: + _connection_attempts += 1 + self.ws = await connect(self.endpoint_uri, **self.websocket_kwargs) + break + except WebSocketException as e: + if ( + self.max_connection_retries + and _connection_attempts > self.max_connection_retries + ): + raise ProviderConnectionError( + f"Could not connect to endpoint: {self.endpoint_uri}. " + f"Retries exceeded max of {self.max_connection_retries}." + ) from e + self.logger.info( + f"Could not connect to endpoint: {self.endpoint_uri}. Retrying in " + f"{round(_backoff_time, 1)} seconds.", + exc_info=True, + ) + await asyncio.sleep(_backoff_time) + _backoff_time *= _backoff_rate_change async def disconnect(self) -> None: await self.ws.close() From ef4d836e7b83249d2eea48d4be0becdf4f4f1b8d Mon Sep 17 00:00:00 2001 From: fselmo Date: Thu, 27 Jul 2023 13:29:01 -0600 Subject: [PATCH 3/8] Some minor tweaks for WebsocketProviderV2 --- web3/providers/websocket/websocket_v2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/websocket/websocket_v2.py index 8dd62ce412..7f6451841d 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/websocket/websocket_v2.py @@ -136,6 +136,7 @@ async def connect(self) -> None: async def disconnect(self) -> None: await self.ws.close() + self.ws = None # clear the provider request cache after disconnecting self._async_response_processing_cache.clear() From 668ed1de57741fc18c6ac66630c730c9dabdba37 Mon Sep 17 00:00:00 2001 From: fselmo Date: Thu, 27 Jul 2023 13:37:44 -0600 Subject: [PATCH 4/8] Add a test suite for AsyncWeb3 w/ ws v2 as an ``AsyncIterator`` - Separate the goethereum_ws_v2 tests into a directory with ``AsyncGenerator`` / "async with w3..." style tests and ``AsyncIterator`` / "async for w3..." style tests --- .../test_goethereum_ws_v2/__init__.py | 0 .../test_goethereum_ws_v2/conftest.py | 49 +++++++++++++ .../test_async_generator_w3.py} | 54 ++------------- .../test_async_iterator_w3.py | 69 +++++++++++++++++++ tox.ini | 4 +- web3/main.py | 1 - 6 files changed, 125 insertions(+), 52 deletions(-) create mode 100644 tests/integration/go_ethereum/test_goethereum_ws_v2/__init__.py create mode 100644 tests/integration/go_ethereum/test_goethereum_ws_v2/conftest.py rename tests/integration/go_ethereum/{test_goethereum_ws_v2.py => test_goethereum_ws_v2/test_async_generator_w3.py} (60%) create mode 100644 tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/__init__.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/conftest.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/conftest.py new file mode 100644 index 0000000000..c2c904d5e6 --- /dev/null +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/conftest.py @@ -0,0 +1,49 @@ +import pytest + +from tests.integration.common import ( + COINBASE, +) +from tests.utils import ( + get_open_port, +) + + +@pytest.fixture(scope="module") +def ws_port(): + return get_open_port() + + +@pytest.fixture(scope="module") +def endpoint_uri(ws_port): + return f"ws://localhost:{ws_port}" + + +def _geth_command_arguments(ws_port, base_geth_command_arguments, geth_version): + yield from base_geth_command_arguments + if geth_version.major == 1: + yield from ( + "--miner.etherbase", + COINBASE[2:], + "--ws", + "--ws.port", + ws_port, + "--ws.api", + "admin,eth,net,web3,personal,miner", + "--ws.origins", + "*", + "--ipcdisable", + "--allow-insecure-unlock", + ) + if geth_version.minor not in [10, 11]: + raise AssertionError("Unsupported Geth version") + else: + raise AssertionError("Unsupported Geth version") + + +@pytest.fixture(scope="module") +def geth_command_arguments( + geth_binary, get_geth_version, datadir, ws_port, base_geth_command_arguments +): + return _geth_command_arguments( + ws_port, base_geth_command_arguments, get_geth_version + ) diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_generator_w3.py similarity index 60% rename from tests/integration/go_ethereum/test_goethereum_ws_v2.py rename to tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_generator_w3.py index 610dc78940..01d3bace3a 100644 --- a/tests/integration/go_ethereum/test_goethereum_ws_v2.py +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_generator_w3.py @@ -2,12 +2,6 @@ import pytest_asyncio -from tests.integration.common import ( - COINBASE, -) -from tests.utils import ( - get_open_port, -) from web3 import ( AsyncWeb3, WebsocketProviderV2, @@ -19,56 +13,15 @@ GoEthereumAsyncPersonalModuleTest, ) -from .common import ( +from ..common import ( GoEthereumAsyncEthModuleTest, GoEthereumAsyncNetModuleTest, ) -from .utils import ( +from ..utils import ( wait_for_aiohttp, ) -@pytest.fixture(scope="module") -def ws_port(): - return get_open_port() - - -@pytest.fixture(scope="module") -def endpoint_uri(ws_port): - return f"ws://localhost:{ws_port}" - - -def _geth_command_arguments(ws_port, base_geth_command_arguments, geth_version): - yield from base_geth_command_arguments - if geth_version.major == 1: - yield from ( - "--miner.etherbase", - COINBASE[2:], - "--ws", - "--ws.port", - ws_port, - "--ws.api", - "admin,eth,net,web3,personal,miner", - "--ws.origins", - "*", - "--ipcdisable", - "--allow-insecure-unlock", - ) - if geth_version.minor not in [10, 11]: - raise AssertionError("Unsupported Geth version") - else: - raise AssertionError("Unsupported Geth version") - - -@pytest.fixture(scope="module") -def geth_command_arguments( - geth_binary, get_geth_version, datadir, ws_port, base_geth_command_arguments -): - return _geth_command_arguments( - ws_port, base_geth_command_arguments, get_geth_version - ) - - @pytest_asyncio.fixture(scope="module") async def async_w3(geth_process, endpoint_uri): await wait_for_aiohttp(endpoint_uri) @@ -79,12 +32,14 @@ async def async_w3(geth_process, endpoint_uri): class TestGoEthereumAsyncAdminModuleTest(GoEthereumAsyncAdminModuleTest): + @pytest.mark.asyncio @pytest.mark.xfail( reason="running geth with the --nodiscover flag doesn't allow peer addition" ) async def test_admin_peers(self, async_w3: "AsyncWeb3") -> None: await super().test_admin_peers(async_w3) + @pytest.mark.asyncio async def test_admin_start_stop_http(self, async_w3: "AsyncWeb3") -> None: # This test causes all tests after it to fail on CI if it's allowed to run pytest.xfail( @@ -92,6 +47,7 @@ async def test_admin_start_stop_http(self, async_w3: "AsyncWeb3") -> None: ) await super().test_admin_start_stop_http(async_w3) + @pytest.mark.asyncio async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None: # This test inconsistently causes all tests after it to # fail on CI if it's allowed to run diff --git a/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py new file mode 100644 index 0000000000..fc321fc455 --- /dev/null +++ b/tests/integration/go_ethereum/test_goethereum_ws_v2/test_async_iterator_w3.py @@ -0,0 +1,69 @@ +import pytest + +import pytest_asyncio + +from web3 import ( + AsyncWeb3, + WebsocketProviderV2, +) +from web3._utils.module_testing.go_ethereum_admin_module import ( + GoEthereumAsyncAdminModuleTest, +) +from web3._utils.module_testing.go_ethereum_personal_module import ( + GoEthereumAsyncPersonalModuleTest, +) + +from ..common import ( + GoEthereumAsyncEthModuleTest, + GoEthereumAsyncNetModuleTest, +) +from ..utils import ( + wait_for_aiohttp, +) + + +@pytest_asyncio.fixture(scope="module") +async def async_w3(geth_process, endpoint_uri): + await wait_for_aiohttp(endpoint_uri) + async for w3 in AsyncWeb3.persistent_websocket( + WebsocketProviderV2(endpoint_uri, call_timeout=30) + ): + return w3 + + +class TestGoEthereumAsyncAdminModuleTest(GoEthereumAsyncAdminModuleTest): + @pytest.mark.asyncio + @pytest.mark.xfail( + reason="running geth with the --nodiscover flag doesn't allow peer addition" + ) + async def test_admin_peers(self, async_w3: "AsyncWeb3") -> None: + await super().test_admin_peers(async_w3) + + @pytest.mark.asyncio + async def test_admin_start_stop_http(self, async_w3: "AsyncWeb3") -> None: + # This test causes all tests after it to fail on CI if it's allowed to run + pytest.xfail( + reason="Only one HTTP endpoint is allowed to be active at any time" + ) + await super().test_admin_start_stop_http(async_w3) + + @pytest.mark.asyncio + async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None: + # This test inconsistently causes all tests after it to + # fail on CI if it's allowed to run + pytest.xfail( + reason="Only one WebSocket endpoint is allowed to be active at any time" + ) + await super().test_admin_start_stop_ws(async_w3) + + +class TestGoEthereumAsyncEthModuleTest(GoEthereumAsyncEthModuleTest): + pass + + +class TestGoEthereumAsyncNetModuleTest(GoEthereumAsyncNetModuleTest): + pass + + +class TestGoEthereumAsyncPersonalModuleTest(GoEthereumAsyncPersonalModuleTest): + pass diff --git a/tox.ini b/tox.ini index d9d2f6ec79..b03efe0026 100644 --- a/tox.ini +++ b/tox.ini @@ -30,8 +30,8 @@ commands= integration-goethereum-http_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_http.py --flaky} integration-goethereum-ws: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws.py} integration-goethereum-ws_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws.py --flaky} - integration-goethereum-ws-v2: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2.py} - integration-goethereum-ws-v2_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2.py --flaky} + integration-goethereum-ws-v2: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2} + integration-goethereum-ws-v2_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2 --flaky} integration-ethtester: pytest {posargs:tests/integration/test_ethereum_tester.py} docs: make -C {toxinidir} validate-docs deps = diff --git a/web3/main.py b/web3/main.py index 7db7f5778e..4191fc795e 100644 --- a/web3/main.py +++ b/web3/main.py @@ -1,4 +1,3 @@ -import asyncio import decimal import warnings from types import ( From 7f0b3883ef5a5a36e40d4e20ca2a33e863eec0b7 Mon Sep 17 00:00:00 2001 From: fselmo Date: Fri, 28 Jul 2023 12:11:28 -0600 Subject: [PATCH 5/8] Fix typing for optional eth_subscription params --- web3/eth/async_eth.py | 8 +++++--- web3/types.py | 4 ---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/web3/eth/async_eth.py b/web3/eth/async_eth.py index 92cf5c917b..ebbc81f0dc 100644 --- a/web3/eth/async_eth.py +++ b/web3/eth/async_eth.py @@ -86,7 +86,6 @@ TxData, TxParams, TxReceipt, - TxTypeSubscriptionArg, Wei, _Hash32, ) @@ -669,7 +668,7 @@ async def uninstall_filter(self, filter_id: HexStr) -> bool: Callable[ [ SubscriptionType, - Optional[Union[LogsSubscriptionArg, TxTypeSubscriptionArg]], + Optional[Union[LogsSubscriptionArg, bool]], ], Awaitable[HexStr], ] @@ -682,7 +681,10 @@ async def subscribe( self, subscription_type: SubscriptionType, subscription_arg: Optional[ - Union[LogsSubscriptionArg, TxTypeSubscriptionArg] + Union[ + LogsSubscriptionArg, # logs, optional filter params + bool, # newPendingTransactions, full_transactions + ] ] = None, ) -> HexStr: if not isinstance(self.w3.provider, PersistentConnectionProvider): diff --git a/web3/types.py b/web3/types.py index 4616963094..4a975d50e5 100644 --- a/web3/types.py +++ b/web3/types.py @@ -546,7 +546,3 @@ class LogsSubscriptionArg(TypedDict, total=False): Sequence[Union[Address, ChecksumAddress, ENS]], ] topics: Sequence[Union[HexStr, Sequence[HexStr]]] - - -class TxTypeSubscriptionArg(TypedDict, total=False): - full_transactions: bool From 40adf70ce76ed0b67e626b956fa6b110defc59e4 Mon Sep 17 00:00:00 2001 From: fselmo Date: Fri, 28 Jul 2023 15:55:12 -0600 Subject: [PATCH 6/8] Add docs for WebsocketProviderV2 as asynchronous iterator --- docs/providers.rst | 55 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/docs/providers.rst b/docs/providers.rst index 1b40660204..43be484d37 100644 --- a/docs/providers.rst +++ b/docs/providers.rst @@ -233,7 +233,24 @@ WebsocketProviderV2 (beta) * ``call_timeout`` is the timeout in seconds, used when receiving or sending data over the connection. Defaults to ``None`` (no timeout). - .. code-block:: python + Under the hood, the ``WebsocketProviderV2`` uses the python websockets library for + making requests. If you would like to modify how requests are made, you can + use the ``websocket_kwargs`` to do so. See the `websockets documentation`_ for + available arguments. + + The timeout for each call to send or receive is controlled by a ``call_timeout`` + argument. This is set to ``None`` by default, which means no timeout. + +Usage +~~~~~ + +The ``AsyncWeb3`` class may be used as a context manager, utilizing the ``async with`` +syntax, when connecting via ``persistent_connection()`` using the +``WebsocketProviderV2``. This will automatically close the connection when the context +manager exits. A similar example, using the ``websockets`` connection as an +asynchronous context manager, can be found in the `websockets connection`_ docs. + +.. code-block:: python >>> import asyncio >>> from web3 import AsyncWeb3 @@ -246,7 +263,7 @@ WebsocketProviderV2 (beta) ... logger.setLevel(logging.DEBUG) ... logger.addHandler(logging.StreamHandler()) - >>> async def ws_v2_subscription_example(): + >>> async def ws_v2_subscription_context_manager_example(): ... async with AsyncWeb3.persistent_websocket( ... WebsocketProviderV2(f"ws://127.0.0.1:8546") ... ) as w3: @@ -272,16 +289,36 @@ WebsocketProviderV2 (beta) ... # the connection closes automatically when exiting the context ... # manager (the `async with` block) - >>> asyncio.run(ws_v2_subscription_example()) + >>> asyncio.run(ws_v2_subscription_context_manager_example()) - Under the hood, the ``WebsocketProviderV2`` uses the python websockets library for - making requests. If you would like to modify how requests are made, you can - use the ``websocket_kwargs`` to do so. See the `websockets documentation`_ for - available arguments. +The ``AsyncWeb3`` class may also be used as an asynchronous iterator, utilizing the +``async for`` syntax, when connecting via ``persistent_connection()`` using the +``WebsocketProviderV2``. This may be used to set up an indefinite websocket connection +and reconnect automatically if the connection is lost. A similar example, using the +``websockets`` connection as an asynchronous iterator, can be found in the +`websockets connection`_ docs. + +.. _`websockets connection`: https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#websockets.client.connect + +.. code-block:: python + + >>> import asyncio + >>> from web3 import AsyncWeb3 + >>> from web3.providers import WebsocketProviderV2 + >>> import websockets + + >>> async def ws_v2_subscription_iterator_example(): + ... async for w3 in AsyncWeb3.persistent_websocket( + ... WebsocketProviderV2(f"ws://127.0.0.1:8546") + ... ): + ... try: + ... ... + ... except websockets.ConnectionClosed: + ... continue + + >>> asyncio.run(ws_v2_subscription_iterator_example()) - The timeout for each call to send or receive is controlled by a ``call_timeout`` - argument. This is set to ``None`` by default, which means no timeout. AutoProvider From ca8985f2b10d7e31dbf316163c50311c535a336d Mon Sep 17 00:00:00 2001 From: fselmo Date: Fri, 28 Jul 2023 16:18:32 -0600 Subject: [PATCH 7/8] Add relevant newsfragments for #3067 --- newsfragments/3067.bugfix.rst | 1 + newsfragments/3067.feature.rst | 1 + 2 files changed, 2 insertions(+) create mode 100644 newsfragments/3067.bugfix.rst create mode 100644 newsfragments/3067.feature.rst diff --git a/newsfragments/3067.bugfix.rst b/newsfragments/3067.bugfix.rst new file mode 100644 index 0000000000..dcb89c316d --- /dev/null +++ b/newsfragments/3067.bugfix.rst @@ -0,0 +1 @@ +Fix the type for the optional param asking for "full transactions" when subscribing to ``newPendingTransactions`` via ``eth_subscribe`` to ``bool``. diff --git a/newsfragments/3067.feature.rst b/newsfragments/3067.feature.rst new file mode 100644 index 0000000000..a0c2ae3066 --- /dev/null +++ b/newsfragments/3067.feature.rst @@ -0,0 +1 @@ +Asynchronous iterator support for ``AsyncWeb3`` with ``WebsocketProviderV2`` using ``async for`` syntax. From 9fdd4ec1ef3a0e711bf8e19ba41c72cea206018a Mon Sep 17 00:00:00 2001 From: fselmo Date: Fri, 28 Jul 2023 16:20:31 -0600 Subject: [PATCH 8/8] Internalize the max connection retries for now - Due to the exponential backoff, it is already pretty reasonable to have this number at 5 but more than this might add long wait times. If there is demand, we can open this up to an init arg in the future. --- web3/providers/websocket/websocket_v2.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/web3/providers/websocket/websocket_v2.py b/web3/providers/websocket/websocket_v2.py index 7f6451841d..213a761d25 100644 --- a/web3/providers/websocket/websocket_v2.py +++ b/web3/providers/websocket/websocket_v2.py @@ -52,20 +52,18 @@ def get_default_endpoint() -> URI: class WebsocketProviderV2(PersistentConnectionProvider): logger = logging.getLogger("web3.providers.WebsocketProviderV2") is_async: bool = True + _max_connection_retries: int = 5 def __init__( self, endpoint_uri: Optional[Union[URI, str]] = None, websocket_kwargs: Optional[Dict[str, Any]] = None, call_timeout: Optional[int] = None, - max_connection_retries: Optional[int] = 5, ) -> None: self.endpoint_uri = URI(endpoint_uri) if self.endpoint_uri is None: self.endpoint_uri = get_default_endpoint() - self.max_connection_retries = max_connection_retries - if not any( self.endpoint_uri.startswith(prefix) for prefix in VALID_WEBSOCKET_URI_PREFIXES @@ -112,19 +110,16 @@ async def connect(self) -> None: _backoff_rate_change = 1.75 _backoff_time = 1.75 - while True: + while _connection_attempts != self._max_connection_retries: try: _connection_attempts += 1 self.ws = await connect(self.endpoint_uri, **self.websocket_kwargs) break except WebSocketException as e: - if ( - self.max_connection_retries - and _connection_attempts > self.max_connection_retries - ): + if _connection_attempts == self._max_connection_retries: raise ProviderConnectionError( f"Could not connect to endpoint: {self.endpoint_uri}. " - f"Retries exceeded max of {self.max_connection_retries}." + f"Retries exceeded max of {self._max_connection_retries}." ) from e self.logger.info( f"Could not connect to endpoint: {self.endpoint_uri}. Retrying in "