Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segment collection end timestamp calculation #102

Merged
merged 5 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
```elixir
defp deps do
[
{:membrane_mp4_plugin, "~> 0.32.0"}
{:membrane_mp4_plugin, "~> 0.33.0"}
Qizot marked this conversation as resolved.
Show resolved Hide resolved
]
end
```
Expand Down
77 changes: 64 additions & 13 deletions lib/membrane_mp4/muxer/cmaf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do
pads_registration_order: [],
sample_queues: %{},
finish_current_segment?: false,
video_pad: nil
video_pad: nil,
all_input_pads_ready?: false,
buffers_awaiting_init: []
})
|> set_chunk_duration_range()

Expand Down Expand Up @@ -309,23 +311,35 @@ defmodule Membrane.MP4.Muxer.CMAF do
if are_all_group_pads_ready?(pad, ctx, state) do
stream_format = generate_output_stream_format(output_pad, state)

old_input_pads_ready? = state.all_input_pads_ready?

state = update_input_pads_ready(pad, ctx, state)

{actions, state} =
if old_input_pads_ready? != state.all_input_pads_ready? do
mat-hek marked this conversation as resolved.
Show resolved Hide resolved
replay_init_buffers(ctx, state)
else
{[], state}
end

cond do
is_nil(ctx.pads[output_pad].stream_format) ->
{[stream_format: {output_pad, stream_format}], state}
{[{:stream_format, {output_pad, stream_format}} | actions], state}

stream_format != ctx.pads[output_pad].stream_format ->
{[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}
{actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}

true ->
{[], state}
{actions, state}
end
else
{[], state}
end
end

@impl true
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state)
when state.all_input_pads_ready? do
use Numbers, overload_operators: true, comparison: true

# In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated
Expand All @@ -335,7 +349,7 @@ defmodule Membrane.MP4.Muxer.CMAF do

{sample, state} =
state
|> maybe_init_segment_base_timestamp(pad, sample)
|> maybe_init_segment_timestamps(pad, sample)
|> process_buffer_awaiting_duration(pad, sample)

state = SegmentHelper.update_awaiting_stream_format(state, pad)
Expand All @@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do
end
end

@impl true
def handle_buffer(pad, sample, _ctx, state) do
{[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}}
end

@impl true
def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do
{[], %{state | finish_current_segment?: true}}
Expand Down Expand Up @@ -438,9 +457,12 @@ defmodule Membrane.MP4.Muxer.CMAF do
track_data = %{
id: track_id,
track: nil,
# base timestamp of the current segment, initialized with DTS of the first buffer
# decoding timestamp of the current segment, initialized with DTS of the first buffer
# and then incremented by duration of every produced segment
segment_decoding_timestamp: nil,
# presentation timestamp of the current segment, initialized with PTS of the first buffer
# and then incremented by duration of every produced segment
segment_base_timestamp: nil,
segment_presentation_timestamp: nil,
end_timestamp: 0,
buffer_awaiting_duration: nil,
chunks_duration: Membrane.Time.seconds(0)
Expand Down Expand Up @@ -587,7 +609,7 @@ defmodule Membrane.MP4.Muxer.CMAF do
sequence_number: state.seq_nums[output_pad],
timescale: timescale,
base_timestamp:
track_data.segment_base_timestamp
track_data.segment_presentation_timestamp
|> Helper.timescalify(timescale)
|> Ratio.trunc(),
unscaled_duration: duration,
Expand All @@ -603,7 +625,12 @@ defmodule Membrane.MP4.Muxer.CMAF do
state =
tracks_data
|> Enum.reduce(state, fn %{unscaled_duration: duration, pad: pad}, state ->
update_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], &(&1 + duration))
state
|> update_in([:pad_to_track_data, pad, :segment_decoding_timestamp], &(&1 + duration))
|> update_in(
[:pad_to_track_data, pad, :segment_presentation_timestamp],
&(&1 + duration)
)
end)
|> update_in([:seq_nums, output_pad], &(&1 + 1))

