From 0f8c8fcc3db484f67b7930f528ceb30a33ca4ab2 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sat, 19 Oct 2024 20:51:07 +0200 Subject: [PATCH] Fix parsing of HLS (sub)streams (#1727) --- music_assistant/server/helpers/audio.py | 118 ++++++------------ music_assistant/server/helpers/ffmpeg.py | 4 +- music_assistant/server/helpers/playlists.py | 10 +- .../server/providers/apple_music/__init__.py | 12 +- 4 files changed, 54 insertions(+), 90 deletions(-) diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index bcb672318..23fa5876f 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -8,7 +8,6 @@ import re import struct import time -from collections import deque from collections.abc import AsyncGenerator from io import BytesIO from typing import TYPE_CHECKING @@ -44,7 +43,6 @@ from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, check_output, communicate -from .tags import parse_tags from .throttle_retry import BYPASS_THROTTLER from .util import TimedAsyncGenerator, create_tempfile, detect_charset @@ -294,7 +292,7 @@ async def get_media_stream( try: await ffmpeg_proc.start() async for chunk in TimedAsyncGenerator( - ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 60 + ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300 ): # for radio streams we just yield all chunks directly if streamdetails.media_type == MediaType.RADIO: @@ -580,10 +578,6 @@ async def get_hls_radio_stream( """Get radio audio stream from HTTP HLS playlist.""" logger = LOGGER.getChild("hls_stream") logger.debug("Start streaming HLS stream for url %s", url) - timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60) - prev_chunks: deque[str] = deque(maxlen=50) - has_playlist_metadata: bool | None = None - has_id3_metadata: bool | None = None # we simply select the best quality substream here # if we ever want to support adaptive stream selection based on bandwidth # we need to move the substream selection into the loop below and make it @@ -591,76 +585,30 @@ async def get_hls_radio_stream( # the user wants the best quality possible at all times. playlist_item = await get_hls_substream(mass, url) substream_url = playlist_item.path - empty_loops = 0 - while True: + loops = 50 if streamdetails.media_type != MediaType.RADIO else 1 + while loops: logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url) - async with mass.http_session.get( - substream_url, headers=HTTP_HEADERS, timeout=timeout - ) as resp: - resp.raise_for_status() - raw_data = await resp.read() - encoding = resp.charset or await detect_charset(raw_data) - substream_m3u_data = raw_data.decode(encoding) - # get chunk-parts from the substream - hls_chunks = parse_m3u(substream_m3u_data) - chunk_seconds = 0 - time_start = time.time() - for chunk_item in hls_chunks: - if chunk_item.path in prev_chunks: - continue - chunk_length = int(chunk_item.length) if chunk_item.length else 6 - chunk_item_url = chunk_item.path - if not chunk_item_url.startswith("http"): - # path is relative, stitch it together - base_path = substream_url.rsplit("/", 1)[0] - chunk_item_url = base_path + "/" + chunk_item.path - # handle (optional) in-playlist (timed) metadata - if has_playlist_metadata is None: - has_playlist_metadata = chunk_item.title not in (None, "") - logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata) - if has_playlist_metadata and chunk_item.title != "no desc": - # bbc (and maybe others?) set the title to 'no desc' - cleaned_stream_title = clean_stream_title(chunk_item.title) - if cleaned_stream_title != streamdetails.stream_title: - logger.log( - VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title - ) - logger.log( - VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title - ) - streamdetails.stream_title = cleaned_stream_title - logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item) - # prevent that we play this chunk again if we loop through - prev_chunks.append(chunk_item.path) - async with mass.http_session.get( - chunk_item_url, headers=HTTP_HEADERS, timeout=timeout - ) as resp: - yield await resp.content.read() - chunk_seconds += chunk_length - # handle (optional) in-band (m3u) metadata - if has_id3_metadata is not None and has_playlist_metadata: + # We simply let ffmpeg deal with parsing the HLS playlist and stichting chunks together. + # However we do not feed the playlist URL to ffmpeg directly to give us the possibility + # to monitor the stream title and other metadata for radio streams in the future. + # Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not + # very useful for radio streams which you want to simply go on forever, so we need to loop + # and restart ffmpeg in case of an error. + input_format = AudioFormat(content_type=ContentType.UNKNOWN) + audio_format_detected = False + async for chunk in get_ffmpeg_stream( + audio_input=substream_url, + input_format=input_format, + output_format=AudioFormat(content_type=ContentType.WAV), + ): + yield chunk + if audio_format_detected: continue - if has_id3_metadata in (None, True): - tags = await parse_tags(chunk_item_url) - has_id3_metadata = tags.title and tags.title not in chunk_item.path - logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata) - - # end of playlist reached - we loop around to get the next playlist with chunks - # safeguard for an endless loop - # this may happen if we're simply going too fast for the live stream - # we already throttle it a bit but we may end up in a situation where something is wrong - # and we want to break out of this loop, hence this check - if chunk_seconds == 0: - empty_loops += 1 - await asyncio.sleep(1) - else: - empty_loops = 0 - if empty_loops == 50: - logger.warning("breaking out of endless loop") - break - # ensure that we're not going to fast - otherwise we get the same substream playlist - while (time.time() - time_start) < (chunk_seconds - 1): - await asyncio.sleep(0.5) + if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV): + # we need to determine the audio format from the first chunk + streamdetails.audio_format = input_format + audio_format_detected = True + loops -= 1 async def get_hls_substream( @@ -679,15 +627,21 @@ async def get_hls_substream( raw_data = await resp.read() encoding = resp.charset or await detect_charset(raw_data) master_m3u_data = raw_data.decode(encoding) - if not allow_encrypted and "EXT-X-KEY:METHOD=AES-128" in master_m3u_data: - # for now we don't support encrypted HLS streams + if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data: + # for now we do not (yet) support encrypted HLS streams raise InvalidDataError("HLS stream is encrypted, not supported") substreams = parse_m3u(master_m3u_data) - if any(x for x in substreams if x.length or x.key): - # this is already a substream! - return PlaylistItem( - path=url, - ) + # There is a chance that we did not get a master playlist with subplaylists + # but just a single master/sub playlist with the actual audio stream(s) + # so we need to detect if the playlist child's contain audio streams or + # sub-playlists. + if any( + x + for x in substreams + if (x.length or x.path.endswith((".mp4", ".aac"))) + and not x.path.endswith((".m3u", ".m3u8")) + ): + return PlaylistItem(path=url, key=substreams[0].key) # sort substreams on best quality (highest bandwidth) when available if any(x for x in substreams if x.stream_info): substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True) diff --git a/music_assistant/server/helpers/ffmpeg.py b/music_assistant/server/helpers/ffmpeg.py index de0b4e75e..74b1c127a 100644 --- a/music_assistant/server/helpers/ffmpeg.py +++ b/music_assistant/server/helpers/ffmpeg.py @@ -122,7 +122,7 @@ async def _feed_stdin(self) -> None: generator_exhausted = False audio_received = False try: - async for chunk in TimedAsyncGenerator(self.audio_input, 30): + async for chunk in TimedAsyncGenerator(self.audio_input, 300): audio_received = True await self.write(chunk) generator_exhausted = True @@ -169,7 +169,7 @@ async def get_ffmpeg_stream( ) as ffmpeg_proc: # read final chunks from stdout iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() - async for chunk in TimedAsyncGenerator(iterator, 60): + async for chunk in iterator: yield chunk diff --git a/music_assistant/server/helpers/playlists.py b/music_assistant/server/helpers/playlists.py index d7e780ccb..e3cef7023 100644 --- a/music_assistant/server/helpers/playlists.py +++ b/music_assistant/server/helpers/playlists.py @@ -145,7 +145,9 @@ def parse_pls(pls_data: str) -> list[PlaylistItem]: return playlist -async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]: +async def fetch_playlist( + mass: MusicAssistant, url: str, raise_on_hls: bool = True +) -> list[PlaylistItem]: """Parse an online m3u or pls playlist.""" try: async with mass.http_session.get(url, allow_redirects=True, timeout=5) as resp: @@ -164,8 +166,10 @@ async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]: msg = f"Error while fetching playlist {url}" raise InvalidDataError(msg) from err - if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data: - raise IsHLSPlaylist(encrypted="#EXT-X-KEY:" in playlist_data) + if raise_on_hls and "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data: + exc = IsHLSPlaylist() + exc.encrypted = "#EXT-X-KEY:" in playlist_data + raise exc if url.endswith((".m3u", ".m3u8")): playlist = parse_m3u(playlist_data) diff --git a/music_assistant/server/providers/apple_music/__init__.py b/music_assistant/server/providers/apple_music/__init__.py index 79863152c..8ae48b40f 100644 --- a/music_assistant/server/providers/apple_music/__init__.py +++ b/music_assistant/server/providers/apple_music/__init__.py @@ -39,7 +39,7 @@ from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import CONF_PASSWORD from music_assistant.server.helpers.app_vars import app_var -from music_assistant.server.helpers.audio import get_hls_substream +from music_assistant.server.helpers.playlists import fetch_playlist from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries from music_assistant.server.models.music_provider import MusicProvider @@ -721,8 +721,14 @@ async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> str: ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"] if len(ctrp256_urls) == 0: raise MediaNotFoundError("No ctrp256 URL found for song.") - playlist_item = await get_hls_substream(self.mass, ctrp256_urls[0]) - track_url = playlist_item.path + playlist_url = ctrp256_urls[0] + playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False) + # Apple returns a HLS (substream) playlist but instead of chunks, + # each item is just the whole file. So we simply grab the first playlist item. + playlist_item = playlist_items[0] + # path is relative, stitch it together + base_path = playlist_url.rsplit("/", 1)[0] + track_url = base_path + "/" + playlist_items[0].path key = playlist_item.key return (track_url, key)