Skip to content

Commit

Permalink
feat: search timeout parameter (#335)
Browse files Browse the repository at this point in the history
* feat: add search timeout parameters

* feat: change default value for wishlist timeout to be backwards compatible

* feat: ignore typing for now
  • Loading branch information
JurgenR authored Oct 6, 2024
1 parent 2ce7033 commit fab5016
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 37 deletions.
26 changes: 15 additions & 11 deletions docs/source/SETTINGS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,21 @@ Interests
Search
------

+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+
| Parameter | Type | Description | Default |
+===============================+===============+=============================================================================================+=========+
| searches.receive.max_results | integer | Maximum amount of search results returned when replying to search requests from other peers | 100 |
+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+
| searches.receive.store_amount | integer | Amount of received searches to store in the client | 500 |
+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+
| searches.send.store_results | boolean | Whether to store search results internally | true |
+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+
| searches.wishlist | array[object] | List of wishlist items. Object definition is defined below | <empty> |
+-------------------------------+---------------+---------------------------------------------------------------------------------------------+---------+
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| Parameter | Type | Description | Default |
+========================================+===============+=================================================================================================================================================================+=========+
| searches.receive.max_results | integer | Maximum amount of search results returned when replying to search requests from other peers | 100 |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| searches.receive.store_amount | integer | Amount of received searches to store in the client | 500 |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| searches.send.store_results | boolean | Whether to store search results internally | true |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| searches.send.request_timeout | integer | Timeout for sent search requests, when the timeout is reached the request will be removed and search results will no longer be accepted (0 = keep indefinitely) | 0 |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| searches.send.wishlist_request_timeout | integer | Timeout for sent wishlist requests (0 = keep indefinitely, -1 = use the interval advertised by the server) | -1 |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
| searches.wishlist | array[object] | List of wishlist items. Object definition is defined below | <empty> |
+----------------------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+


Following object should be used for ``searches.wishlist`` object:
Expand Down
38 changes: 37 additions & 1 deletion docs/source/USAGE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,33 @@ start a search request for each of the types:
user_request: SearchRequest = await client.searches.search_user('other_user', 'my user query')
Search requests are stored internally and should be removed when no longer needed:
Wishlist Searches
~~~~~~~~~~~~~~~~~

Wishlist searches are periodic searches made by the client to the server. The interval is determined
by the server. To add a wishlist search simply add an entry to the settings, it will be picked up
at the next interval:

.. code-block:: python
from aioslsk.settings import Settings, WishlistSettingEntry
settings: Settings = Settings(...)
settings.searches.wishlist.append(
WishlistSettingEntry(query='test', enabled=True)
)
The :class:`SearchRequestSentEvent` will be emitted when a wishlist search is made. Keep in mind
however that this event is emitted also when making other types of search requests. Look at the type
of the request made to determine whether it is a wishlist search or not.


Manually Removing Requests
~~~~~~~~~~~~~~~~~~~~~~~~~~

Search requests are stored internally but a timeout can be configured to automatically remove them.
Following example shows how to manually remove a search request:

.. code-block:: python
Expand All @@ -155,6 +181,16 @@ Search requests are stored internally and should be removed when no longer neede
After removal there will be no more :class:`SearchResultEvent` events emitted for the removed
request

Automatically Removing Requests
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A timeout can be configured through two settings:

* ``searches.sent.request_timeout``
* ``searches.sent.wishlist_request_timeout``

When a request gets removed an event will be emitted: :class:`SearchRequestRemovedEvent`


Receiving Results
-----------------
Expand Down
1 change: 1 addition & 0 deletions src/aioslsk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(
shares_cache: Optional[SharesCache] = None, transfer_cache: Optional[TransferCache] = None,
executor_factory: Optional[ExecutorFactory] = None,
event_bus: Optional[EventBus] = None):

self.settings: Settings = settings

self._stop_event: Optional[asyncio.Event] = None
Expand Down
1 change: 1 addition & 0 deletions src/aioslsk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
DEFAULT_LISTENING_HOST: str = '0.0.0.0'
DEFAULT_PARENT_MIN_SPEED: int = 1
DEFAULT_PARENT_SPEED_RATIO: int = 50
DEFAULT_WISHLIST_INTERVAL: int = 600
DEFAULT_READ_TIMEOUT: float = 60
PEER_CONNECT_TIMEOUT: float = 10
"""Direct connection timeout"""
Expand Down
12 changes: 12 additions & 0 deletions src/aioslsk/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,18 @@ class PublicMessageEvent(Event):
raw_message: PublicChatMessage.Response


@dataclass(frozen=True)
class SearchRequestSentEvent(Event):
"""Emitted when a search request has been sent out"""
query: SearchRequest


@dataclass(frozen=True)
class SearchRequestRemovedEvent(Event):
"""Emitted when a search request has been removed"""
query: SearchRequest


@dataclass(frozen=True)
class SearchResultEvent(Event):
"""Emitted when a search result has been received"""
Expand Down
84 changes: 64 additions & 20 deletions src/aioslsk/search/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
from typing import Deque, Dict, List, Optional, Union

from ..base_manager import BaseManager
from ..constants import DEFAULT_WISHLIST_INTERVAL
from ..events import (
on_message,
build_message_map,
EventBus,
ConnectionStateChangedEvent,
EventBus,
MessageReceivedEvent,
SearchRequestReceivedEvent,
SearchRequestRemovedEvent,
SearchRequestSentEvent,
SearchResultEvent,
SessionDestroyedEvent,
SessionInitializedEvent,
Expand Down Expand Up @@ -41,7 +44,7 @@
from ..shares.manager import SharesManager
from ..shares.utils import convert_items_to_file_data
from ..session import Session
from ..tasks import BackgroundTask
from ..tasks import BackgroundTask, Timer
from ..transfer.interface import UploadInfoProvider
from ..utils import task_counter, ticket_generator
from .model import ReceivedSearch, SearchResult, SearchRequest, SearchType
Expand Down Expand Up @@ -82,7 +85,7 @@ def __init__(

self._search_reply_tasks: List[asyncio.Task] = []
self._wishlist_task: BackgroundTask = BackgroundTask(
interval=600,
interval=DEFAULT_WISHLIST_INTERVAL,
task_coro=self._wishlist_job,
name='wishlist-task'
)
Expand Down Expand Up @@ -120,12 +123,14 @@ async def search(self, query: str) -> SearchRequest:
await self._network.send_server_messages(
FileSearch.Request(ticket, query)
)
self.requests[ticket] = SearchRequest(
request = SearchRequest(
ticket=ticket,
query=query,
search_type=SearchType.NETWORK
)
return self.requests[ticket]
await self._attach_request_timer_and_emit(request)

return request

async def search_room(self, room: Union[str, Room], query: str) -> SearchRequest:
"""Performs a search request on the specific user. The results generated
Expand All @@ -142,13 +147,15 @@ async def search_room(self, room: Union[str, Room], query: str) -> SearchRequest
await self._network.send_server_messages(
RoomSearch.Request(room_name, ticket, query)
)
self.requests[ticket] = SearchRequest(
request = SearchRequest(
ticket=ticket,
query=query,
search_type=SearchType.ROOM,
room=room_name
)
return self.requests[ticket]
await self._attach_request_timer_and_emit(request)

return request

async def search_user(self, username: str, query: str) -> SearchRequest:
"""Performs a search request on the specific user. The results generated
Expand All @@ -164,13 +171,15 @@ async def search_user(self, username: str, query: str) -> SearchRequest:
await self._network.send_server_messages(
UserSearch.Request(username, ticket, query)
)
self.requests[ticket] = SearchRequest(
request = SearchRequest(
ticket=ticket,
query=query,
search_type=SearchType.USER,
username=username
)
return self.requests[ticket]
await self._attach_request_timer_and_emit(request)

return request

async def _query_shares_and_reply(self, ticket: int, username: str, query: str):
"""Performs a query on the shares manager and reports the results to the
Expand Down Expand Up @@ -257,28 +266,63 @@ def _search_reply_task_callback(self, ticket: int, username: str, query: str, ta
async def _wishlist_job(self):
"""Job handling wishlist queries, this method is intended to be run as
a task. This method will run at the given ``interval`` (returned by the
server on start up).
server after logon).
"""
items = self._settings.searches.wishlist

# Remove all current wishlist searches
self.requests = {
ticket: qry for ticket, qry in self.requests.items()
if qry.search_type != SearchType.WISHLIST
}
timeout = self._get_wishlist_request_timeout()

logger.info("starting wishlist search of %d items", len(items))
enabled_items = list(filter(lambda item: item.enabled, items))
logger.info("starting wishlist search of %d items", len(enabled_items))
# Recreate
for item in filter(lambda item: item.enabled, items):
for item in enabled_items:
ticket = next(self._ticket_generator)
self.requests[ticket] = SearchRequest(

await self._network.send_server_messages(
WishlistSearch.Request(ticket, item.query)
)

request = SearchRequest(
ticket,
item.query,
search_type=SearchType.WISHLIST
)
self._network.queue_server_messages(
WishlistSearch.Request(ticket, item.query)
request.timer = Timer(
timeout=timeout,
callback=partial(self._timeout_search_request, request)
) if timeout else None
self.requests[ticket] = request

if request.timer:
request.timer.start()

await self._event_bus.emit(SearchRequestSentEvent(request))

def _get_wishlist_request_timeout(self) -> int:
timeout = self._settings.searches.send.wishlist_request_timeout
if self._settings.searches.send.wishlist_request_timeout < 0:
if self.wishlist_interval is None:
timeout = DEFAULT_WISHLIST_INTERVAL
else:
timeout = self.wishlist_interval

return timeout

async def _attach_request_timer_and_emit(self, request: SearchRequest):
self.requests[request.ticket] = request

if self._settings.searches.send.request_timeout > 0:
request.timer = Timer(
timeout=self._settings.searches.send.request_timeout,
callback=partial(self._timeout_search_request, request)
)
request.timer.start()

await self._event_bus.emit(SearchRequestSentEvent(request))

async def _timeout_search_request(self, request: SearchRequest):
del self.requests[request.ticket]
await self._event_bus.emit(SearchRequestRemovedEvent(request))

async def _on_message_received(self, event: MessageReceivedEvent):
message = event.message
Expand Down
3 changes: 3 additions & 0 deletions src/aioslsk/search/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..protocol.primitives import FileData
from ..shares.utils import create_term_pattern
from ..tasks import Timer


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,3 +104,5 @@ class SearchRequest:
username: Optional[str] = None
results: List[SearchResult] = field(default_factory=list)
started: datetime.datetime = field(default_factory=datetime.datetime.now)

timer: Optional[Timer] = None
2 changes: 2 additions & 0 deletions src/aioslsk/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def are_configured(self) -> bool:

class SearchSendSettings(BaseModel, validate_assignment=True):
store_results: bool = True
request_timeout: int = 0
wishlist_request_timeout: int = -1


class SearchReceiveSettings(BaseModel, validate_assignment=True):
Expand Down
34 changes: 34 additions & 0 deletions src/aioslsk/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,37 @@ async def runner(self):
next_ival = self.resolve_interval() if job_ival is None else job_ival

await asyncio.sleep(next_ival)


class Timer:

def __init__(self, timeout: float, callback: TaskCoroutine):
self.timeout: float = timeout
self.callback: TaskCoroutine = callback
self._task: Optional[asyncio.Task] = None

def start(self):
self._task = asyncio.create_task(self.runner())
self._task.add_done_callback(self._unset_task)

def cancel(self) -> Optional[asyncio.Task]:
if self._task is None:
return None

task = self._task
self._task.cancel()
self._task = None
return task

async def runner(self):
await asyncio.sleep(self.timeout)
await self.callback() # type: ignore[call-arg]

def reschedule(self, timeout: Optional[float] = None):
self.timeout = self.timeout if timeout is None else timeout

self.cancel()
self.start()

def _unset_task(self, task: asyncio.Future):
self._task = None
9 changes: 7 additions & 2 deletions tests/e2e/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
from pathlib import Path
import pytest_asyncio
from pytest import FixtureRequest
import shutil
from typing import AsyncGenerator, List

Expand Down Expand Up @@ -86,8 +87,12 @@ def set_event(session_init_event):


@pytest_asyncio.fixture
async def mock_server() -> AsyncGenerator[MockServer, None]:
server = MockServer(hostname=DEFAULT_SERVER_HOSTNAME, ports={DEFAULT_SERVER_PORT})
async def mock_server(request: FixtureRequest) -> AsyncGenerator[MockServer, None]:
server = MockServer(
hostname=DEFAULT_SERVER_HOSTNAME,
ports={DEFAULT_SERVER_PORT},
settings=getattr(request, 'param', None)
)
await server.connect(start_serving=False)
await asyncio.gather(
*[
Expand Down
Loading

0 comments on commit fab5016

Please sign in to comment.