Expand Down Expand Up @@ -712,16 +739,40 @@ defmodule Membrane.MP4.Muxer.CMAF do
end
end

defp maybe_init_segment_base_timestamp(state, pad, sample) do
defp maybe_init_segment_timestamps(state, pad, sample) do
case state do
%{pad_to_track_data: %{^pad => %{segment_base_timestamp: nil}}} ->
put_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], sample.dts)
%{pad_to_track_data: %{^pad => %{segment_decoding_timestamp: nil}}} ->
update_in(state, [:pad_to_track_data, pad], fn data ->
Map.merge(data, %{
segment_decoding_timestamp: sample.dts,
segment_presentation_timestamp: sample.pts
})
end)

_else ->
state
end
end

defp update_input_pads_ready(pad, ctx, state) do
all_input_pads_ready? =
Enum.all?(ctx.pads, fn
{^pad, _data} -> true
{Pad.ref(:output, _id), _data} -> true
{Pad.ref(:input, _id), data} -> data.stream_format != nil
end)

%{state | all_input_pads_ready?: all_input_pads_ready?}
end

defp replay_init_buffers(ctx, state) do
{buffers, state} = Map.pop!(state, :buffers_awaiting_init)

Enum.flat_map_reduce(buffers, state, fn {pad, buffer}, state ->
handle_buffer(pad, buffer, ctx, state)
end)
end

@min_chunk_duration Membrane.Time.milliseconds(50)
defp set_chunk_duration_range(
%{
Expand Down
20 changes: 10 additions & 10 deletions lib/membrane_mp4/muxer/cmaf/segment_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
end

defp push_video_segment(state, queue, pad, sample) do
base_timestamp = max_segment_base_timestamp(state)
base_timestamp = max_segment_decoding_timestamp(state)

queue =
if state.finish_current_segment? do
Expand All @@ -144,7 +144,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
end

defp push_audio_segment(state, queue, pad, sample) do
base_timestamp = max_segment_base_timestamp(state)
base_timestamp = max_segment_decoding_timestamp(state)

{video_pad, video_queue} =
Enum.find(state.sample_queues, {nil, nil}, fn {_pad, queue} ->
Expand Down Expand Up @@ -190,7 +190,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
total_collected_durations =
Map.fetch!(state.pad_to_track_data, pad).chunks_duration + collected_duration

base_timestamp = state.pad_to_track_data[pad].segment_base_timestamp
base_timestamp = state.pad_to_track_data[pad].segment_decoding_timestamp

queue =
cond do
Expand Down Expand Up @@ -262,7 +262,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
if video_queue do
SamplesQueue.force_push(queue, sample)
else
base_timestamp = max_segment_base_timestamp(state)
base_timestamp = max_segment_decoding_timestamp(state)

SamplesQueue.plain_push_until_target(queue, sample, base_timestamp)
end
Expand All @@ -288,7 +288,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
defp update_queue_for(pad, queue, state), do: put_in(state, [:sample_queues, pad], queue)

defp collect_samples_for_video_track(pad, queue, state) do
end_timestamp = SamplesQueue.last_collected_dts(queue)
end_timestamp = SamplesQueue.collectable_end_timestamp(queue)
state = update_queue_for(pad, queue, state)

if tracks_ready_for_collection?(state, end_timestamp) do
Expand All @@ -311,7 +311,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
end

defp collect_samples_for_audio_track(pad, queue, state) do
end_timestamp = SamplesQueue.last_collected_dts(queue)
end_timestamp = SamplesQueue.collectable_end_timestamp(queue)
state = update_queue_for(pad, queue, state)

if tracks_ready_for_collection?(state, end_timestamp) do
Expand All @@ -338,7 +338,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do

defp tracks_ready_for_collection?(state, end_timestamp) do
Enum.all?(state.sample_queues, fn {_pad, queue} ->
SamplesQueue.last_collected_dts(queue) >= end_timestamp
SamplesQueue.collectable_end_timestamp(queue) >= end_timestamp
end)
end

Expand Down Expand Up @@ -393,11 +393,11 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
maybe_return_segment(segment, reset_chunks_duration(state))
end

defp max_segment_base_timestamp(state) do
defp max_segment_decoding_timestamp(state) do
state.pad_to_track_data
|> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_base_timestamp) end)
|> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_decoding_timestamp) end)
|> Enum.map(fn {_key, track_data} ->
Ratio.to_float(track_data.segment_base_timestamp)
Ratio.to_float(track_data.segment_decoding_timestamp)
end)
|> Enum.max()
end
Expand Down
25 changes: 16 additions & 9 deletions lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,24 +280,31 @@ defmodule Membrane.MP4.Muxer.CMAF.TrackSamplesQueue do
end

