diff --git a/lib/membrane_mp4/muxer/cmaf.ex b/lib/membrane_mp4/muxer/cmaf.ex index a7164c6..0224bd2 100644 --- a/lib/membrane_mp4/muxer/cmaf.ex +++ b/lib/membrane_mp4/muxer/cmaf.ex @@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do pads_registration_order: [], sample_queues: %{}, finish_current_segment?: false, - video_pad: nil + video_pad: nil, + all_input_pads_ready?: false, + buffers_awaiting_init: [] }) |> set_chunk_duration_range() @@ -309,15 +311,26 @@ defmodule Membrane.MP4.Muxer.CMAF do if are_all_group_pads_ready?(pad, ctx, state) do stream_format = generate_output_stream_format(output_pad, state) + old_input_pads_ready? = state.all_input_pads_ready? + + state = update_input_pads_ready(pad, ctx, state) + + {actions, state} = + if old_input_pads_ready? != state.all_input_pads_ready? do + replay_init_buffers(ctx, state) + else + {[], state} + end + cond do is_nil(ctx.pads[output_pad].stream_format) -> - {[stream_format: {output_pad, stream_format}], state} + {[{:stream_format, {output_pad, stream_format}} | actions], state} stream_format != ctx.pads[output_pad].stream_format -> - {[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} + {actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)} true -> - {[], state} + {actions, state} end else {[], state} @@ -325,7 +338,8 @@ defmodule Membrane.MP4.Muxer.CMAF do end @impl true - def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do + def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) + when state.all_input_pads_ready? do use Numbers, overload_operators: true, comparison: true # In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated @@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do end end + @impl true + def handle_buffer(pad, sample, _ctx, state) do + {[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}} + end + @impl true def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do {[], %{state | finish_current_segment?: true}} @@ -735,6 +754,25 @@ defmodule Membrane.MP4.Muxer.CMAF do end end + defp update_input_pads_ready(pad, ctx, state) do + all_input_pads_ready? = + Enum.all?(ctx.pads, fn + {^pad, _data} -> true + {Pad.ref(:output, _id), _data} -> true + {Pad.ref(:input, _id), data} -> data.stream_format != nil + end) + + %{state | all_input_pads_ready?: all_input_pads_ready?} + end + + defp replay_init_buffers(ctx, state) do + {buffers, state} = Map.pop!(state, :buffers_awaiting_init) + + Enum.flat_map_reduce(buffers, state, fn {pad, buffer}, state -> + handle_buffer(pad, buffer, ctx, state) + end) + end + @min_chunk_duration Membrane.Time.milliseconds(50) defp set_chunk_duration_range( %{