Skip to content

Commit

Permalink
Add waiting for all input pads before assembling first segment
Browse files Browse the repository at this point in the history
  • Loading branch information
Qizot committed Feb 23, 2024
1 parent 1f7fa29 commit 6f5db82
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions lib/membrane_mp4/muxer/cmaf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do
pads_registration_order: [],
sample_queues: %{},
finish_current_segment?: false,
video_pad: nil
video_pad: nil,
all_input_pads_ready?: false,
buffers_awaiting_init: []
})
|> set_chunk_duration_range()

Expand Down Expand Up @@ -309,23 +311,35 @@ defmodule Membrane.MP4.Muxer.CMAF do
if are_all_group_pads_ready?(pad, ctx, state) do
stream_format = generate_output_stream_format(output_pad, state)

old_input_pads_ready? = state.all_input_pads_ready?

state = update_input_pads_ready(pad, ctx, state)

{actions, state} =
if old_input_pads_ready? != state.all_input_pads_ready? do
replay_init_buffers(ctx, state)
else
{[], state}
end

cond do
is_nil(ctx.pads[output_pad].stream_format) ->
{[stream_format: {output_pad, stream_format}], state}
{[{:stream_format, {output_pad, stream_format}} | actions], state}

stream_format != ctx.pads[output_pad].stream_format ->
{[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}
{actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}

true ->
{[], state}
{actions, state}
end
else
{[], state}
end
end

@impl true
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state)
when state.all_input_pads_ready? do
use Numbers, overload_operators: true, comparison: true

# In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated
Expand Down Expand Up @@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do
end
end

@impl true
def handle_buffer(pad, sample, _ctx, state) do
{[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}}
end

@impl true
def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do
{[], %{state | finish_current_segment?: true}}
Expand Down Expand Up @@ -735,6 +754,25 @@ defmodule Membrane.MP4.Muxer.CMAF do
end
end

defp update_input_pads_ready(pad, ctx, state) do
all_input_pads_ready? =
Enum.all?(ctx.pads, fn
{^pad, _data} -> true
{Pad.ref(:output, _id), _data} -> true
{Pad.ref(:input, _id), data} -> data.stream_format != nil
end)

%{state | all_input_pads_ready?: all_input_pads_ready?}
end

defp replay_init_buffers(ctx, state) do
{buffers, state} = Map.pop!(state, :buffers_awaiting_init)

Enum.flat_map_reduce(buffers, state, fn {pad, buffer}, state ->
handle_buffer(pad, buffer, ctx, state)
end)
end

@min_chunk_duration Membrane.Time.milliseconds(50)
defp set_chunk_duration_range(
%{
Expand Down

0 comments on commit 6f5db82

Please sign in to comment.