@doc """
Returns dts of the latest sample that is eligible for collection.
Returns the end timestamp for latest sample that is eligible for collection.

In case of collectable state it is the last sample that has been put to queue, otherwise
it is the last sample that will be in return from 'collect/1'.
"""
@spec last_collected_dts(t()) :: integer()
def last_collected_dts(%__MODULE__{
@spec collectable_end_timestamp(t()) :: integer()
def collectable_end_timestamp(%__MODULE__{
collectable?: false,
target_samples: target_samples,
excess_samples: excess_samples
}),
do: latest_collected_dts(excess_samples) || latest_collected_dts(target_samples) || -1
}) do
sample = List.first(excess_samples) || List.first(target_samples)

if sample do
sample.dts + sample.metadata.duration
else
-1
end
end

def last_collected_dts(%__MODULE__{collectable?: true, target_samples: target_samples}),
do: latest_collected_dts(List.last(target_samples, []) |> List.wrap()) || -1
def collectable_end_timestamp(%__MODULE__{collectable?: true, target_samples: target_samples}) do
sample = List.last(target_samples)

defp latest_collected_dts([]), do: nil
defp latest_collected_dts([sample | _rest]), do: Ratio.to_float(sample.dts)
sample.dts + sample.metadata.duration
end

@doc """
Returns the most recenlty pushed sample.
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.MP4.Plugin.MixProject do
use Mix.Project

@version "0.32.0"
@version "0.33.0"
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"

def project do
Expand Down
Binary file modified test/fixtures/cmaf/muxed_audio_video/segment_1.m4s
Binary file not shown.
Binary file modified test/fixtures/cmaf/muxed_audio_video/segment_2.m4s
Binary file not shown.
Binary file modified test/fixtures/cmaf/ref_video_hevc_segment1.m4s
Binary file not shown.
Binary file modified test/fixtures/cmaf/ref_video_hevc_segment2.m4s
Binary file not shown.
Binary file modified test/fixtures/cmaf/ref_video_segment1.m4s
Binary file not shown.
Binary file modified test/fixtures/cmaf/ref_video_segment2.m4s
Binary file not shown.
14 changes: 12 additions & 2 deletions test/membrane_mp4/muxer/cmaf/segment_helper_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelperTest do
input_to_output_pad: %{audio: :output, video: :output},
input_groups: %{output: [:audio, :video]},
pad_to_track_data: %{
audio: %{segment_base_timestamp: 0, chunks_duration: 0, buffer_awaiting_duration: nil},
video: %{segment_base_timestamp: 0, chunks_duration: 0, buffer_awaiting_duration: nil}
audio: %{
segment_decoding_timestamp: 0,
segment_presentation_timestamp: 0,
chunks_duration: 0,
buffer_awaiting_duration: nil
},
video: %{
segment_decoding_timestamp: 0,
segment_presentation_timestamp: 0,
chunks_duration: 0,
buffer_awaiting_duration: nil
}
},
sample_queues: %{
audio: %Queue{track_with_keyframes?: false, duration_range: chunk_duration_range},
Expand Down