Skip to content

Commit

Permalink
Merge branch 'main' into implement-async-ssl_assert_hostname
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>
  • Loading branch information
dblock authored Nov 14, 2024
2 parents 5996efc + 1269cdc commit 1bc2c8e
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## [Unreleased]
### Added
- Added `AsyncSearch#collapse` ([827](https://github.com/opensearch-project/opensearch-py/pull/827))
- Implement `ssl_assert_hostname` boolean parameter for `AsyncOpenSearch.__init__()` ([843](https://github.com/opensearch-project/opensearch-py/pull/843))
- Added `pool_maxsize` to `AsyncOpenSearch` ([845](https://github.com/opensearch-project/opensearch-py/pull/845))
- Added `ssl_assert_hostname` to `AsyncOpenSearch` ([843](https://github.com/opensearch-project/opensearch-py/pull/843))
### Changed
### Deprecated
### Removed
Expand Down
5 changes: 2 additions & 3 deletions opensearchpy/_async/helpers/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import asyncio
import os
import time
from typing import Any
from unittest import SkipTest

Expand Down Expand Up @@ -37,7 +36,7 @@ async def get_test_client(nowait: bool = False, **kwargs: Any) -> Any:
await client.cluster.health(wait_for_status="yellow")
return client
except ConnectionError:
time.sleep(0.1)
await asyncio.sleep(0.1)
else:
# timeout
raise SkipTest("OpenSearch failed to start.")
5 changes: 5 additions & 0 deletions opensearchpy/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def __init__(
url_prefix=url_prefix,
timeout=timeout,
use_ssl=use_ssl,
maxsize=maxsize,
headers=headers,
http_compress=http_compress,
opaque_id=opaque_id,
Expand Down Expand Up @@ -220,6 +221,10 @@ def __init__(
self.loop = loop
self.session = None

# Align with Sync Interface
if "pool_maxsize" in kwargs:
maxsize = kwargs.pop("pool_maxsize")

# Parameters for creating an aiohttp.ClientSession later.
self._limit = maxsize
self._http_auth = http_auth
Expand Down
4 changes: 4 additions & 0 deletions opensearchpy/_async/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
serializers: Any = None,
default_mimetype: str = "application/json",
max_retries: int = 3,
pool_maxsize: Optional[int] = None,
retry_on_status: Any = (502, 503, 504),
retry_on_timeout: bool = False,
send_get_body_as: str = "GET",
Expand Down Expand Up @@ -102,6 +103,8 @@ def __init__(
:arg default_mimetype: when no mimetype is specified by the server
response assume this mimetype, defaults to `'application/json'`
:arg max_retries: maximum number of retries before an exception is propagated
:arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session
:arg retry_on_status: set of HTTP status codes on which we should retry
on a different node. defaults to ``(502, 503, 504)``
:arg retry_on_timeout: should timeout trigger a retry on different
Expand Down Expand Up @@ -134,6 +137,7 @@ def __init__(
serializers=serializers,
default_mimetype=default_mimetype,
max_retries=max_retries,
pool_maxsize=pool_maxsize,
retry_on_status=retry_on_status,
retry_on_timeout=retry_on_timeout,
send_get_body_as=send_get_body_as,
Expand Down
4 changes: 4 additions & 0 deletions opensearchpy/connection/http_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ def __init__(
self.loop = loop
self.session = None

# Align with Sync Interface
if "pool_maxsize" in kwargs:
maxsize = kwargs.pop("pool_maxsize")

# Parameters for creating an aiohttp.ClientSession later.
self._limit = maxsize
self._http_auth = http_auth
Expand Down
76 changes: 76 additions & 0 deletions test_opensearchpy/test_async/test_aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
import os
from typing import Type

import pytest
from pytest import MarkDecorator

from opensearchpy import (
AIOHttpConnection,
AsyncConnection,
AsyncHttpConnection,
AsyncOpenSearch,
)
from opensearchpy._async.helpers.test import get_test_client

pytestmark: MarkDecorator = pytest.mark.asyncio


class TestAIOHttp:

def test_default(self) -> None:
client = AsyncOpenSearch()
assert client.transport.connection_class == AIOHttpConnection
assert client.transport.pool_maxsize is None

def test_connection_class(self) -> None:
client = AsyncOpenSearch(connection_class=AsyncHttpConnection)
assert client.transport.connection_class == AsyncHttpConnection
assert client.transport.pool_maxsize is None

def test_pool_maxsize(self) -> None:
client = AsyncOpenSearch(connection_class=AsyncHttpConnection, pool_maxsize=42)
assert client.transport.connection_class == AsyncHttpConnection
assert client.transport.pool_maxsize == 42

@pytest.mark.parametrize( # type: ignore[misc]
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
)
async def test_default_limit(self, connection_class: Type[AsyncConnection]) -> None:
client = await get_test_client(
connection_class=connection_class,
verify_certs=False,
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
)
assert isinstance(
client.transport.connection_pool.connections[0], connection_class
)
assert (
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
== 10
)

@pytest.mark.parametrize( # type: ignore[misc]
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
)
async def test_custom_limit(self, connection_class: Type[AsyncConnection]) -> None:
client = await get_test_client(
connection_class=connection_class,
verify_certs=False,
pool_maxsize=42,
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
)
assert isinstance(
client.transport.connection_pool.connections[0], connection_class
)
assert (
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
== 42
)

0 comments on commit 1bc2c8e

Please sign in to comment.