diff --git a/docs/source/SETTINGS.rst b/docs/source/SETTINGS.rst index 14145be..ebb3f1a 100644 --- a/docs/source/SETTINGS.rst +++ b/docs/source/SETTINGS.rst @@ -156,17 +156,21 @@ Interests Search ------ -+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+ -| Parameter | Type | Description | Default | -+===============================+===============+=============================================================================================+=========+ -| searches.receive.max_results | integer | Maximum amount of search results returned when replying to search requests from other peers | 100 | -+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+ -| searches.receive.store_amount | integer | Amount of received searches to store in the client | 500 | -+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+ -| searches.send.store_results | boolean | Whether to store search results internally | true | -+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+ -| searches.wishlist | array[object] | List of wishlist items. Object definition is defined below | | -+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+ ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| Parameter | Type | Description | Default | ++========================================+===============+=================================================================================================================================================================+=========+ +| searches.receive.max_results | integer | Maximum amount of search results returned when replying to search requests from other peers | 100 | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| searches.receive.store_amount | integer | Amount of received searches to store in the client | 500 | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| searches.send.store_results | boolean | Whether to store search results internally | true | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| searches.send.request_timeout | integer | Timeout for sent search requests, when the timeout is reached the request will be removed and search results will no longer be accepted (0 = keep indefinitely) | 0 | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| searches.send.wishlist_request_timeout | integer | Timeout for sent wishlist requests (0 = keep indefinitely, -1 = use the interval advertised by the server) | -1 | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ +| searches.wishlist | array[object] | List of wishlist items. Object definition is defined below | | ++----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+ Following object should be used for ``searches.wishlist`` object: diff --git a/docs/source/USAGE.rst b/docs/source/USAGE.rst index ef1067d..e5537be 100644 --- a/docs/source/USAGE.rst +++ b/docs/source/USAGE.rst @@ -141,7 +141,33 @@ start a search request for each of the types: user_request: SearchRequest = await client.searches.search_user('other_user', 'my user query') -Search requests are stored internally and should be removed when no longer needed: +Wishlist Searches +~~~~~~~~~~~~~~~~~ + +Wishlist searches are periodic searches made by the client to the server. The interval is determined +by the server. To add a wishlist search simply add an entry to the settings, it will be picked up +at the next interval: + +.. code-block:: python + + from aioslsk.settings import Settings, WishlistSettingEntry + + settings: Settings = Settings(...) + settings.searches.wishlist.append( + WishlistSettingEntry(query='test', enabled=True) + ) + + +The :class:`SearchRequestSentEvent` will be emitted when a wishlist search is made. Keep in mind +however that this event is emitted also when making other types of search requests. Look at the type +of the request made to determine whether it is a wishlist search or not. + + +Manually Removing Requests +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Search requests are stored internally but a timeout can be configured to automatically remove them. +Following example shows how to manually remove a search request: .. code-block:: python @@ -155,6 +181,16 @@ Search requests are stored internally and should be removed when no longer neede After removal there will be no more :class:`SearchResultEvent` events emitted for the removed request +Automatically Removing Requests +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A timeout can be configured through two settings: + +* ``searches.sent.request_timeout`` +* ``searches.sent.wishlist_request_timeout`` + +When a request gets removed an event will be emitted: :class:`SearchRequestRemovedEvent` + Receiving Results ----------------- diff --git a/src/aioslsk/client.py b/src/aioslsk/client.py index d71ffcd..0839ffa 100644 --- a/src/aioslsk/client.py +++ b/src/aioslsk/client.py @@ -55,6 +55,7 @@ def __init__( shares_cache: Optional[SharesCache] = None, transfer_cache: Optional[TransferCache] = None, executor_factory: Optional[ExecutorFactory] = None, event_bus: Optional[EventBus] = None): + self.settings: Settings = settings self._stop_event: Optional[asyncio.Event] = None diff --git a/src/aioslsk/constants.py b/src/aioslsk/constants.py index c83899d..9073610 100644 --- a/src/aioslsk/constants.py +++ b/src/aioslsk/constants.py @@ -5,6 +5,7 @@ DEFAULT_LISTENING_HOST: str = '0.0.0.0' DEFAULT_PARENT_MIN_SPEED: int = 1 DEFAULT_PARENT_SPEED_RATIO: int = 50 +DEFAULT_WISHLIST_INTERVAL: int = 600 DEFAULT_READ_TIMEOUT: float = 60 PEER_CONNECT_TIMEOUT: float = 10 """Direct connection timeout""" diff --git a/src/aioslsk/events.py b/src/aioslsk/events.py index 98de39b..6dd7587 100644 --- a/src/aioslsk/events.py +++ b/src/aioslsk/events.py @@ -381,6 +381,18 @@ class PublicMessageEvent(Event): raw_message: PublicChatMessage.Response +@dataclass(frozen=True) +class SearchRequestSentEvent(Event): + """Emitted when a search request has been sent out""" + query: SearchRequest + + +@dataclass(frozen=True) +class SearchRequestRemovedEvent(Event): + """Emitted when a search request has been removed""" + query: SearchRequest + + @dataclass(frozen=True) class SearchResultEvent(Event): """Emitted when a search result has been received""" diff --git a/src/aioslsk/search/manager.py b/src/aioslsk/search/manager.py index 1e24b6a..11563ac 100644 --- a/src/aioslsk/search/manager.py +++ b/src/aioslsk/search/manager.py @@ -5,13 +5,16 @@ from typing import Deque, Dict, List, Optional, Union from ..base_manager import BaseManager +from ..constants import DEFAULT_WISHLIST_INTERVAL from ..events import ( on_message, build_message_map, - EventBus, ConnectionStateChangedEvent, + EventBus, MessageReceivedEvent, SearchRequestReceivedEvent, + SearchRequestRemovedEvent, + SearchRequestSentEvent, SearchResultEvent, SessionDestroyedEvent, SessionInitializedEvent, @@ -41,7 +44,7 @@ from ..shares.manager import SharesManager from ..shares.utils import convert_items_to_file_data from ..session import Session -from ..tasks import BackgroundTask +from ..tasks import BackgroundTask, Timer from ..transfer.interface import UploadInfoProvider from ..utils import task_counter, ticket_generator from .model import ReceivedSearch, SearchResult, SearchRequest, SearchType @@ -82,7 +85,7 @@ def __init__( self._search_reply_tasks: List[asyncio.Task] = [] self._wishlist_task: BackgroundTask = BackgroundTask( - interval=600, + interval=DEFAULT_WISHLIST_INTERVAL, task_coro=self._wishlist_job, name='wishlist-task' ) @@ -120,12 +123,14 @@ async def search(self, query: str) -> SearchRequest: await self._network.send_server_messages( FileSearch.Request(ticket, query) ) - self.requests[ticket] = SearchRequest( + request = SearchRequest( ticket=ticket, query=query, search_type=SearchType.NETWORK ) - return self.requests[ticket] + await self._attach_request_timer_and_emit(request) + + return request async def search_room(self, room: Union[str, Room], query: str) -> SearchRequest: """Performs a search request on the specific user. The results generated @@ -142,13 +147,15 @@ async def search_room(self, room: Union[str, Room], query: str) -> SearchRequest await self._network.send_server_messages( RoomSearch.Request(room_name, ticket, query) ) - self.requests[ticket] = SearchRequest( + request = SearchRequest( ticket=ticket, query=query, search_type=SearchType.ROOM, room=room_name ) - return self.requests[ticket] + await self._attach_request_timer_and_emit(request) + + return request async def search_user(self, username: str, query: str) -> SearchRequest: """Performs a search request on the specific user. The results generated @@ -164,13 +171,15 @@ async def search_user(self, username: str, query: str) -> SearchRequest: await self._network.send_server_messages( UserSearch.Request(username, ticket, query) ) - self.requests[ticket] = SearchRequest( + request = SearchRequest( ticket=ticket, query=query, search_type=SearchType.USER, username=username ) - return self.requests[ticket] + await self._attach_request_timer_and_emit(request) + + return request async def _query_shares_and_reply(self, ticket: int, username: str, query: str): """Performs a query on the shares manager and reports the results to the @@ -257,28 +266,63 @@ def _search_reply_task_callback(self, ticket: int, username: str, query: str, ta async def _wishlist_job(self): """Job handling wishlist queries, this method is intended to be run as a task. This method will run at the given ``interval`` (returned by the - server on start up). + server after logon). """ items = self._settings.searches.wishlist - # Remove all current wishlist searches - self.requests = { - ticket: qry for ticket, qry in self.requests.items() - if qry.search_type != SearchType.WISHLIST - } + timeout = self._get_wishlist_request_timeout() - logger.info("starting wishlist search of %d items", len(items)) + enabled_items = list(filter(lambda item: item.enabled, items)) + logger.info("starting wishlist search of %d items", len(enabled_items)) # Recreate - for item in filter(lambda item: item.enabled, items): + for item in enabled_items: ticket = next(self._ticket_generator) - self.requests[ticket] = SearchRequest( + + await self._network.send_server_messages( + WishlistSearch.Request(ticket, item.query) + ) + + request = SearchRequest( ticket, item.query, search_type=SearchType.WISHLIST ) - self._network.queue_server_messages( - WishlistSearch.Request(ticket, item.query) + request.timer = Timer( + timeout=timeout, + callback=partial(self._timeout_search_request, request) + ) if timeout else None + self.requests[ticket] = request + + if request.timer: + request.timer.start() + + await self._event_bus.emit(SearchRequestSentEvent(request)) + + def _get_wishlist_request_timeout(self) -> int: + timeout = self._settings.searches.send.wishlist_request_timeout + if self._settings.searches.send.wishlist_request_timeout < 0: + if self.wishlist_interval is None: + timeout = DEFAULT_WISHLIST_INTERVAL + else: + timeout = self.wishlist_interval + + return timeout + + async def _attach_request_timer_and_emit(self, request: SearchRequest): + self.requests[request.ticket] = request + + if self._settings.searches.send.request_timeout > 0: + request.timer = Timer( + timeout=self._settings.searches.send.request_timeout, + callback=partial(self._timeout_search_request, request) ) + request.timer.start() + + await self._event_bus.emit(SearchRequestSentEvent(request)) + + async def _timeout_search_request(self, request: SearchRequest): + del self.requests[request.ticket] + await self._event_bus.emit(SearchRequestRemovedEvent(request)) async def _on_message_received(self, event: MessageReceivedEvent): message = event.message diff --git a/src/aioslsk/search/model.py b/src/aioslsk/search/model.py index bdc277d..377ed42 100644 --- a/src/aioslsk/search/model.py +++ b/src/aioslsk/search/model.py @@ -7,6 +7,7 @@ from ..protocol.primitives import FileData from ..shares.utils import create_term_pattern +from ..tasks import Timer logger = logging.getLogger(__name__) @@ -103,3 +104,5 @@ class SearchRequest: username: Optional[str] = None results: List[SearchResult] = field(default_factory=list) started: datetime.datetime = field(default_factory=datetime.datetime.now) + + timer: Optional[Timer] = None diff --git a/src/aioslsk/settings.py b/src/aioslsk/settings.py index cf9ec18..d615d16 100644 --- a/src/aioslsk/settings.py +++ b/src/aioslsk/settings.py @@ -82,6 +82,8 @@ def are_configured(self) -> bool: class SearchSendSettings(BaseModel, validate_assignment=True): store_results: bool = True + request_timeout: int = 0 + wishlist_request_timeout: int = -1 class SearchReceiveSettings(BaseModel, validate_assignment=True): diff --git a/src/aioslsk/tasks.py b/src/aioslsk/tasks.py index 80d15fd..6e1c034 100644 --- a/src/aioslsk/tasks.py +++ b/src/aioslsk/tasks.py @@ -72,3 +72,37 @@ async def runner(self): next_ival = self.resolve_interval() if job_ival is None else job_ival await asyncio.sleep(next_ival) + + +class Timer: + + def __init__(self, timeout: float, callback: TaskCoroutine): + self.timeout: float = timeout + self.callback: TaskCoroutine = callback + self._task: Optional[asyncio.Task] = None + + def start(self): + self._task = asyncio.create_task(self.runner()) + self._task.add_done_callback(self._unset_task) + + def cancel(self) -> Optional[asyncio.Task]: + if self._task is None: + return None + + task = self._task + self._task.cancel() + self._task = None + return task + + async def runner(self): + await asyncio.sleep(self.timeout) + await self.callback() # type: ignore[call-arg] + + def reschedule(self, timeout: Optional[float] = None): + self.timeout = self.timeout if timeout is None else timeout + + self.cancel() + self.start() + + def _unset_task(self, task: asyncio.Future): + self._task = None diff --git a/tests/e2e/fixtures.py b/tests/e2e/fixtures.py index 7919a13..6077a79 100644 --- a/tests/e2e/fixtures.py +++ b/tests/e2e/fixtures.py @@ -3,6 +3,7 @@ import os from pathlib import Path import pytest_asyncio +from pytest import FixtureRequest import shutil from typing import AsyncGenerator, List @@ -86,8 +87,12 @@ def set_event(session_init_event): @pytest_asyncio.fixture -async def mock_server() -> AsyncGenerator[MockServer, None]: - server = MockServer(hostname=DEFAULT_SERVER_HOSTNAME, ports={DEFAULT_SERVER_PORT}) +async def mock_server(request: FixtureRequest) -> AsyncGenerator[MockServer, None]: + server = MockServer( + hostname=DEFAULT_SERVER_HOSTNAME, + ports={DEFAULT_SERVER_PORT}, + settings=getattr(request, 'param', None) + ) await server.connect(start_serving=False) await asyncio.gather( *[ diff --git a/tests/e2e/test_e2e_search.py b/tests/e2e/test_e2e_search.py new file mode 100644 index 0000000..c130793 --- /dev/null +++ b/tests/e2e/test_e2e_search.py @@ -0,0 +1,167 @@ +from aioslsk.client import SoulSeekClient +from aioslsk.events import ( + SearchRequestSentEvent, + SearchRequestRemovedEvent, +) +from aioslsk.search.model import SearchRequest, SearchType +from aioslsk.settings import WishlistSettingEntry +from .mock.server import MockServer +from .mock.model import Settings +from .fixtures import mock_server, client_1 +from .utils import ( + wait_until_clients_initialized, + wait_for_listener_awaited, + wait_for_listener_awaited_events, +) +import pytest +from typing import Tuple +from unittest.mock import AsyncMock + + +WISHLIST_INTERVAL = 2 +SERVER_SETTINGS = Settings(wishlist_interval=WISHLIST_INTERVAL) + + +class TestE2ESearch: + + @pytest.mark.asyncio + async def test_search_without_timeout( + self, mock_server: MockServer, client_1: SoulSeekClient): + + await wait_until_clients_initialized(mock_server, amount=1) + + listener = AsyncMock() + client_1.events.register(SearchRequestSentEvent, listener) + + client_1.settings.searches.send.request_timeout = 0 + + query = 'test' + request = await client_1.searches.search(query) + assert request.query == query + assert request.search_type == SearchType.NETWORK + assert request.timer is None + assert request in client_1.searches.requests.values() + + event: SearchRequestSentEvent = await wait_for_listener_awaited(listener) + + assert event.query == request + + @pytest.mark.asyncio + @pytest.mark.parametrize( + 'search_func_name,search_params,search_type', + [ + ('search', ('test', ), SearchType.NETWORK), + ('search_room', ('test', 'room0', ), SearchType.ROOM), + ('search_user', ('test', 'user0', ), SearchType.USER) + ] + ) + async def test_search_with_timeout( + self, mock_server: MockServer, client_1: SoulSeekClient, + search_func_name: str, search_params: Tuple, search_type: SearchType): + + await wait_until_clients_initialized(mock_server, amount=1) + + sent_listener = AsyncMock() + removed_listener = AsyncMock() + client_1.events.register(SearchRequestSentEvent, sent_listener) + client_1.events.register(SearchRequestRemovedEvent, removed_listener) + + client_1.settings.searches.send.request_timeout = 2 + + search_func = getattr(client_1.searches, search_func_name) + request: SearchRequest = await search_func(*search_params) + assert request in client_1.searches.requests.values() + assert request.search_type == search_type + assert request.timer is not None + assert request.timer.timeout == 2 + + sent_event: SearchRequestSentEvent = await wait_for_listener_awaited(sent_listener) + + assert sent_event.query == request + + removed_event: SearchRequestRemovedEvent = await wait_for_listener_awaited(removed_listener, timeout=3) + + assert removed_event.query == request + assert request not in client_1.searches.requests.values() + + @pytest.mark.asyncio + @pytest.mark.parametrize('mock_server', [SERVER_SETTINGS], indirect=True) + async def test_search_wishlist_without_timeout( + self, mock_server: MockServer, client_1: SoulSeekClient): + + await wait_until_clients_initialized(mock_server, amount=1) + + client_1.settings.searches.send.wishlist_request_timeout = 0 + client_1.settings.searches.wishlist.append( + WishlistSettingEntry(query='test', enabled=True) + ) + + sent_listener = AsyncMock() + client_1.events.register(SearchRequestSentEvent, sent_listener) + + event: SearchRequestSentEvent = await wait_for_listener_awaited(sent_listener) + request = event.query + + assert request.query == 'test' + assert request.search_type == SearchType.WISHLIST + assert request.timer is None + assert request in client_1.searches.requests.values() + + + @pytest.mark.asyncio + @pytest.mark.parametrize('mock_server', [SERVER_SETTINGS], indirect=True) + async def test_search_wishlist_with_server_timeout( + self, mock_server: MockServer, client_1: SoulSeekClient): + + await wait_until_clients_initialized(mock_server, amount=1) + + client_1.settings.searches.send.wishlist_request_timeout = -1 + client_1.settings.searches.wishlist.append( + WishlistSettingEntry(query='test', enabled=True) + ) + + sent_listener = AsyncMock() + removed_listener = AsyncMock() + client_1.events.register(SearchRequestSentEvent, sent_listener) + client_1.events.register(SearchRequestRemovedEvent, removed_listener) + + sent_event: SearchRequestSentEvent = await wait_for_listener_awaited(sent_listener) + request = sent_event.query + + assert request.query == 'test' + assert request.search_type == SearchType.WISHLIST + assert request.timer is not None + assert request.timer.timeout == WISHLIST_INTERVAL + assert request in client_1.searches.requests.values() + + removed_event: SearchRequestRemovedEvent = await wait_for_listener_awaited(sent_listener) + assert removed_event.query == request + + @pytest.mark.asyncio + @pytest.mark.parametrize('mock_server', [SERVER_SETTINGS], indirect=True) + async def test_search_wishlist_with_custom_timeout( + self, mock_server: MockServer, client_1: SoulSeekClient): + + await wait_until_clients_initialized(mock_server, amount=1) + + client_1.settings.searches.send.wishlist_request_timeout = 3 + client_1.settings.searches.wishlist.append( + WishlistSettingEntry(query='test', enabled=True) + ) + + sent_listener = AsyncMock() + removed_listener = AsyncMock() + client_1.events.register(SearchRequestSentEvent, sent_listener) + client_1.events.register(SearchRequestRemovedEvent, removed_listener) + + sent_event: SearchRequestSentEvent = await wait_for_listener_awaited(sent_listener) + request = sent_event.query + + assert request.query == 'test' + assert request.search_type == SearchType.WISHLIST + assert request.timer is not None + assert request.timer.timeout == 3 + assert request in client_1.searches.requests.values() + + removed_event: SearchRequestRemovedEvent = await wait_for_listener_awaited(sent_listener) + assert removed_event.query == request diff --git a/tests/e2e/test_e2e_server.py b/tests/e2e/test_e2e_server.py index 7edc5e6..d5dc339 100644 --- a/tests/e2e/test_e2e_server.py +++ b/tests/e2e/test_e2e_server.py @@ -50,7 +50,6 @@ import asyncio import pytest from pytest_unordered import unordered -from typing import Tuple from unittest.mock import AsyncMock @@ -380,7 +379,7 @@ async def test_private_room_grant_membership(self, mock_server: MockServer, clie GrantRoomMembershipCommand(room_name, username2), response=True) event1: RoomMembershipGrantedEvent = await wait_for_listener_awaited(member_granted_listener1) - events2: Tuple[RoomMembershipGrantedEvent] = await wait_for_listener_awaited_events( + events2: tuple[RoomMembershipGrantedEvent] = await wait_for_listener_awaited_events( member_granted_listener2, amount=2 ) diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index 8959896..4baabd7 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -1,7 +1,7 @@ import asyncio import pytest from unittest.mock import ANY, AsyncMock, Mock, patch -from aioslsk.tasks import BackgroundTask +from aioslsk.tasks import BackgroundTask, Timer async def dummy_coro(): @@ -17,6 +17,7 @@ def test_start_shouldCreateTask(self): task.start() assert task._task == async_task + assert task.is_running() is True create_task.assert_called_once_with(ANY, name='background-task') def test_start_alreadyStarted_shouldDoNothing(self): @@ -114,3 +115,54 @@ async def test_runner_preempt_returnInterval(self): task_coro.assert_awaited() sleep.assert_any_await(interval) sleep.assert_any_await(interval_after_preempt) + + +class TestTimer: + + def test_start_shouldCreateTask(self): + timer = Timer(1.0, dummy_coro) + async_task = Mock() + with patch('asyncio.create_task', return_value=async_task) as create_task: + timer.start() + + assert timer._task == async_task + create_task.assert_called_once_with(ANY) + + @pytest.mark.asyncio + async def test_cancel_shouldCancelTask(self): + callback = AsyncMock() + timer = Timer(1.0, callback) + timer.start() + await asyncio.sleep(0.5) + + timer.cancel() + assert timer._task is None + callback.assert_not_awaited() + + @pytest.mark.asyncio + async def test_cancel_notStarted_returnsNone(self): + timer = Timer(1.0, dummy_coro) + result = timer.cancel() + assert result is None + + @pytest.mark.asyncio + async def test_reschedule(self): + callback = AsyncMock() + timer = Timer(1.0, callback) + timer.start() + await asyncio.sleep(0.5) + + timer.reschedule(timeout=2.0) + await asyncio.sleep(1.25) + callback.assert_not_awaited() + await asyncio.sleep(1.0) + callback.assert_awaited_once() + + @pytest.mark.asyncio + async def test_reschedule_notStarted_startsTask(self): + callback = AsyncMock() + timer = Timer(0.5, callback) + + timer.reschedule() + await asyncio.sleep(0.75) + callback.assert_awaited_once()