Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: home-assistant/core
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: fdd3904a82407a774597232c3c3d3bd7cd27c57f
Choose a base ref
..
head repository: home-assistant/core
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: dd6883421b40a31cac7ac3a2c93196ee8092ff17
Choose a head ref
Showing with 32 additions and 7 deletions.
  1. +1 −0 homeassistant/components/stream/const.py
  2. +1 −2 homeassistant/components/stream/hls.py
  3. +30 −5 homeassistant/components/stream/worker.py
1 change: 1 addition & 0 deletions homeassistant/components/stream/const.py
Original file line number Diff line number Diff line change
@@ -19,3 +19,4 @@
MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds

PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio
MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 100 to 50000 is probably reasonable
3 changes: 1 addition & 2 deletions homeassistant/components/stream/hls.py
Original file line number Diff line number Diff line change
@@ -72,8 +72,7 @@ async def handle(self, request, stream, sequence):
return web.HTTPNotFound()
headers = {"Content-Type": "video/iso.segment"}
return web.Response(
body=get_m4s(segment.segment, int(sequence)),
headers=headers,
body=get_m4s(segment.segment, int(sequence)), headers=headers,
)


35 changes: 30 additions & 5 deletions homeassistant/components/stream/worker.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

import av

from .const import MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
from .const import MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
from .core import Segment, StreamBuffer

_LOGGER = logging.getLogger(__name__)
@@ -72,6 +72,8 @@ def _stream_worker_internal(hass, stream, quit_event):
# The presentation timestamps of the first packet in each stream we receive
# Use to adjust before muxing or outputting, but we don't adjust internally
first_pts = {}
# The decoder timestamps of the latest packet in each stream we processed
last_dts = None
# Keep track of consecutive packets without a dts to detect end of stream.
last_packet_was_without_dts = False
# Holds the buffers for each stream provider
@@ -180,9 +182,16 @@ def mux_audio_packet(packet):
packet.stream = output_streams[audio_stream]
buffer.output.mux(packet)

def finalize_stream():
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)

if not peek_first_pts():
container.close()
return
last_dts = {k: v - 1 for k, v in first_pts.items()}
initialize_segment(first_pts[video_stream])

while not quit_event.is_set():
@@ -201,13 +210,27 @@ def mux_audio_packet(packet):
continue
last_packet_was_without_dts = False
except (av.AVError, StopIteration) as ex:
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
finalize_stream()
_LOGGER.error("Error demuxing stream: %s", str(ex))
break

# Discard packet if dts is not monotonic
if packet.dts <= last_dts[packet.stream]:
if (last_dts[packet.stream] - packet.dts) > (
packet.time_base * MAX_TIMESTAMP_GAP
):
finalize_stream()
_LOGGER.warning(
"Timestamp overflow detected: dts=%s, resetting stream", packet.dts
)
break
_LOGGER.warning(
"Dropping out of order packet: %s <= %s",
packet.dts,
last_dts[packet.stream],
)
continue

# Check for end of segment
if packet.stream == video_stream and packet.is_keyframe:
segment_duration = (packet.pts - segment_start_pts) * packet.time_base
@@ -224,6 +247,8 @@ def mux_audio_packet(packet):
# Reinitialize
initialize_segment(packet.pts)

# Update last_dts processed
last_dts[packet.stream] = packet.dts
# mux video packets immediately, save audio packets to be muxed all at once
if packet.stream == video_stream:
mux_video_packet(packet) # mutates packet timestamps