Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with debouncing commands to (group)players #921

Merged
merged 3 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions music_assistant/server/controllers/players.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@
_P = ParamSpec("_P")


def debounce(
def log_player_command(
func: Callable[Concatenate[_PlayerControllerT, _P], Awaitable[_R]]
) -> Callable[Concatenate[_PlayerControllerT, _P], Coroutine[Any, Any, _R | None]]:
"""Log and debounce commands to players."""
"""Check and log commands to players."""

@functools.wraps(func)
async def wrapper(self: _PlayerControllerT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
"""Log and debounce commands to players."""
"""Log and log_player_command commands to players."""
player_id = kwargs["player_id"] if "player_id" in kwargs else args[0]
if (player := self._players.get(player_id)) is None or not player.available:
# player not existent
Expand All @@ -59,23 +59,12 @@ async def wrapper(self: _PlayerControllerT, *args: _P.args, **kwargs: _P.kwargs)
player_id,
)
return
debounce_key = f"{player_id}.func.__name__"
# cancel any existing command to this player
existing_timer = self._cmd_debounce.pop(debounce_key, None)
if existing_timer and not existing_timer.cancelled():
existing_timer.cancel()

self.logger.debug(
"Handling command %s for player %s",
func.__name__,
player.display_name,
)

def run():
self.mass.create_task(func(self, *args, **kwargs))

# debounce command with 250ms
self._cmd_debounce[debounce_key] = self.mass.loop.call_later(0.25, run)
await func(self, *args, **kwargs)

return wrapper

Expand All @@ -95,7 +84,6 @@ def __init__(self, *args, **kwargs) -> None:
"Music Assistant's core controller which manages all players from all providers."
)
self.manifest.icon = "speaker-multiple"
self._cmd_debounce: dict[str, asyncio.TimerHandle] = {}
self._poll_task: asyncio.Task | None = None

async def setup(self, config: CoreConfig) -> None: # noqa: ARG002
Expand Down Expand Up @@ -311,7 +299,7 @@ def get_player_provider(self, player_id: str) -> PlayerProvider:
# Player commands

@api_command("players/cmd/stop")
@debounce
@log_player_command
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player.

Expand All @@ -322,7 +310,7 @@ async def cmd_stop(self, player_id: str) -> None:
await player_provider.cmd_stop(player_id)

@api_command("players/cmd/play")
@debounce
@log_player_command
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.

Expand All @@ -333,7 +321,7 @@ async def cmd_play(self, player_id: str) -> None:
await player_provider.cmd_play(player_id)

@api_command("players/cmd/pause")
@debounce
@log_player_command
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.

Expand Down Expand Up @@ -365,7 +353,7 @@ async def _watch_pause(_player_id: str) -> None:
self.mass.create_task(_watch_pause(player_id))

@api_command("players/cmd/play_pause")
@debounce
@log_player_command
async def cmd_play_pause(self, player_id: str) -> None:
"""Toggle play/pause on given player.

Expand All @@ -378,7 +366,7 @@ async def cmd_play_pause(self, player_id: str) -> None:
await self.cmd_play(player_id)

@api_command("players/cmd/power")
@debounce
@log_player_command
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player.

Expand Down Expand Up @@ -448,7 +436,7 @@ async def cmd_power(self, player_id: str, powered: bool) -> None:
await self.mass.player_queues.resume(player_id)

@api_command("players/cmd/volume_set")
@debounce
@log_player_command
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.

Expand All @@ -473,7 +461,7 @@ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
await player_provider.cmd_volume_set(player_id, volume_level)

@api_command("players/cmd/volume_up")
@debounce
@log_player_command
async def cmd_volume_up(self, player_id: str) -> None:
"""Send VOLUME_UP command to given player.

Expand All @@ -483,7 +471,7 @@ async def cmd_volume_up(self, player_id: str) -> None:
await self.cmd_volume_set(player_id, new_volume)

@api_command("players/cmd/volume_down")
@debounce
@log_player_command
async def cmd_volume_down(self, player_id: str) -> None:
"""Send VOLUME_DOWN command to given player.

Expand All @@ -493,7 +481,7 @@ async def cmd_volume_down(self, player_id: str) -> None:
await self.cmd_volume_set(player_id, new_volume)

@api_command("players/cmd/group_volume")
@debounce
@log_player_command
async def cmd_group_volume(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given playergroup.

Expand All @@ -517,7 +505,7 @@ async def cmd_group_volume(self, player_id: str, volume_level: int) -> None:
await asyncio.gather(*coros)

@api_command("players/cmd/volume_mute")
@debounce
@log_player_command
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME_MUTE command to given player.

Expand All @@ -544,7 +532,7 @@ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
await player_provider.cmd_volume_mute(player_id, muted)

@api_command("players/cmd/sync")
@debounce
@log_player_command
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.

Expand Down Expand Up @@ -589,7 +577,7 @@ async def cmd_sync(self, player_id: str, target_player: str) -> None:
await player_provider.cmd_sync(player_id, target_player)

@api_command("players/cmd/unsync")
@debounce
@log_player_command
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.

Expand Down
32 changes: 11 additions & 21 deletions music_assistant/server/providers/ugp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def cmd_play_url(
# due to many child players being powered on (or resynced) at the same time
# debounce the command a bit by only letting through the last one.
self.debounce_id = debounce_id = shortuuid.uuid()
await asyncio.sleep(200)
await asyncio.sleep(100)
if self.debounce_id != debounce_id:
return
# power ON
Expand Down Expand Up @@ -418,7 +418,7 @@ async def on_child_power(self, player_id: str, child_player: Player, new_power:
"""
Call when a power command was executed on one of the child players.

This is used to handle special actions such as muting as power or (re)syncing.
This is used to handle special actions such as mute-as-power or (re)syncing.
"""
group_player = self.mass.players.get(player_id)

Expand All @@ -435,44 +435,34 @@ async def on_child_power(self, player_id: str, child_player: Player, new_power:
self.logger.debug(
"Group %s has no more powered members, turning off group player", player_id
)
self.mass.create_task(self.cmd_power, player_id, False)
self.mass.create_task(self.cmd_power(player_id, False))
return False

group_playing = group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
# if a child player turned ON while the group player is already playing
# we need to resync/resume
if (
group_player.powered
and new_power
and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
and child_player.state != PlayerState.PLAYING
):
if group_player.state == PlayerState.PLAYING and (
sync_leader := next(
(
x
for x in child_player.can_sync_with
if x in self.prev_sync_leaders[player_id]
),
None,
)
if new_power and group_playing:
if sync_leader := next(
(x for x in child_player.can_sync_with if x in self.prev_sync_leaders[player_id]),
None,
):
# prevent resume when player platform supports sync
# and one of its players is already playing
self.logger.debug(
"Groupplayer %s forced resync due to groupmember change", player_id
)
self.mass.create_task(
self.mass.players.cmd_sync, child_player.player_id, sync_leader
self.mass.players.cmd_sync(child_player.player_id, sync_leader)
)
else:
# send active source because the group may be within another group
self.logger.debug(
"Groupplayer %s forced resume due to groupmember change", player_id
)
self.mass.create_task(self.mass.player_queues.resume, group_player.active_source)
self.mass.create_task(self.mass.player_queues.resume(group_player.active_source))
elif (
not new_power
and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
and group_playing
and child_player.player_id in self.prev_sync_leaders[player_id]
and not child_player.mute_as_power
):
Expand Down