Skip to content

Commit

Permalink
Modify check for monotonic dts
Browse files Browse the repository at this point in the history
  • Loading branch information
uvjustin committed Sep 9, 2020
1 parent 90892d2 commit dd68834
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions homeassistant/components/stream/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds

PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio
MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 100 to 50000 is probably reasonable
3 changes: 1 addition & 2 deletions homeassistant/components/stream/hls.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ async def handle(self, request, stream, sequence):
return web.HTTPNotFound()
headers = {"Content-Type": "video/iso.segment"}
return web.Response(
body=get_m4s(segment.segment, int(sequence)),
headers=headers,
body=get_m4s(segment.segment, int(sequence)), headers=headers,
)


Expand Down
37 changes: 24 additions & 13 deletions homeassistant/components/stream/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import av

from .const import MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
from .const import MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
from .core import Segment, StreamBuffer

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,9 +50,7 @@ def stream_worker(hass, stream, quit_event):
# To avoid excessive restarts, don't restart faster than once every 40 seconds.
wait_timeout = max(40 - (time.time() - start_time), 0)
_LOGGER.debug(
"Restarting stream worker in %d seconds: %s",
wait_timeout,
stream.source,
"Restarting stream worker in %d seconds: %s", wait_timeout, stream.source,
)


Expand All @@ -64,6 +62,7 @@ def _stream_worker_internal(hass, stream, quit_event):
video_stream = container.streams.video[0]
except (KeyError, IndexError):
_LOGGER.error("Stream has no video")
container.close()
return
try:
audio_stream = container.streams.audio[0]
Expand Down Expand Up @@ -183,6 +182,12 @@ def mux_audio_packet(packet):
packet.stream = output_streams[audio_stream]
buffer.output.mux(packet)

def finalize_stream():
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)

if not peek_first_pts():
container.close()
return
Expand All @@ -205,15 +210,25 @@ def mux_audio_packet(packet):
continue
last_packet_was_without_dts = False
except (av.AVError, StopIteration) as ex:
if not stream.keepalive:
# End of stream, clear listeners and stop thread
for fmt, _ in outputs.items():
hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
finalize_stream()
_LOGGER.error("Error demuxing stream: %s", str(ex))
break

# Discard packet if dts is not monotonic
if packet.dts <= last_dts[packet.stream]:
if (last_dts[packet.stream] - packet.dts) > (
packet.time_base * MAX_TIMESTAMP_GAP
):
finalize_stream()
_LOGGER.warning(
"Timestamp overflow detected: dts=%s, resetting stream", packet.dts
)
break
_LOGGER.warning(
"Dropping out of order packet: %s <= %s",
packet.dts,
last_dts[packet.stream],
)
continue

# Check for end of segment
Expand All @@ -226,11 +241,7 @@ def mux_audio_packet(packet):
if stream.outputs.get(fmt):
hass.loop.call_soon_threadsafe(
stream.outputs[fmt].put,
Segment(
sequence,
buffer.segment,
segment_duration,
),
Segment(sequence, buffer.segment, segment_duration,),
)

# Reinitialize
Expand Down

0 comments on commit dd68834

Please sign in to comment.