diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 7bfa26caf..d77462a6c 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -394,6 +394,7 @@ async def save_player_config( object_id=config.player_id, data=config, ) + self.mass.players.update(config.player_id, force_update=True) # return full player config (just in case) return await self.get_player_config(player_id) diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 9a39d2761..0b08a31cc 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -309,6 +309,7 @@ def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None: self.mass.create_task(self.resume(queue_id)) else: task_id = f"enqueue_next_{queue_id}" + self.logger.info("Repeat mode detected, enqueue next item") self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id) @api_command("player_queues/play_media") diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index ecac01266..c0c286126 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -275,7 +275,7 @@ async def cmd_seek(self, player_id: str, position: int) -> None: if PlayerFeature.SEEK not in player.supported_features: msg = f"Player {player.display_name} does not support seeking" raise UnsupportedFeaturedException(msg) - player_prov = self.mass.players.get_player_provider(player.player_id) + player_prov = self.get_player_provider(player.player_id) await player_prov.cmd_seek(player.player_id, position) @api_command("players/cmd/next") @@ -290,7 +290,7 @@ async def cmd_next_track(self, player_id: str) -> None: if PlayerFeature.NEXT_PREVIOUS not in player.supported_features: msg = f"Player {player.display_name} does not support skipping to the next track." raise UnsupportedFeaturedException(msg) - player_prov = self.mass.players.get_player_provider(player.player_id) + player_prov = self.get_player_provider(player.player_id) await player_prov.cmd_next(player.player_id) @api_command("players/cmd/previous") @@ -305,7 +305,7 @@ async def cmd_previous_track(self, player_id: str) -> None: if PlayerFeature.NEXT_PREVIOUS not in player.supported_features: msg = f"Player {player.display_name} does not support skipping to the previous track." raise UnsupportedFeaturedException(msg) - player_prov = self.mass.players.get_player_provider(player.player_id) + player_prov = self.get_player_provider(player.player_id) await player_prov.cmd_previous(player.player_id) @api_command("players/cmd/power") @@ -565,7 +565,7 @@ async def play_media(self, player_id: str, media: PlayerMedia) -> None: # power on the player if needed if not player.powered: await self.cmd_power(player.player_id, True) - player_prov = self.mass.players.get_player_provider(player.player_id) + player_prov = self.get_player_provider(player.player_id) await player_prov.play_media( player_id=player.player_id, media=media, @@ -573,7 +573,7 @@ async def play_media(self, player_id: str, media: PlayerMedia) -> None: async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """Handle enqueuing of a next media item on the player.""" - player_prov = self.mass.players.get_player_provider(player_id) + player_prov = self.get_player_provider(player_id) async with self._player_throttlers[player_id]: await player_prov.enqueue_next_media(player_id=player_id, media=media) @@ -1058,7 +1058,7 @@ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[ if player_provider := self.mass.get_provider(config.provider): with suppress(PlayerUnavailableError): await player_provider.on_player_config_change(config, changed_keys) - if not (player := self.mass.players.get(config.player_id)): + if not (player := self.get(config.player_id)): return if player_disabled: # edge case: ensure that the player is powered off if the player gets disabled @@ -1070,14 +1070,13 @@ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[ # check for group memberships that need to be updated if player_disabled and player.active_group and player_provider: # try to remove from the group - group_player = self.mass.players.get(player.active_group) + group_player = self.get(player.active_group) with suppress(UnsupportedFeaturedException, PlayerCommandFailed): await player_provider.set_members( player.active_group, [x for x in group_player.group_childs if x != player.player_id], ) player.enabled = config.enabled - self.mass.players.update(config.player_id, force_update=True) async def _play_announcement( self, diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index ccdd23247..e57940cb3 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -58,7 +58,6 @@ check_audio_support, crossfade_pcm_parts, get_chunksize, - get_hls_radio_stream, get_hls_substream, get_icy_radio_stream, get_media_stream, @@ -836,21 +835,21 @@ async def get_media_stream( ) elif streamdetails.stream_type == StreamType.ICY: audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails) - elif streamdetails.stream_type == StreamType.HLS: + elif streamdetails.stream_type in (StreamType.HLS, StreamType.ENCRYPTED_HLS): + substream = await get_hls_substream(self.mass, streamdetails.path) + audio_source = substream.path if streamdetails.media_type == MediaType.RADIO: # Especially the BBC streams struggle when they're played directly - # with ffmpeg, so we use our own HLS stream parser/logic - audio_source = get_hls_radio_stream(self.mass, streamdetails.path, streamdetails) - else: - # normal tracks we just let ffmpeg deal with it - substream = await get_hls_substream(self.mass, streamdetails.path) - audio_source = substream.path - elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP: - audio_source = streamdetails.path - extra_input_args += ["-decryption_key", streamdetails.decryption_key] + # with ffmpeg, where they just stop after some minutes, + # so we tell ffmpeg to loop around in this case. + extra_input_args += ["-stream_loop", "-1", "-re"] else: audio_source = streamdetails.path + # add support for decryption key provided in streamdetails + if streamdetails.decryption_key: + extra_input_args += ["-decryption_key", streamdetails.decryption_key] + # handle seek support if ( streamdetails.seek_position diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 23fa5876f..13f996478 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -570,51 +570,9 @@ async def get_icy_radio_stream( streamdetails.stream_title = cleaned_stream_title -async def get_hls_radio_stream( - mass: MusicAssistant, - url: str, - streamdetails: StreamDetails, -) -> AsyncGenerator[bytes, None]: - """Get radio audio stream from HTTP HLS playlist.""" - logger = LOGGER.getChild("hls_stream") - logger.debug("Start streaming HLS stream for url %s", url) - # we simply select the best quality substream here - # if we ever want to support adaptive stream selection based on bandwidth - # we need to move the substream selection into the loop below and make it - # bandwidth aware. For now we just assume domestic high bandwidth where - # the user wants the best quality possible at all times. - playlist_item = await get_hls_substream(mass, url) - substream_url = playlist_item.path - loops = 50 if streamdetails.media_type != MediaType.RADIO else 1 - while loops: - logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url) - # We simply let ffmpeg deal with parsing the HLS playlist and stichting chunks together. - # However we do not feed the playlist URL to ffmpeg directly to give us the possibility - # to monitor the stream title and other metadata for radio streams in the future. - # Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not - # very useful for radio streams which you want to simply go on forever, so we need to loop - # and restart ffmpeg in case of an error. - input_format = AudioFormat(content_type=ContentType.UNKNOWN) - audio_format_detected = False - async for chunk in get_ffmpeg_stream( - audio_input=substream_url, - input_format=input_format, - output_format=AudioFormat(content_type=ContentType.WAV), - ): - yield chunk - if audio_format_detected: - continue - if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV): - # we need to determine the audio format from the first chunk - streamdetails.audio_format = input_format - audio_format_detected = True - loops -= 1 - - async def get_hls_substream( mass: MusicAssistant, url: str, - allow_encrypted: bool = False, ) -> PlaylistItem: """Select the (highest quality) HLS substream for given HLS playlist/URL.""" timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) @@ -627,9 +585,6 @@ async def get_hls_substream( raw_data = await resp.read() encoding = resp.charset or await detect_charset(raw_data) master_m3u_data = raw_data.decode(encoding) - if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data: - # for now we do not (yet) support encrypted HLS streams - raise InvalidDataError("HLS stream is encrypted, not supported") substreams = parse_m3u(master_m3u_data) # There is a chance that we did not get a master playlist with subplaylists # but just a single master/sub playlist with the actual audio stream(s) diff --git a/music_assistant/server/helpers/ffmpeg.py b/music_assistant/server/helpers/ffmpeg.py index 3e390f658..d283b24df 100644 --- a/music_assistant/server/helpers/ffmpeg.py +++ b/music_assistant/server/helpers/ffmpeg.py @@ -218,16 +218,24 @@ def get_ffmpeg_args( if input_path.startswith("http"): # append reconnect options for direct stream from http input_args += [ - "-reconnect", + # Reconnect automatically when disconnected before EOF is hit. + "reconnect", "1", - "-reconnect_streamed", + # Set the maximum delay in seconds after which to give up reconnecting. + "-reconnect_delay_max", + "30", + # If set then even streamed/non seekable streams will be reconnected on errors. + "reconnect_streamed", "1", ] if major_version > 4: # these options are only supported in ffmpeg > 5 input_args += [ + # Reconnect automatically in case of TCP/TLS errors during connect. "-reconnect_on_network_error", "1", + # A comma separated list of HTTP status codes to reconnect on. + # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx. "-reconnect_on_http_error", "5xx,4xx", ] diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 2ca3c8cce..1a29387ec 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -2,141 +2,25 @@ from __future__ import annotations -import asyncio -import logging -import os -import platform -import socket -import time -from contextlib import suppress -from random import randint, randrange from typing import TYPE_CHECKING -from zeroconf import IPVersion, ServiceStateChange -from zeroconf.asyncio import AsyncServiceInfo - -from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_EQ_BASS, - CONF_ENTRY_EQ_MID, - CONF_ENTRY_EQ_TREBLE, - CONF_ENTRY_FLOW_MODE_ENFORCED, - CONF_ENTRY_OUTPUT_CHANNELS, - CONF_ENTRY_SYNC_ADJUST, ConfigEntry, ConfigValueType, ProviderConfig, - create_sample_rates_config_entry, -) -from music_assistant.common.models.enums import ( - ConfigEntryType, - ContentType, - MediaType, - PlayerFeature, - PlayerState, - PlayerType, - ProviderFeature, ) -from music_assistant.common.models.media_items import AudioFormat -from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia -from music_assistant.common.models.player_queue import PlayerQueue +from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.provider import ProviderManifest -from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.server import MusicAssistant -from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params -from music_assistant.server.helpers.process import AsyncProcess, check_output -from music_assistant.server.helpers.util import TaskManager, lock -from music_assistant.server.models.player_provider import PlayerProvider + +from .const import CONF_BIND_INTERFACE +from .provider import AirplayProvider if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType - from music_assistant.server.providers.player_group import PlayerGroupProvider - -DOMAIN = "airplay" - -CONF_ENCRYPTION = "encryption" -CONF_ALAC_ENCODE = "alac_encode" -CONF_VOLUME_START = "volume_start" -CONF_PASSWORD = "password" -CONF_BIND_INTERFACE = "bind_interface" - -PLAYER_CONFIG_ENTRIES = ( - CONF_ENTRY_FLOW_MODE_ENFORCED, - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_EQ_BASS, - CONF_ENTRY_EQ_MID, - CONF_ENTRY_EQ_TREBLE, - CONF_ENTRY_OUTPUT_CHANNELS, - ConfigEntry( - key=CONF_ENCRYPTION, - type=ConfigEntryType.BOOLEAN, - default_value=False, - label="Enable encryption", - description="Enable encrypted communication with the player, " - "some (3rd party) players require this.", - category="airplay", - ), - ConfigEntry( - key=CONF_ALAC_ENCODE, - type=ConfigEntryType.BOOLEAN, - default_value=True, - label="Enable compression", - description="Save some network bandwidth by sending the audio as " - "(lossless) ALAC at the cost of a bit CPU.", - category="airplay", - ), - CONF_ENTRY_SYNC_ADJUST, - ConfigEntry( - key=CONF_PASSWORD, - type=ConfigEntryType.SECURE_STRING, - default_value=None, - required=False, - label="Device password", - description="Some devices require a password to connect/play.", - category="airplay", - ), -) -BACKOFF_TIME_LOWER_LIMIT = 15 # seconds -BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes - -CONF_CREDENTIALS = "credentials" -CACHE_KEY_PREV_VOLUME = "airplay_prev_volume" -FALLBACK_VOLUME = 20 - -AIRPLAY_FLOW_PCM_FORMAT = AudioFormat( - content_type=ContentType.PCM_F32LE, - sample_rate=44100, - bit_depth=32, -) -AIRPLAY_PCM_FORMAT = AudioFormat( - content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16 -) - -# airplay has fixed sample rate/bit depth so make this config entry static and hidden -CONF_ENTRY_SAMPLE_RATES_AIRPLAY = create_sample_rates_config_entry(44100, 16, 44100, 16, True) - - -# TODO: Airplay provider -# - split up and cleanup the code into more digestable parts -# - Implement authentication for Apple TV -# - Implement volume control for Apple devices using pyatv -# - Implement metadata for Apple Apple devices using pyatv -# - Use pyatv for communicating with original Apple devices -# and use cliraop for actual streaming - - -async def setup( - mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig -) -> ProviderInstanceType: - """Initialize provider(instance) with given configuration.""" - return AirplayProvider(mass, manifest, config) async def get_config_entries( @@ -165,874 +49,8 @@ async def get_config_entries( ) -def convert_airplay_volume(value: float) -> int: - """Remap Airplay Volume to 0..100 scale.""" - airplay_min = -30 - airplay_max = 0 - normal_min = 0 - normal_max = 100 - portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min) - return int(portion + normal_min) - - -def get_model_from_am(am_property: str | None) -> tuple[str, str]: - """Return Manufacturer and Model name from mdns AM property.""" - manufacturer = "Unknown" - model = "Generic Airplay device" - if not am_property: - return (manufacturer, model) - if isinstance(am_property, bytes): - am_property = am_property.decode("utf-8") - if am_property == "AudioAccessory5,1": - model = "HomePod" - manufacturer = "Apple" - elif "AppleTV" in am_property: - model = "Apple TV" - manufacturer = "Apple" - else: - model = am_property - return (manufacturer, model) - - -def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: - """Get primary IP address from zeroconf discovery info.""" - for address in discovery_info.parsed_addresses(IPVersion.V4Only): - if address.startswith("127"): - # filter out loopback address - continue - if address.startswith("169.254"): - # filter out APIPA address - continue - return address - return None - - -class RaopStream: - """Object that holds the details of a (RAOP) stream job.""" - - def __init__( - self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat - ) -> None: - """Initialize RaopStream.""" - self.prov = prov - self.mass = prov.mass - self.airplay_player = airplay_player - self.input_format = input_format - # always generate a new active remote id to prevent race conditions - # with the named pipe used to send audio - self.active_remote_id: str = str(randint(1000, 8000)) - self.prevent_playback: bool = False - # audio_source_task will only exist for the main player in a sync group - self.audio_source_task: asyncio.Task | None = None - self._log_reader_task: asyncio.Task | None = None - self._cliraop_proc: AsyncProcess | None = None - self._ffmpeg_proc: AsyncProcess | None = None - self._started = asyncio.Event() - self._stopped = False - - @property - def running(self) -> bool: - """Return boolean if this stream is running.""" - return not self._stopped and self._started.is_set() - - async def start(self, start_ntp: int, wait_start: int = 1000) -> None: - """Initialize CLIRaop process for a player.""" - extra_args = [] - player_id = self.airplay_player.player_id - mass_player = self.mass.players.get(player_id) - bind_ip = await self.mass.config.get_provider_config_value( - self.prov.instance_id, CONF_BIND_INTERFACE - ) - extra_args += ["-if", bind_ip] - if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False): - extra_args += ["-encrypt"] - if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): - extra_args += ["-alac"] - for prop in ("et", "md", "am", "pk", "pw"): - if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop): - extra_args += [f"-{prop}", prop_value] - sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) - if device_password := self.mass.config.get_raw_player_config_value( - player_id, CONF_PASSWORD, None - ): - extra_args += ["-password", device_password] - if self.prov.logger.isEnabledFor(logging.DEBUG): - extra_args += ["-debug", "5"] - elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): - extra_args += ["-debug", "10"] - - # create os pipes to pipe ffmpeg to cliraop - read, write = await asyncio.to_thread(os.pipe) - - # ffmpeg handles the player specific stream + filters and pipes - # audio to the cliraop process - self._ffmpeg_proc = FFMpeg( - audio_input="-", - input_format=self.input_format, - output_format=AIRPLAY_PCM_FORMAT, - filter_params=get_player_filter_params(self.mass, player_id), - audio_output=write, - ) - await self._ffmpeg_proc.start() - await asyncio.to_thread(os.close, write) - - # cliraop is the binary that handles the actual raop streaming to the player - cliraop_args = [ - self.prov.cliraop_bin, - "-ntpstart", - str(start_ntp), - "-port", - str(self.airplay_player.discovery_info.port), - "-wait", - str(wait_start - sync_adjust), - "-volume", - str(mass_player.volume_level), - *extra_args, - "-dacp", - self.prov.dacp_id, - "-activeremote", - self.active_remote_id, - "-udn", - self.airplay_player.discovery_info.name, - self.airplay_player.address, - "-", - ] - self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop") - if platform.system() == "Darwin": - os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - await self._cliraop_proc.start() - await asyncio.to_thread(os.close, read) - self._started.set() - self._log_reader_task = self.mass.create_task(self._log_watcher()) - - async def stop(self): - """Stop playback and cleanup.""" - if self._stopped: - return - if self.audio_source_task and not self.audio_source_task.done(): - self.audio_source_task.cancel() - if self._cliraop_proc.proc and not self._cliraop_proc.closed: - await self.send_cli_command("ACTION=STOP") - self._stopped = True # set after send_cli command! - if self._cliraop_proc.proc and not self._cliraop_proc.closed: - await self._cliraop_proc.close(True) - if self._ffmpeg_proc and not self._ffmpeg_proc.closed: - await self._ffmpeg_proc.close(True) - self._cliraop_proc = None - self._ffmpeg_proc = None - - async def write_chunk(self, chunk: bytes) -> None: - """Write a (pcm) audio chunk.""" - if self._stopped: - return - await self._started.wait() - await self._ffmpeg_proc.write(chunk) - - async def write_eof(self) -> None: - """Write EOF.""" - if self._stopped: - return - await self._started.wait() - await self._ffmpeg_proc.write_eof() - - async def send_cli_command(self, command: str) -> None: - """Send an interactive command to the running CLIRaop binary.""" - if self._stopped: - return - await self._started.wait() - - if not command.endswith("\n"): - command += "\n" - - def send_data(): - with suppress(BrokenPipeError), open(named_pipe, "w") as f: - f.write(command) - - named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108 - self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) - await asyncio.to_thread(send_data) - - async def _log_watcher(self) -> None: - """Monitor stderr for the running CLIRaop process.""" - airplay_player = self.airplay_player - mass_player = self.mass.players.get(airplay_player.player_id) - queue = self.mass.player_queues.get_active_queue(mass_player.active_source) - logger = airplay_player.logger - lost_packets = 0 - prev_metadata_checksum: str = "" - prev_progress_report: float = 0 - async for line in self._cliraop_proc.iter_stderr(): - if "elapsed milliseconds:" in line: - # this is received more or less every second while playing - millis = int(line.split("elapsed milliseconds: ")[1]) - mass_player.elapsed_time = millis / 1000 - mass_player.elapsed_time_last_updated = time.time() - # send metadata to player(s) if needed - # NOTE: this must all be done in separate tasks to not disturb audio - now = time.time() - if ( - mass_player.elapsed_time > 2 - and queue - and queue.current_item - and queue.current_item.streamdetails - ): - metadata_checksum = ( - queue.current_item.streamdetails.stream_title - or queue.current_item.queue_item_id - ) - if prev_metadata_checksum != metadata_checksum: - prev_metadata_checksum = metadata_checksum - prev_progress_report = now - self.mass.create_task(self._send_metadata(queue)) - # send the progress report every 5 seconds - elif now - prev_progress_report >= 5: - prev_progress_report = now - self.mass.create_task(self._send_progress(queue)) - if "set pause" in line or "Pause at" in line: - mass_player.state = PlayerState.PAUSED - self.mass.players.update(airplay_player.player_id) - if "Restarted at" in line or "restarting w/ pause" in line: - mass_player.state = PlayerState.PLAYING - self.mass.players.update(airplay_player.player_id) - if "restarting w/o pause" in line: - # streaming has started - mass_player.state = PlayerState.PLAYING - mass_player.elapsed_time = 0 - mass_player.elapsed_time_last_updated = time.time() - self.mass.players.update(airplay_player.player_id) - if "lost packet out of backlog" in line: - lost_packets += 1 - if lost_packets == 100: - logger.error("High packet loss detected, restarting playback...") - self.mass.create_task(self.mass.player_queues.resume(queue.queue_id)) - else: - logger.warning("Packet loss detected!") - if "end of stream reached" in line: - logger.debug("End of stream reached") - break - - logger.log(VERBOSE_LOG_LEVEL, line) - - # if we reach this point, the process exited - if airplay_player.raop_stream == self: - mass_player.state = PlayerState.IDLE - self.mass.players.update(airplay_player.player_id) - # ensure we're cleaned up afterwards (this also logs the returncode) - await self.stop() - - async def _send_metadata(self, queue: PlayerQueue) -> None: - """Send metadata to player (and connected sync childs).""" - if not queue or not queue.current_item: - return - duration = min(queue.current_item.duration or 0, 3600) - title = queue.current_item.name - artist = "" - album = "" - if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: - # stream title from radio station - stream_title = queue.current_item.streamdetails.stream_title - if " - " in stream_title: - artist, title = stream_title.split(" - ", 1) - else: - title = stream_title - # set album to radio station name - album = queue.current_item.name - elif media_item := queue.current_item.media_item: - title = media_item.name - if artist_str := getattr(media_item, "artist_str", None): - artist = artist_str - if _album := getattr(media_item, "album", None): - album = _album.name - - cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" - cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" - - await self.send_cli_command(cmd) - - # get image - if not queue.current_item.image: - return - - # the image format needs to be 500x500 jpeg for maximum compatibility with players - image_url = self.mass.metadata.get_image_url( - queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" - ) - await self.send_cli_command(f"ARTWORK={image_url}\n") - - async def _send_progress(self, queue: PlayerQueue) -> None: - """Send progress report to player (and connected sync childs).""" - if not queue or not queue.current_item: - return - progress = int(queue.corrected_elapsed_time) - await self.send_cli_command(f"PROGRESS={progress}\n") - - -class AirPlayPlayer: - """Holds the details of the (discovered) Airplay (RAOP) player.""" - - def __init__( - self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str - ) -> None: - """Initialize AirPlayPlayer.""" - self.prov = prov - self.mass = prov.mass - self.player_id = player_id - self.discovery_info = discovery_info - self.address = address - self.logger = prov.logger.getChild(player_id) - self.raop_stream: RaopStream | None = None - - async def cmd_stop(self, update_state: bool = True) -> None: - """Send STOP command to player.""" - if self.raop_stream: - await self.raop_stream.stop() - if update_state and (mass_player := self.mass.players.get(self.player_id)): - mass_player.state = PlayerState.IDLE - self.mass.players.update(mass_player.player_id) - - async def cmd_play(self) -> None: - """Send PLAY (unpause) command to player.""" - if self.raop_stream and self.raop_stream.running: - await self.raop_stream.send_cli_command("ACTION=PLAY") - - async def cmd_pause(self) -> None: - """Send PAUSE command to player.""" - if not self.raop_stream or not self.raop_stream.running: - return - await self.raop_stream.send_cli_command("ACTION=PAUSE") - - -class AirplayProvider(PlayerProvider): - """Player provider for Airplay based players.""" - - cliraop_bin: str | None = None - _players: dict[str, AirPlayPlayer] - _dacp_server: asyncio.Server = None - _dacp_info: AsyncServiceInfo = None - _play_media_lock: asyncio.Lock = asyncio.Lock() - - @property - def supported_features(self) -> tuple[ProviderFeature, ...]: - """Return the features supported by this Provider.""" - return (ProviderFeature.SYNC_PLAYERS,) - - async def handle_async_init(self) -> None: - """Handle async initialization of the provider.""" - self._players = {} - self.cliraop_bin = await self._getcliraop_binary() - dacp_port = await select_free_port(39831, 49831) - self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}" - self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port) - self._dacp_server = await asyncio.start_server( - self._handle_dacp_request, "0.0.0.0", dacp_port - ) - zeroconf_type = "_dacp._tcp.local." - server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}" - self._dacp_info = AsyncServiceInfo( - zeroconf_type, - name=server_id, - addresses=[await get_ip_pton(self.mass.streams.publish_ip)], - port=dacp_port, - properties={ - "txtvers": "1", - "Ver": "63B5E5C0C201542E", - "DbId": "63B5E5C0C201542E", - "OSsi": "0x1F5", - }, - server=f"{socket.gethostname()}.local", - ) - await self.mass.aiozc.async_register_service(self._dacp_info) - - async def on_mdns_service_state_change( - self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None - ) -> None: - """Handle MDNS service state callback.""" - raw_id, display_name = name.split(".")[0].split("@", 1) - player_id = f"ap{raw_id.lower()}" - # handle removed player - if state_change == ServiceStateChange.Removed: - if mass_player := self.mass.players.get(player_id): - if not mass_player.available: - return - # the player has become unavailable - self.logger.debug("Player offline: %s", display_name) - mass_player.available = False - self.mass.players.update(player_id) - return - # handle update for existing device - if airplay_player := self._players.get(player_id): - if mass_player := self.mass.players.get(player_id): - cur_address = get_primary_ip_address(info) - if cur_address and cur_address != airplay_player.address: - airplay_player.logger.debug( - "Address updated from %s to %s", airplay_player.address, cur_address - ) - airplay_player.address = cur_address - mass_player.device_info = DeviceInfo( - model=mass_player.device_info.model, - manufacturer=mass_player.device_info.manufacturer, - address=str(cur_address), - ) - if not mass_player.available: - self.logger.debug("Player back online: %s", display_name) - mass_player.available = True - # always update the latest discovery info - airplay_player.discovery_info = info - self.mass.players.update(player_id) - return - # handle new player - await self._setup_player(player_id, display_name, info) - - async def unload(self) -> None: - """Handle close/cleanup of the provider.""" - # power off all players (will disconnect and close cliraop) - for player_id in self._players: - await self.cmd_power(player_id, False) - # shutdown DACP server - if self._dacp_server: - self._dacp_server.close() - # shutdown DACP zeroconf service - if self._dacp_info: - await self.mass.aiozc.async_unregister_service(self._dacp_info) - - async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: - """Return all (provider/player specific) Config Entries for the given player (if any).""" - base_entries = await super().get_player_config_entries(player_id) - if player_id not in self._players: - # most probably a syncgroup - return ( - *base_entries, - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_SAMPLE_RATES_AIRPLAY, - CONF_ENTRY_FLOW_MODE_ENFORCED, - ) - return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_AIRPLAY) - - async def cmd_stop(self, player_id: str) -> None: - """Send STOP command to given player. - - - player_id: player_id of the player to handle the command. - """ - # forward command to player and any connected sync members - async with TaskManager(self.mass) as tg: - for airplay_player in self._get_sync_clients(player_id): - tg.create_task(airplay_player.cmd_stop()) - - async def cmd_play(self, player_id: str) -> None: - """Send PLAY (unpause) command to given player. - - - player_id: player_id of the player to handle the command. - """ - # forward command to player and any connected sync members - async with TaskManager(self.mass) as tg: - for airplay_player in self._get_sync_clients(player_id): - tg.create_task(airplay_player.cmd_play()) - - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player. - - - player_id: player_id of the player to handle the command. - """ - player = self.mass.players.get(player_id) - if player.synced_to: - # should not happen, but just in case - raise RuntimeError("Player is synced") - if player.group_childs: - # pause is not supported while synced, use stop instead - self.logger.debug("Player is synced, using STOP instead of PAUSE") - await self.cmd_stop(player_id) - return - airplay_player = self._players[player_id] - await airplay_player.cmd_pause() - - @lock - async def play_media( - self, - player_id: str, - media: PlayerMedia, - ) -> None: - """Handle PLAY MEDIA on given player.""" - async with self._play_media_lock: - player = self.mass.players.get(player_id) - # set the active source for the player to the media queue - # this accounts for syncgroups and linked players (e.g. sonos) - player.active_source = media.queue_id - if player.synced_to: - # should not happen, but just in case - raise RuntimeError("Player is synced") - # always stop existing stream first - async with TaskManager(self.mass) as tg: - for airplay_player in self._get_sync_clients(player_id): - tg.create_task(airplay_player.cmd_stop(update_state=False)) - # select audio source - if media.media_type == MediaType.ANNOUNCEMENT: - # special case: stream announcement - input_format = AIRPLAY_PCM_FORMAT - audio_source = self.mass.streams.get_announcement_stream( - media.custom_data["url"], - output_format=AIRPLAY_PCM_FORMAT, - use_pre_announce=media.custom_data["use_pre_announce"], - ) - elif media.queue_id.startswith("ugp_"): - # special case: UGP stream - ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") - ugp_stream = ugp_provider.ugp_streams[media.queue_id] - input_format = ugp_stream.output_format - audio_source = ugp_stream.subscribe() - elif media.queue_id and media.queue_item_id: - # regular queue (flow) stream request - input_format = AIRPLAY_FLOW_PCM_FORMAT - audio_source = self.mass.streams.get_flow_stream( - queue=self.mass.player_queues.get(media.queue_id), - start_queue_item=self.mass.player_queues.get_item( - media.queue_id, media.queue_item_id - ), - pcm_format=input_format, - ) - else: - # assume url or some other direct path - # NOTE: this will fail if its an uri not playable by ffmpeg - input_format = AIRPLAY_PCM_FORMAT - audio_source = get_ffmpeg_stream( - audio_input=media.uri, - input_format=AudioFormat(ContentType.try_parse(media.uri)), - output_format=AIRPLAY_PCM_FORMAT, - ) - - # Python is not suitable for realtime audio streaming so we do the actual streaming - # of (RAOP) audio using a small executable written in C based on libraop to do - # the actual timestamped playback, which reads pcm audio from stdin - # and we can send some interactive commands using a named pipe. - - # setup RaopStream for player and its sync childs - sync_clients = self._get_sync_clients(player_id) - for airplay_player in sync_clients: - airplay_player.raop_stream = RaopStream( - self, airplay_player, input_format=input_format - ) - - async def audio_streamer() -> None: - async for chunk in audio_source: - await asyncio.gather( - *[x.raop_stream.write_chunk(chunk) for x in sync_clients], - return_exceptions=True, - ) - # entire stream consumed: send EOF - await asyncio.gather( - *[x.raop_stream.write_eof() for x in sync_clients], - return_exceptions=True, - ) - - # get current ntp and start cliraop - _, stdout = await check_output(self.cliraop_bin, "-ntp") - start_ntp = int(stdout.strip()) - wait_start = 1250 + (250 * len(sync_clients)) - await asyncio.gather( - *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients], - return_exceptions=True, - ) - self._players[player_id].raop_stream.audio_source_task = asyncio.create_task( - audio_streamer() - ) - - async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player. - - - player_id: player_id of the player to handle the command. - - volume_level: volume level (0..100) to set on the player. - """ - airplay_player = self._players[player_id] - if airplay_player.raop_stream and airplay_player.raop_stream.running: - await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n") - mass_player = self.mass.players.get(player_id) - mass_player.volume_level = volume_level - mass_player.volume_muted = volume_level == 0 - self.mass.players.update(player_id) - # store last state in cache - await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME) - - @lock - async def cmd_sync(self, player_id: str, target_player: str) -> None: - """Handle SYNC command for given player. - - Join/add the given player(id) to the given (master) player/sync group. - - - player_id: player_id of the player to handle the command. - - target_player: player_id of the syncgroup master or group player. - """ - if player_id == target_player: - return - child_player = self.mass.players.get(player_id) - assert child_player # guard - parent_player = self.mass.players.get(target_player) - assert parent_player # guard - if parent_player.synced_to: - raise RuntimeError("Player is already synced") - if child_player.synced_to and child_player.synced_to != target_player: - raise RuntimeError("Player is already synced to another player") - # always make sure that the parent player is part of the sync group - parent_player.group_childs.add(parent_player.player_id) - parent_player.group_childs.add(child_player.player_id) - child_player.synced_to = parent_player.player_id - # mark players as powered - parent_player.powered = True - child_player.powered = True - # check if we should (re)start or join a stream session - active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) - if active_queue.state == PlayerState.PLAYING: - # playback needs to be restarted to form a new multi client stream session - # this could potentially be called by multiple players at the exact same time - # so we debounce the resync a bit here with a timer - self.mass.call_later( - 1, - self.mass.player_queues.resume, - active_queue.queue_id, - fade_in=False, - task_id=f"resume_{active_queue.queue_id}", - ) - else: - # make sure that the player manager gets an update - self.mass.players.update(child_player.player_id, skip_forward=True) - self.mass.players.update(parent_player.player_id, skip_forward=True) - - @lock - async def cmd_unsync(self, player_id: str) -> None: - """Handle UNSYNC command for given player. - - Remove the given player from any syncgroups it currently is synced to. - - - player_id: player_id of the player to handle the command. - """ - player = self.mass.players.get(player_id, raise_unavailable=True) - if player.synced_to: - group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) - if player_id in group_leader.group_childs: - group_leader.group_childs.remove(player_id) - player.synced_to = None - airplay_player = self._players.get(player_id) - await airplay_player.cmd_stop() - # make sure that the player manager gets an update - self.mass.players.update(player.player_id, skip_forward=True) - self.mass.players.update(group_leader.player_id, skip_forward=True) - - async def _getcliraop_binary(self): - """Find the correct raop/airplay binary belonging to the platform.""" - # ruff: noqa: SIM102 - if self.cliraop_bin is not None: - return self.cliraop_bin - - async def check_binary(cliraop_path: str) -> str | None: - try: - returncode, output = await check_output( - cliraop_path, - "-check", - ) - if returncode == 0 and output.strip().decode() == "cliraop check": - self.cliraop_bin = cliraop_path - return cliraop_path - except OSError: - return None - - base_path = os.path.join(os.path.dirname(__file__), "bin") - system = platform.system().lower().replace("darwin", "macos") - architecture = platform.machine().lower() - - if bridge_binary := await check_binary( - os.path.join(base_path, f"cliraop-{system}-{architecture}") - ): - return bridge_binary - - msg = f"Unable to locate RAOP Play binary for {system}/{architecture}" - raise RuntimeError(msg) - - def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]: - """Get all sync clients for a player.""" - mass_player = self.mass.players.get(player_id, True) - sync_clients: list[AirPlayPlayer] = [] - # we need to return the player itself too - group_child_ids = {player_id} - group_child_ids.update(mass_player.group_childs) - for child_id in group_child_ids: - if client := self._players.get(child_id): - sync_clients.append(client) - return sync_clients - - async def _setup_player( - self, player_id: str, display_name: str, info: AsyncServiceInfo - ) -> None: - """Handle setup of a new player that is discovered using mdns.""" - address = get_primary_ip_address(info) - if address is None: - return - self.logger.debug("Discovered Airplay device %s on %s", display_name, address) - manufacturer, model = get_model_from_am(info.decoded_properties.get("am")) - if "apple tv" in model.lower(): - # For now, we ignore the Apple TV until we implement the authentication. - # maybe we can simply use pyatv only for this part? - # the cliraop application has already been prepared to accept the secret. - self.logger.debug( - "Ignoring %s in discovery due to authentication requirement.", display_name - ) - return - if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True): - self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name) - return - self._players[player_id] = AirPlayPlayer(self, player_id, info, address) - if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)): - volume = FALLBACK_VOLUME - mass_player = Player( - player_id=player_id, - provider=self.instance_id, - type=PlayerType.PLAYER, - name=display_name, - available=True, - powered=False, - device_info=DeviceInfo( - model=model, - manufacturer=manufacturer, - address=address, - ), - supported_features=( - PlayerFeature.PAUSE, - PlayerFeature.SYNC, - PlayerFeature.VOLUME_SET, - ), - volume_level=volume, - ) - await self.mass.players.register_or_update(mass_player) - - async def _handle_dacp_request( # noqa: PLR0915 - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ) -> None: - """Handle new connection on the socket.""" - try: - raw_request = b"" - while recv := await reader.read(1024): - raw_request += recv - if len(recv) < 1024: - break - - request = raw_request.decode("UTF-8") - if "\r\n\r\n" in request: - headers_raw, body = request.split("\r\n\r\n", 1) - else: - headers_raw = request - body = "" - headers_raw = headers_raw.split("\r\n") - headers = {} - for line in headers_raw[1:]: - if ":" not in line: - continue - x, y = line.split(":", 1) - headers[x.strip()] = y.strip() - active_remote = headers.get("Active-Remote") - _, path, _ = headers_raw[0].split(" ") - airplay_player = next( - ( - x - for x in self._players.values() - if x.raop_stream and x.raop_stream.active_remote_id == active_remote - ), - None, - ) - self.logger.debug( - "DACP request for %s (%s): %s -- %s", - airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER", - active_remote, - path, - body, - ) - if not airplay_player: - return - - player_id = airplay_player.player_id - mass_player = self.mass.players.get(player_id) - active_queue = self.mass.player_queues.get_active_queue(player_id) - if path == "/ctrl-int/1/nextitem": - self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id)) - elif path == "/ctrl-int/1/previtem": - self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id)) - elif path == "/ctrl-int/1/play": - self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id)) - elif path == "/ctrl-int/1/playpause": - self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id)) - elif path == "/ctrl-int/1/stop": - self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id)) - elif path == "/ctrl-int/1/volumeup": - self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) - elif path == "/ctrl-int/1/volumedown": - self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) - elif path == "/ctrl-int/1/shuffle_songs": - queue = self.mass.player_queues.get(player_id) - self.mass.loop.call_soon( - self.mass.player_queues.set_shuffle( - active_queue.queue_id, not queue.shuffle_enabled - ) - ) - elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): - self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id)) - elif "dmcp.device-volume=" in path: - if mass_player.device_info.manufacturer.lower() == "apple": - # Apple devices only report their (new) volume level, they dont request it - return - raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) - volume = convert_airplay_volume(raop_volume) - if volume != mass_player.volume_level: - self.mass.create_task(self.cmd_volume_set(player_id, volume)) - # optimistically set the new volume to prevent bouncing around - mass_player.volume_level = volume - elif "dmcp.volume=" in path: - volume = int(path.split("dmcp.volume=", 1)[-1]) - if volume != mass_player.volume_level: - self.mass.create_task(self.cmd_volume_set(player_id, volume)) - # optimistically set the new volume to prevent bouncing around - mass_player.volume_level = volume - elif "device-prevent-playback=1" in path: - # device switched to another source (or is powered off) - if raop_stream := airplay_player.raop_stream: - # ignore this if we just started playing to prevent false positives - if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: - raop_stream.prevent_playback = True - self.mass.create_task(self.monitor_prevent_playback(player_id)) - elif "device-prevent-playback=0" in path: - # device reports that its ready for playback again - if raop_stream := airplay_player.raop_stream: - raop_stream.prevent_playback = False - - # send response - date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") - response = ( - f"HTTP/1.0 204 No Content\r\nDate: {date_str} " - "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: " - "application/x-dmap-tagged\r\nContent-Length: 0\r\n" - "Connection: close\r\n\r\n" - ) - writer.write(response.encode()) - await writer.drain() - finally: - writer.close() - - async def monitor_prevent_playback(self, player_id: str): - """Monitor the prevent playback state of an airplay player.""" - count = 0 - if not (airplay_player := self._players.get(player_id)): - return - prev_active_remote_id = airplay_player.raop_stream.active_remote_id - while count < 40: - count += 1 - if not (airplay_player := self._players.get(player_id)): - return - if not (raop_stream := airplay_player.raop_stream): - return - if raop_stream.active_remote_id != prev_active_remote_id: - # checksum - return - if not raop_stream.prevent_playback: - return - await asyncio.sleep(0.5) - - airplay_player.logger.info( - "Player has been in prevent playback mode for too long, powering off.", - ) - await self.mass.players.cmd_power(airplay_player.player_id, False) +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return AirplayProvider(mass, manifest, config) diff --git a/music_assistant/server/providers/airplay/const.py b/music_assistant/server/providers/airplay/const.py new file mode 100644 index 000000000..3d9ecd8d0 --- /dev/null +++ b/music_assistant/server/providers/airplay/const.py @@ -0,0 +1,31 @@ +"""Constants for the AirPlay provider.""" + +from __future__ import annotations + +from music_assistant.common.models.enums import ContentType +from music_assistant.common.models.media_items import AudioFormat + +DOMAIN = "airplay" + +CONF_ENCRYPTION = "encryption" +CONF_ALAC_ENCODE = "alac_encode" +CONF_VOLUME_START = "volume_start" +CONF_PASSWORD = "password" +CONF_BIND_INTERFACE = "bind_interface" +CONF_READ_AHEAD_BUFFER = "read_ahead_buffer" + +BACKOFF_TIME_LOWER_LIMIT = 15 # seconds +BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes + +CONF_CREDENTIALS = "credentials" +CACHE_KEY_PREV_VOLUME = "airplay_prev_volume" +FALLBACK_VOLUME = 20 + +AIRPLAY_FLOW_PCM_FORMAT = AudioFormat( + content_type=ContentType.PCM_F32LE, + sample_rate=44100, + bit_depth=32, +) +AIRPLAY_PCM_FORMAT = AudioFormat( + content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16 +) diff --git a/music_assistant/server/providers/airplay/helpers.py b/music_assistant/server/providers/airplay/helpers.py new file mode 100644 index 000000000..fe8f51807 --- /dev/null +++ b/music_assistant/server/providers/airplay/helpers.py @@ -0,0 +1,52 @@ +"""Various helpers/utilities for the Airplay provider.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from zeroconf import IPVersion + +if TYPE_CHECKING: + from zeroconf.asyncio import AsyncServiceInfo + + +def convert_airplay_volume(value: float) -> int: + """Remap Airplay Volume to 0..100 scale.""" + airplay_min = -30 + airplay_max = 0 + normal_min = 0 + normal_max = 100 + portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min) + return int(portion + normal_min) + + +def get_model_from_am(am_property: str | None) -> tuple[str, str]: + """Return Manufacturer and Model name from mdns AM property.""" + manufacturer = "Unknown" + model = "Generic Airplay device" + if not am_property: + return (manufacturer, model) + if isinstance(am_property, bytes): + am_property = am_property.decode("utf-8") + if am_property == "AudioAccessory5,1": + model = "HomePod" + manufacturer = "Apple" + elif "AppleTV" in am_property: + model = "Apple TV" + manufacturer = "Apple" + else: + model = am_property + return (manufacturer, model) + + +def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: + """Get primary IP address from zeroconf discovery info.""" + for address in discovery_info.parsed_addresses(IPVersion.V4Only): + if address.startswith("127"): + # filter out loopback address + continue + if address.startswith("169.254"): + # filter out APIPA address + continue + return address + return None diff --git a/music_assistant/server/providers/airplay/player.py b/music_assistant/server/providers/airplay/player.py new file mode 100644 index 000000000..48e3a1b3d --- /dev/null +++ b/music_assistant/server/providers/airplay/player.py @@ -0,0 +1,49 @@ +"""AirPlay Player definition.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant.common.models.enums import PlayerState + +if TYPE_CHECKING: + from zeroconf.asyncio import AsyncServiceInfo + + from .provider import AirplayProvider + from .raop import RaopStream + + +class AirPlayPlayer: + """Holds the details of the (discovered) Airplay (RAOP) player.""" + + def __init__( + self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str + ) -> None: + """Initialize AirPlayPlayer.""" + self.prov = prov + self.mass = prov.mass + self.player_id = player_id + self.discovery_info = discovery_info + self.address = address + self.logger = prov.logger.getChild(player_id) + self.raop_stream: RaopStream | None = None + + async def cmd_stop(self, update_state: bool = True) -> None: + """Send STOP command to player.""" + if self.raop_stream: + # forward stop to the entire stream session + await self.raop_stream.session.stop() + if update_state and (mass_player := self.mass.players.get(self.player_id)): + mass_player.state = PlayerState.IDLE + self.mass.players.update(mass_player.player_id) + + async def cmd_play(self) -> None: + """Send PLAY (unpause) command to player.""" + if self.raop_stream and self.raop_stream.running: + await self.raop_stream.send_cli_command("ACTION=PLAY") + + async def cmd_pause(self) -> None: + """Send PAUSE command to player.""" + if not self.raop_stream or not self.raop_stream.running: + return + await self.raop_stream.send_cli_command("ACTION=PAUSE") diff --git a/music_assistant/server/providers/airplay/provider.py b/music_assistant/server/providers/airplay/provider.py new file mode 100644 index 000000000..51276baf7 --- /dev/null +++ b/music_assistant/server/providers/airplay/provider.py @@ -0,0 +1,621 @@ +"""Airplay Player provider for Music Assistant.""" + +from __future__ import annotations + +import asyncio +import os +import platform +import socket +from random import randrange +from typing import TYPE_CHECKING + +from zeroconf import ServiceStateChange +from zeroconf.asyncio import AsyncServiceInfo + +from music_assistant.common.helpers.datetime import utc +from music_assistant.common.helpers.util import get_ip_pton, select_free_port +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_FLOW_MODE_ENFORCED, + CONF_ENTRY_OUTPUT_CHANNELS, + CONF_ENTRY_SYNC_ADJUST, + ConfigEntry, + create_sample_rates_config_entry, +) +from music_assistant.common.models.enums import ( + ConfigEntryType, + ContentType, + MediaType, + PlayerFeature, + PlayerState, + PlayerType, + ProviderFeature, +) +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.server.helpers.audio import get_ffmpeg_stream +from music_assistant.server.helpers.process import check_output +from music_assistant.server.helpers.util import TaskManager, lock +from music_assistant.server.models.player_provider import PlayerProvider +from music_assistant.server.providers.airplay.raop import RaopStreamSession + +from .const import ( + AIRPLAY_FLOW_PCM_FORMAT, + AIRPLAY_PCM_FORMAT, + CACHE_KEY_PREV_VOLUME, + CONF_ALAC_ENCODE, + CONF_ENCRYPTION, + CONF_PASSWORD, + CONF_READ_AHEAD_BUFFER, + FALLBACK_VOLUME, +) +from .helpers import convert_airplay_volume, get_model_from_am, get_primary_ip_address +from .player import AirPlayPlayer + +if TYPE_CHECKING: + from music_assistant.server.providers.player_group import PlayerGroupProvider + + +PLAYER_CONFIG_ENTRIES = ( + CONF_ENTRY_FLOW_MODE_ENFORCED, + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CHANNELS, + ConfigEntry( + key=CONF_ENCRYPTION, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Enable encryption", + description="Enable encrypted communication with the player, " + "some (3rd party) players require this.", + category="airplay", + ), + ConfigEntry( + key=CONF_ALAC_ENCODE, + type=ConfigEntryType.BOOLEAN, + default_value=True, + label="Enable compression", + description="Save some network bandwidth by sending the audio as " + "(lossless) ALAC at the cost of a bit CPU.", + category="airplay", + ), + CONF_ENTRY_SYNC_ADJUST, + ConfigEntry( + key=CONF_PASSWORD, + type=ConfigEntryType.SECURE_STRING, + default_value=None, + required=False, + label="Device password", + description="Some devices require a password to connect/play.", + category="airplay", + ), + ConfigEntry( + key=CONF_READ_AHEAD_BUFFER, + type=ConfigEntryType.INTEGER, + default_value=1000, + required=False, + label="Audio buffer (ms)", + description="Amount of buffer (in milliseconds), " + "the player should keep to absorb network throughput jitter. " + "If you experience audio dropouts, try increasing this value.", + category="airplay", + range=(500, 3000), + ), + # airplay has fixed sample rate/bit depth so make this config entry static and hidden + create_sample_rates_config_entry(44100, 16, 44100, 16, True), +) + + +# TODO: Airplay provider +# - Implement authentication for Apple TV +# - Implement volume control for Apple devices using pyatv +# - Implement metadata for Apple Apple devices using pyatv +# - Use pyatv for communicating with original Apple devices (and use cliraop for actual streaming) +# - Implement Airplay 2 support +# - Implement late joining to existing stream (instead of restarting it) + + +class AirplayProvider(PlayerProvider): + """Player provider for Airplay based players.""" + + cliraop_bin: str | None = None + _players: dict[str, AirPlayPlayer] + _dacp_server: asyncio.Server = None + _dacp_info: AsyncServiceInfo = None + _play_media_lock: asyncio.Lock = asyncio.Lock() + + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self._players = {} + self.cliraop_bin = await self._getcliraop_binary() + dacp_port = await select_free_port(39831, 49831) + self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}" + self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port) + self._dacp_server = await asyncio.start_server( + self._handle_dacp_request, "0.0.0.0", dacp_port + ) + zeroconf_type = "_dacp._tcp.local." + server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}" + self._dacp_info = AsyncServiceInfo( + zeroconf_type, + name=server_id, + addresses=[await get_ip_pton(self.mass.streams.publish_ip)], + port=dacp_port, + properties={ + "txtvers": "1", + "Ver": "63B5E5C0C201542E", + "DbId": "63B5E5C0C201542E", + "OSsi": "0x1F5", + }, + server=f"{socket.gethostname()}.local", + ) + await self.mass.aiozc.async_register_service(self._dacp_info) + + async def on_mdns_service_state_change( + self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None + ) -> None: + """Handle MDNS service state callback.""" + raw_id, display_name = name.split(".")[0].split("@", 1) + player_id = f"ap{raw_id.lower()}" + # handle removed player + if state_change == ServiceStateChange.Removed: + if mass_player := self.mass.players.get(player_id): + if not mass_player.available: + return + # the player has become unavailable + self.logger.debug("Player offline: %s", display_name) + mass_player.available = False + self.mass.players.update(player_id) + return + # handle update for existing device + if airplay_player := self._players.get(player_id): + if mass_player := self.mass.players.get(player_id): + cur_address = get_primary_ip_address(info) + if cur_address and cur_address != airplay_player.address: + airplay_player.logger.debug( + "Address updated from %s to %s", airplay_player.address, cur_address + ) + airplay_player.address = cur_address + mass_player.device_info = DeviceInfo( + model=mass_player.device_info.model, + manufacturer=mass_player.device_info.manufacturer, + address=str(cur_address), + ) + if not mass_player.available: + self.logger.debug("Player back online: %s", display_name) + mass_player.available = True + # always update the latest discovery info + airplay_player.discovery_info = info + self.mass.players.update(player_id) + return + # handle new player + await self._setup_player(player_id, display_name, info) + + async def unload(self) -> None: + """Handle close/cleanup of the provider.""" + # power off all players (will disconnect and close cliraop) + for player_id in self._players: + await self.cmd_power(player_id, False) + # shutdown DACP server + if self._dacp_server: + self._dacp_server.close() + # shutdown DACP zeroconf service + if self._dacp_info: + await self.mass.aiozc.async_unregister_service(self._dacp_info) + + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: + """Return all (provider/player specific) Config Entries for the given player (if any).""" + base_entries = await super().get_player_config_entries(player_id) + return (*base_entries, *PLAYER_CONFIG_ENTRIES) + + async def cmd_stop(self, player_id: str) -> None: + """Send STOP command to given player. + + - player_id: player_id of the player to handle the command. + """ + if airplay_player := self._players.get(player_id): + await airplay_player.cmd_stop() + + async def cmd_play(self, player_id: str) -> None: + """Send PLAY (unpause) command to given player. + + - player_id: player_id of the player to handle the command. + """ + if airplay_player := self._players.get(player_id): + await airplay_player.cmd_play() + + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player. + + - player_id: player_id of the player to handle the command. + """ + player = self.mass.players.get(player_id) + if player.group_childs: + # pause is not supported while synced, use stop instead + self.logger.debug("Player is synced, using STOP instead of PAUSE") + await self.cmd_stop(player_id) + return + airplay_player = self._players[player_id] + await airplay_player.cmd_pause() + + @lock + async def play_media( + self, + player_id: str, + media: PlayerMedia, + ) -> None: + """Handle PLAY MEDIA on given player.""" + async with self._play_media_lock: + player = self.mass.players.get(player_id) + # set the active source for the player to the media queue + # this accounts for syncgroups and linked players (e.g. sonos) + player.active_source = media.queue_id + if player.synced_to: + # should not happen, but just in case + raise RuntimeError("Player is synced") + # always stop existing stream first + async with TaskManager(self.mass) as tg: + for airplay_player in self._get_sync_clients(player_id): + tg.create_task(airplay_player.cmd_stop(update_state=False)) + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + input_format = AIRPLAY_PCM_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=AIRPLAY_PCM_FORMAT, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") + ugp_stream = ugp_provider.ugp_streams[media.queue_id] + input_format = ugp_stream.output_format + audio_source = ugp_stream.subscribe() + elif media.queue_id and media.queue_item_id: + # regular queue (flow) stream request + input_format = AIRPLAY_FLOW_PCM_FORMAT + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=input_format, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + input_format = AIRPLAY_PCM_FORMAT + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=AIRPLAY_PCM_FORMAT, + ) + # setup RaopStreamSession for player (and its sync childs if any) + sync_clients = self._get_sync_clients(player_id) + raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source) + await raop_stream_session.start() + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player. + + - player_id: player_id of the player to handle the command. + - volume_level: volume level (0..100) to set on the player. + """ + airplay_player = self._players[player_id] + if airplay_player.raop_stream and airplay_player.raop_stream.running: + await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n") + mass_player = self.mass.players.get(player_id) + mass_player.volume_level = volume_level + mass_player.volume_muted = volume_level == 0 + self.mass.players.update(player_id) + # store last state in cache + await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME) + + @lock + async def cmd_sync(self, player_id: str, target_player: str) -> None: + """Handle SYNC command for given player. + + Join/add the given player(id) to the given (master) player/sync group. + + - player_id: player_id of the player to handle the command. + - target_player: player_id of the syncgroup master or group player. + """ + if player_id == target_player: + return + child_player = self.mass.players.get(player_id) + assert child_player # guard + parent_player = self.mass.players.get(target_player) + assert parent_player # guard + if parent_player.synced_to: + raise RuntimeError("Player is already synced") + if child_player.synced_to and child_player.synced_to != target_player: + raise RuntimeError("Player is already synced to another player") + if player_id in parent_player.group_childs: + # nothing to do: player is already part of the group + return + # ensure the child does not have an existing steam session active + if airplay_player := self._players.get(player_id): + if airplay_player.raop_stream and airplay_player.raop_stream.running: + await airplay_player.raop_stream.session.remove_client(airplay_player) + # always make sure that the parent player is part of the sync group + parent_player.group_childs.add(parent_player.player_id) + parent_player.group_childs.add(child_player.player_id) + child_player.synced_to = parent_player.player_id + # mark players as powered + parent_player.powered = True + child_player.powered = True + # check if we should (re)start or join a stream session + active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) + if active_queue.state == PlayerState.PLAYING: + # playback needs to be restarted to form a new multi client stream session + # this could potentially be called by multiple players at the exact same time + # so we debounce the resync a bit here with a timer + self.mass.call_later( + 1, + self.mass.player_queues.resume, + active_queue.queue_id, + fade_in=False, + task_id=f"resume_{active_queue.queue_id}", + ) + else: + # make sure that the player manager gets an update + self.mass.players.update(child_player.player_id, skip_forward=True) + self.mass.players.update(parent_player.player_id, skip_forward=True) + + @lock + async def cmd_unsync(self, player_id: str) -> None: + """Handle UNSYNC command for given player. + + Remove the given player from any syncgroups it currently is synced to. + + - player_id: player_id of the player to handle the command. + """ + mass_player = self.mass.players.get(player_id, raise_unavailable=True) + if not mass_player.synced_to: + return + ap_player = self._players[player_id] + if ap_player.raop_stream and ap_player.raop_stream.running: + await ap_player.raop_stream.session.remove_client(ap_player) + group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True) + if player_id in group_leader.group_childs: + group_leader.group_childs.remove(player_id) + mass_player.synced_to = None + airplay_player = self._players.get(player_id) + await airplay_player.cmd_stop() + # make sure that the player manager gets an update + self.mass.players.update(mass_player.player_id, skip_forward=True) + self.mass.players.update(group_leader.player_id, skip_forward=True) + + async def _getcliraop_binary(self): + """Find the correct raop/airplay binary belonging to the platform.""" + # ruff: noqa: SIM102 + if self.cliraop_bin is not None: + return self.cliraop_bin + + async def check_binary(cliraop_path: str) -> str | None: + try: + returncode, output = await check_output( + cliraop_path, + "-check", + ) + if returncode == 0 and output.strip().decode() == "cliraop check": + self.cliraop_bin = cliraop_path + return cliraop_path + except OSError: + return None + + base_path = os.path.join(os.path.dirname(__file__), "bin") + system = platform.system().lower().replace("darwin", "macos") + architecture = platform.machine().lower() + + if bridge_binary := await check_binary( + os.path.join(base_path, f"cliraop-{system}-{architecture}") + ): + return bridge_binary + + msg = f"Unable to locate RAOP Play binary for {system}/{architecture}" + raise RuntimeError(msg) + + def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]: + """Get all sync clients for a player.""" + mass_player = self.mass.players.get(player_id, True) + sync_clients: list[AirPlayPlayer] = [] + # we need to return the player itself too + group_child_ids = {player_id} + group_child_ids.update(mass_player.group_childs) + for child_id in group_child_ids: + if client := self._players.get(child_id): + sync_clients.append(client) + return sync_clients + + async def _setup_player( + self, player_id: str, display_name: str, info: AsyncServiceInfo + ) -> None: + """Handle setup of a new player that is discovered using mdns.""" + address = get_primary_ip_address(info) + if address is None: + return + self.logger.debug("Discovered Airplay device %s on %s", display_name, address) + manufacturer, model = get_model_from_am(info.decoded_properties.get("am")) + if "apple tv" in model.lower(): + # For now, we ignore the Apple TV until we implement the authentication. + # maybe we can simply use pyatv only for this part? + # the cliraop application has already been prepared to accept the secret. + self.logger.debug( + "Ignoring %s in discovery due to authentication requirement.", display_name + ) + return + if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True): + self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name) + return + self._players[player_id] = AirPlayPlayer(self, player_id, info, address) + if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)): + volume = FALLBACK_VOLUME + mass_player = Player( + player_id=player_id, + provider=self.instance_id, + type=PlayerType.PLAYER, + name=display_name, + available=True, + powered=False, + device_info=DeviceInfo( + model=model, + manufacturer=manufacturer, + address=address, + ), + supported_features=( + PlayerFeature.PAUSE, + PlayerFeature.SYNC, + PlayerFeature.VOLUME_SET, + ), + volume_level=volume, + ) + await self.mass.players.register_or_update(mass_player) + + async def _handle_dacp_request( # noqa: PLR0915 + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + """Handle new connection on the socket.""" + try: + raw_request = b"" + while recv := await reader.read(1024): + raw_request += recv + if len(recv) < 1024: + break + + request = raw_request.decode("UTF-8") + if "\r\n\r\n" in request: + headers_raw, body = request.split("\r\n\r\n", 1) + else: + headers_raw = request + body = "" + headers_raw = headers_raw.split("\r\n") + headers = {} + for line in headers_raw[1:]: + if ":" not in line: + continue + x, y = line.split(":", 1) + headers[x.strip()] = y.strip() + active_remote = headers.get("Active-Remote") + _, path, _ = headers_raw[0].split(" ") + airplay_player = next( + ( + x + for x in self._players.values() + if x.raop_stream and x.raop_stream.active_remote_id == active_remote + ), + None, + ) + self.logger.debug( + "DACP request for %s (%s): %s -- %s", + airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER", + active_remote, + path, + body, + ) + if not airplay_player: + return + + player_id = airplay_player.player_id + mass_player = self.mass.players.get(player_id) + active_queue = self.mass.player_queues.get_active_queue(player_id) + if path == "/ctrl-int/1/nextitem": + self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id)) + elif path == "/ctrl-int/1/previtem": + self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id)) + elif path == "/ctrl-int/1/play": + self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id)) + elif path == "/ctrl-int/1/playpause": + self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id)) + elif path == "/ctrl-int/1/stop": + self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id)) + elif path == "/ctrl-int/1/volumeup": + self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) + elif path == "/ctrl-int/1/volumedown": + self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) + elif path == "/ctrl-int/1/shuffle_songs": + queue = self.mass.player_queues.get(player_id) + self.mass.loop.call_soon( + self.mass.player_queues.set_shuffle( + active_queue.queue_id, not queue.shuffle_enabled + ) + ) + elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): + self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id)) + elif "dmcp.device-volume=" in path: + if mass_player.device_info.manufacturer.lower() == "apple": + # Apple devices only report their (new) volume level, they dont request it + return + raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) + volume = convert_airplay_volume(raop_volume) + if volume != mass_player.volume_level: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + # optimistically set the new volume to prevent bouncing around + mass_player.volume_level = volume + elif "dmcp.volume=" in path: + volume = int(path.split("dmcp.volume=", 1)[-1]) + if volume != mass_player.volume_level: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + # optimistically set the new volume to prevent bouncing around + mass_player.volume_level = volume + elif "device-prevent-playback=1" in path: + # device switched to another source (or is powered off) + if raop_stream := airplay_player.raop_stream: + # ignore this if we just started playing to prevent false positives + if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: + raop_stream.prevent_playback = True + self.mass.create_task(self.monitor_prevent_playback(player_id)) + elif "device-prevent-playback=0" in path: + # device reports that its ready for playback again + if raop_stream := airplay_player.raop_stream: + raop_stream.prevent_playback = False + + # send response + date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") + response = ( + f"HTTP/1.0 204 No Content\r\nDate: {date_str} " + "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: " + "application/x-dmap-tagged\r\nContent-Length: 0\r\n" + "Connection: close\r\n\r\n" + ) + writer.write(response.encode()) + await writer.drain() + finally: + writer.close() + + async def monitor_prevent_playback(self, player_id: str): + """Monitor the prevent playback state of an airplay player.""" + count = 0 + if not (airplay_player := self._players.get(player_id)): + return + prev_active_remote_id = airplay_player.raop_stream.active_remote_id + while count < 40: + count += 1 + if not (airplay_player := self._players.get(player_id)): + return + if not (raop_stream := airplay_player.raop_stream): + return + if raop_stream.active_remote_id != prev_active_remote_id: + # checksum + return + if not raop_stream.prevent_playback: + return + await asyncio.sleep(0.5) + + airplay_player.logger.info( + "Player has been in prevent playback mode for too long, powering off.", + ) + await self.mass.players.cmd_power(airplay_player.player_id, False) diff --git a/music_assistant/server/providers/airplay/raop.py b/music_assistant/server/providers/airplay/raop.py new file mode 100644 index 000000000..3424750eb --- /dev/null +++ b/music_assistant/server/providers/airplay/raop.py @@ -0,0 +1,399 @@ +"""Logic for RAOP (AirPlay 1) audio streaming to Airplay devices.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import platform +import time +from collections.abc import AsyncGenerator +from contextlib import suppress +from random import randint +from typing import TYPE_CHECKING + +from music_assistant.common.models.enums import PlayerState +from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.server.helpers.audio import get_player_filter_params +from music_assistant.server.helpers.ffmpeg import FFMpeg +from music_assistant.server.helpers.process import AsyncProcess, check_output +from music_assistant.server.helpers.util import close_async_generator + +from .const import ( + AIRPLAY_PCM_FORMAT, + CONF_ALAC_ENCODE, + CONF_BIND_INTERFACE, + CONF_ENCRYPTION, + CONF_PASSWORD, + CONF_READ_AHEAD_BUFFER, +) + +if TYPE_CHECKING: + from music_assistant.common.models.media_items import AudioFormat + from music_assistant.common.models.player_queue import PlayerQueue + + from .player import AirPlayPlayer + from .provider import AirplayProvider + + +class RaopStreamSession: + """Object that holds the details of a (RAOP) stream session to one or more players.""" + + def __init__( + self, + airplay_provider: AirplayProvider, + sync_clients: list[AirPlayPlayer], + input_format: AudioFormat, + audio_source: AsyncGenerator[bytes, None], + ) -> None: + """Initialize RaopStreamSession.""" + assert sync_clients + self.prov = airplay_provider + self.mass = airplay_provider.mass + self.input_format = input_format + self._sync_clients = sync_clients + self._audio_source = audio_source + self._audio_source_task: asyncio.Task | None = None + self._stopped: bool = False + self._lock = asyncio.Lock() + + async def start(self) -> None: + """Initialize RaopStreamSession.""" + # initialize raop stream for all players + for airplay_player in self._sync_clients: + if airplay_player.raop_stream and airplay_player.raop_stream.running: + raise RuntimeError("Player already has an active stream") + airplay_player.raop_stream = RaopStream(self, airplay_player) + + async def audio_streamer() -> None: + """Stream audio to all players.""" + generator_exhausted = False + try: + async for chunk in self._audio_source: + if not self._sync_clients: + return + async with self._lock: + await asyncio.gather( + *[x.raop_stream.write_chunk(chunk) for x in self._sync_clients], + return_exceptions=True, + ) + # entire stream consumed: send EOF + generator_exhausted = True + async with self._lock: + await asyncio.gather( + *[x.raop_stream.write_eof() for x in self._sync_clients], + return_exceptions=True, + ) + finally: + if not generator_exhausted: + await close_async_generator(self._audio_source) + + # get current ntp and start RaopStream per player + _, stdout = await check_output(self.prov.cliraop_bin, "-ntp") + start_ntp = int(stdout.strip()) + wait_start = 1500 + (250 * len(self._sync_clients)) + async with self._lock: + await asyncio.gather( + *[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients], + return_exceptions=True, + ) + self._audio_source_task = asyncio.create_task(audio_streamer()) + + async def stop(self) -> None: + """Stop playback and cleanup.""" + if self._stopped: + return + self._stopped = True + if self._audio_source_task and not self._audio_source_task.done(): + self._audio_source_task.cancel() + await asyncio.gather( + *[self.remove_client(x) for x in self._sync_clients], + return_exceptions=True, + ) + + async def remove_client(self, airplay_player: AirPlayPlayer) -> None: + """Remove a sync client from the session.""" + if airplay_player not in self._sync_clients: + return + assert airplay_player.raop_stream.session == self + async with self._lock: + self._sync_clients.remove(airplay_player) + await airplay_player.raop_stream.stop() + airplay_player.raop_stream = None + + async def add_client(self, airplay_player: AirPlayPlayer) -> None: + """Add a sync client to the session.""" + # TODO: Add the ability to add a new client to an existing session + # e.g. by counting the number of frames sent etc. + raise NotImplementedError("Adding clients to a session is not yet supported") + + +class RaopStream: + """ + RAOP (Airplay 1) Audio Streamer. + + Python is not suitable for realtime audio streaming so we do the actual streaming + of (RAOP) audio using a small executable written in C based on libraop to do + the actual timestamped playback, which reads pcm audio from stdin + and we can send some interactive commands using a named pipe. + """ + + def __init__( + self, + session: RaopStreamSession, + airplay_player: AirPlayPlayer, + ) -> None: + """Initialize RaopStream.""" + self.session = session + self.prov = session.prov + self.mass = session.prov.mass + self.airplay_player = airplay_player + + # always generate a new active remote id to prevent race conditions + # with the named pipe used to send audio + self.active_remote_id: str = str(randint(1000, 8000)) + self.prevent_playback: bool = False + self._log_reader_task: asyncio.Task | None = None + self._cliraop_proc: AsyncProcess | None = None + self._ffmpeg_proc: AsyncProcess | None = None + self._started = asyncio.Event() + self._stopped = False + + @property + def running(self) -> bool: + """Return boolean if this stream is running.""" + return not self._stopped and self._started.is_set() + + async def start(self, start_ntp: int, wait_start: int = 1000) -> None: + """Initialize CLIRaop process for a player.""" + extra_args = [] + player_id = self.airplay_player.player_id + mass_player = self.mass.players.get(player_id) + bind_ip = await self.mass.config.get_provider_config_value( + self.prov.instance_id, CONF_BIND_INTERFACE + ) + extra_args += ["-if", bind_ip] + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False): + extra_args += ["-encrypt"] + if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): + extra_args += ["-alac"] + for prop in ("et", "md", "am", "pk", "pw"): + if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop): + extra_args += [f"-{prop}", prop_value] + sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) + if device_password := self.mass.config.get_raw_player_config_value( + player_id, CONF_PASSWORD, None + ): + extra_args += ["-password", device_password] + if self.prov.logger.isEnabledFor(logging.DEBUG): + extra_args += ["-debug", "5"] + elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): + extra_args += ["-debug", "10"] + read_ahead = await self.mass.config.get_player_config_value( + player_id, CONF_READ_AHEAD_BUFFER + ) + + # create os pipes to pipe ffmpeg to cliraop + read, write = await asyncio.to_thread(os.pipe) + + # ffmpeg handles the player specific stream + filters and pipes + # audio to the cliraop process + self._ffmpeg_proc = FFMpeg( + audio_input="-", + input_format=self.session.input_format, + output_format=AIRPLAY_PCM_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), + audio_output=write, + ) + await self._ffmpeg_proc.start() + await asyncio.to_thread(os.close, write) + + # cliraop is the binary that handles the actual raop streaming to the player + cliraop_args = [ + self.prov.cliraop_bin, + "-ntpstart", + str(start_ntp), + "-port", + str(self.airplay_player.discovery_info.port), + "-wait", + str(wait_start - sync_adjust), + "-latency", + str(read_ahead), + "-volume", + str(mass_player.volume_level), + *extra_args, + "-dacp", + self.prov.dacp_id, + "-activeremote", + self.active_remote_id, + "-udn", + self.airplay_player.discovery_info.name, + self.airplay_player.address, + "-", + ] + self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop") + if platform.system() == "Darwin": + os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" + await self._cliraop_proc.start() + await asyncio.to_thread(os.close, read) + self._started.set() + self._log_reader_task = self.mass.create_task(self._log_watcher()) + + async def stop(self): + """Stop playback and cleanup.""" + if self._stopped: + return + if self._cliraop_proc.proc and not self._cliraop_proc.closed: + await self.send_cli_command("ACTION=STOP") + self._stopped = True # set after send_cli command! + if self._cliraop_proc.proc and not self._cliraop_proc.closed: + await self._cliraop_proc.close(True) + if self._ffmpeg_proc and not self._ffmpeg_proc.closed: + await self._ffmpeg_proc.close(True) + self._cliraop_proc = None + self._ffmpeg_proc = None + + async def write_chunk(self, chunk: bytes) -> None: + """Write a (pcm) audio chunk.""" + if self._stopped: + return + await self._started.wait() + await self._ffmpeg_proc.write(chunk) + + async def write_eof(self) -> None: + """Write EOF.""" + if self._stopped: + return + await self._started.wait() + await self._ffmpeg_proc.write_eof() + + async def send_cli_command(self, command: str) -> None: + """Send an interactive command to the running CLIRaop binary.""" + if self._stopped: + return + await self._started.wait() + + if not command.endswith("\n"): + command += "\n" + + def send_data(): + with suppress(BrokenPipeError), open(named_pipe, "w") as f: + f.write(command) + + named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108 + self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) + await asyncio.to_thread(send_data) + + async def _log_watcher(self) -> None: + """Monitor stderr for the running CLIRaop process.""" + airplay_player = self.airplay_player + mass_player = self.mass.players.get(airplay_player.player_id) + queue = self.mass.player_queues.get_active_queue(mass_player.active_source) + logger = airplay_player.logger + lost_packets = 0 + prev_metadata_checksum: str = "" + prev_progress_report: float = 0 + async for line in self._cliraop_proc.iter_stderr(): + if "elapsed milliseconds:" in line: + # this is received more or less every second while playing + millis = int(line.split("elapsed milliseconds: ")[1]) + mass_player.elapsed_time = millis / 1000 + mass_player.elapsed_time_last_updated = time.time() + # send metadata to player(s) if needed + # NOTE: this must all be done in separate tasks to not disturb audio + now = time.time() + if ( + mass_player.elapsed_time > 2 + and queue + and queue.current_item + and queue.current_item.streamdetails + ): + metadata_checksum = ( + queue.current_item.streamdetails.stream_title + or queue.current_item.queue_item_id + ) + if prev_metadata_checksum != metadata_checksum: + prev_metadata_checksum = metadata_checksum + prev_progress_report = now + self.mass.create_task(self._send_metadata(queue)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_progress(queue)) + if "set pause" in line or "Pause at" in line: + mass_player.state = PlayerState.PAUSED + self.mass.players.update(airplay_player.player_id) + if "Restarted at" in line or "restarting w/ pause" in line: + mass_player.state = PlayerState.PLAYING + self.mass.players.update(airplay_player.player_id) + if "restarting w/o pause" in line: + # streaming has started + mass_player.state = PlayerState.PLAYING + mass_player.elapsed_time = 0 + mass_player.elapsed_time_last_updated = time.time() + self.mass.players.update(airplay_player.player_id) + if "lost packet out of backlog" in line: + lost_packets += 1 + if lost_packets == 100: + logger.error("High packet loss detected, restarting playback...") + self.mass.create_task(self.mass.player_queues.resume(queue.queue_id)) + else: + logger.warning("Packet loss detected!") + if "end of stream reached" in line: + logger.debug("End of stream reached") + break + + logger.log(VERBOSE_LOG_LEVEL, line) + + # if we reach this point, the process exited + if airplay_player.raop_stream == self: + mass_player.state = PlayerState.IDLE + self.mass.players.update(airplay_player.player_id) + # ensure we're cleaned up afterwards (this also logs the returncode) + await self.stop() + + async def _send_metadata(self, queue: PlayerQueue) -> None: + """Send metadata to player (and connected sync childs).""" + if not queue or not queue.current_item: + return + duration = min(queue.current_item.duration or 0, 3600) + title = queue.current_item.name + artist = "" + album = "" + if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: + # stream title from radio station + stream_title = queue.current_item.streamdetails.stream_title + if " - " in stream_title: + artist, title = stream_title.split(" - ", 1) + else: + title = stream_title + # set album to radio station name + album = queue.current_item.name + elif media_item := queue.current_item.media_item: + title = media_item.name + if artist_str := getattr(media_item, "artist_str", None): + artist = artist_str + if _album := getattr(media_item, "album", None): + album = _album.name + + cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" + cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" + + await self.send_cli_command(cmd) + + # get image + if not queue.current_item.image: + return + + # the image format needs to be 500x500 jpeg for maximum compatibility with players + image_url = self.mass.metadata.get_image_url( + queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" + ) + await self.send_cli_command(f"ARTWORK={image_url}\n") + + async def _send_progress(self, queue: PlayerQueue) -> None: + """Send progress report to player (and connected sync childs).""" + if not queue or not queue.current_item: + return + progress = int(queue.corrected_elapsed_time) + await self.send_cli_command(f"PROGRESS={progress}\n") diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index f1b62be14..dc355bf7f 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -22,6 +22,7 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, + CONF_ENTRY_ENFORCE_MP3, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED, ConfigEntry, ConfigValueType, @@ -29,6 +30,7 @@ ) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, EventType, PlayerFeature, PlayerState, @@ -533,6 +535,7 @@ async def get_player_config_entries( *await super().get_player_config_entries(player_id), CONF_ENTRY_CROSSFADE, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED, + CONF_ENTRY_ENFORCE_MP3, create_sample_rates_config_entry(48000, 24, 48000, 24, True), ) if not (sonos_player := self.sonos_players.get(player_id)): @@ -666,6 +669,10 @@ async def play_media( return # play a single uri/url + if self.mass.config.get_raw_player_config_value( + player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value + ): + media.uri = media.uri.replace(".flac", ".mp3") await sonos_player.client.player.group.play_stream_url( media.uri, {"name": media.title, "type": "track"} ) @@ -773,6 +780,9 @@ async def _handle_sonos_queue_itemwindow(self, request: web.Request) -> web.Resp limit=upcoming_window_size + previous_window_size, offset=max(queue_index - previous_window_size, 0), ) + enforce_mp3 = self.mass.config.get_raw_player_config_value( + sonos_player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value + ) sonos_queue_items = [ { "id": item.queue_item_id, @@ -780,7 +790,9 @@ async def _handle_sonos_queue_itemwindow(self, request: web.Request) -> web.Resp "policies": {}, "track": { "type": "track", - "mediaUrl": self.mass.streams.resolve_stream_url(item), + "mediaUrl": self.mass.streams.resolve_stream_url( + item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC + ), "contentType": "audio/flac", "service": { "name": "Music Assistant",