Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Ogg.Muxer #12

Merged
merged 34 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3a61ebb
Start putting known timestamps in metadata
Noarkhh May 9, 2024
5ea52d6
Make integration test check for timestamps
Noarkhh May 9, 2024
168b724
Change packet struct, adjust tests, bump version
Noarkhh May 13, 2024
8b37ba9
Satisfy checks
Noarkhh May 13, 2024
e0ff770
Change test
Noarkhh May 13, 2024
6223ce8
Refactor handling track specific state
Noarkhh May 13, 2024
d2770c2
Remove inspects
Noarkhh May 13, 2024
f2f4ac0
Remove unused variables
Noarkhh May 13, 2024
e4a94ce
Update dep
Noarkhh May 14, 2024
ee50ab8
Change flow control to auto on all pads
Noarkhh May 13, 2024
73c9c73
Remove unused functions and simplify
Noarkhh May 13, 2024
0eb9e56
If received eos forward it on pad added
Noarkhh May 13, 2024
7d43e3a
Refactor get_packet_actions
Noarkhh May 14, 2024
06440c2
Revert generating page pts
Noarkhh May 16, 2024
b9285da
Minor refactor
Noarkhh May 16, 2024
ee5e6df
Use static pad (#13)
Noarkhh May 20, 2024
f60135a
Bump version
Noarkhh May 20, 2024
f03585c
Remove opus_plugin dependency
Noarkhh May 20, 2024
7f26131
Start implementing the demuxer
Noarkhh May 14, 2024
3c9d393
Further work
Noarkhh May 15, 2024
cf570a9
Muxer prototype working
Noarkhh May 15, 2024
d2e5dc9
Add Page alias
Noarkhh May 15, 2024
40b8cfa
Use Page alias in struct creations
Noarkhh May 15, 2024
3969997
Bump opus_plugin version
Noarkhh May 20, 2024
96f70c0
Start adding tests
Noarkhh May 20, 2024
f74f2ba
Add muxer demuxer test
Noarkhh May 20, 2024
ffd43b5
Change file structure
Noarkhh May 20, 2024
75ea53c
Merge branch 'master' into create-muxer
Noarkhh May 20, 2024
5d4e888
Add ability to fill gaps with PLC
Noarkhh May 20, 2024
dd791b7
Minor refactor
Noarkhh May 21, 2024
c16885d
Move Opus header and packet utils into one module
Noarkhh May 21, 2024
79a5b4f
Bump version
Noarkhh May 21, 2024
3d93c80
Apply reviewers suggestion, update example
Noarkhh May 28, 2024
33cf36a
Add error margin for timestamp gaps
Noarkhh Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_ogg_plugin` to your list of dep
```elixir
def deps do
[
{:membrane_ogg_plugin, "~> 0.4.0"}
{:membrane_ogg_plugin, "~> 0.5.0"}
]
end
```
Expand All @@ -26,7 +26,7 @@ end

For an example see `examples/demuxer_example.exs`. To run the example you can use the following command:

```iex examples/demuxer_example.exs```
```elixir examples/demuxer_example.exs```

On macOS you might need to set the opus include path like this:

Expand Down
47 changes: 22 additions & 25 deletions examples/demuxer_example.exs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@

Mix.install([
{:membrane_core, "~> 0.11.0"},
{:membrane_opus_format, "~> 0.3.0"},
{:crc, "~> 0.10"},
{:membrane_file_plugin, "~> 0.13.1"},
{:membrane_portaudio_plugin,
git: "https://github.com/membraneframework/membrane_portaudio_plugin.git",
branch: "bugfix/rename_playback_state_to_playback"},
{:membrane_opus_plugin, "~> 0.16.0"},
{:membrane_file_plugin, "~> 0.16.0"},
{:membrane_portaudio_plugin, "~> 0.18.0"},
{:membrane_opus_plugin, "~> 0.20.0"},
{:membrane_ogg_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()},
])

Expand All @@ -18,28 +12,31 @@ defmodule DemuxerExample do
def handle_init(_context, _opts) do
structure = [
child(:source, %Membrane.File.Source{
location: "./test/fixtures/test_fixtures_1.ogg"
}) |>
child(:ogg_demuxer, Membrane.Ogg.Demuxer)
location: "./test/fixtures/in_opus.ogg"
})
|> child(:ogg_demuxer, Membrane.Ogg.Demuxer)
|> child(:opus, Membrane.Opus.Decoder)
|> child(:portaudio, Membrane.PortAudio.Sink)
]

{[spec: structure, playback: :playing], %{}}
{[spec: structure], %{}}
end

@impl true
def handle_child_notification({:new_track, {track_id, codec}}, :ogg_demuxer, _context, state) do
case codec do
:opus ->
structure = [
get_child(:ogg_demuxer)
|> via_out(Pad.ref(:output, track_id))
|> child(:opus, Membrane.Opus.Decoder)
|> child(:portaudio, Membrane.PortAudio.Sink)
]
def handle_element_end_of_stream(:portaudio, _pad, _ctx, state) do
{[terminate: :normal], state}
end

{[spec: structure, playback: :playing], state}
end
def handle_element_end_of_stream(_element, _pad, _ctx, state) do
{[], state}
end
end

{:ok, sup_pid, pid} = DemuxerExample.start_link()
{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start(DemuxerExample)
ref = Process.monitor(pipeline_pid)

# Wait for the pipeline to finish
receive do
{:DOWN, ^ref, :process, _pipeline_pid, _reason} ->
:ok
end
143 changes: 143 additions & 0 deletions lib/muxer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
defmodule Membrane.Ogg.Muxer do
@moduledoc """
A Membrane element for muxing streams into a OGG container.
For now only supports muxing a single Opus track containing one stream (mono or stereo).

The incoming Opus stream needs to have `:duration` field in metadata.
"""
use Membrane.Filter
use Numbers, overload_operators: true

require Membrane.Logger
alias Membrane.{Buffer, Opus}
alias Membrane.Ogg.Page

def_input_pad :input,
accepted_format: %Membrane.Opus{self_delimiting?: false}

def_output_pad :output,
accepted_format: %Membrane.RemoteStream{type: :packetized, content_format: Ogg}

@fixed_sample_rate 48_000

defmodule State do
@moduledoc false
@type t :: %__MODULE__{
current_page: Page.t() | nil,
total_duration: Membrane.Time.t()
}

@enforce_keys []
defstruct @enforce_keys ++
[
current_page: nil,
total_duration: 0
]
end

@impl true
def handle_init(_ctx, _opts) do
{[], %State{}}
end

@impl true
def handle_stream_format(:input, %Opus{channels: channels}, _ctx, state) do
stream_format = %Membrane.RemoteStream{type: :packetized, content_format: Ogg}

header_page =
Page.create_first(0)
|> Page.append_packet!(Membrane.Ogg.Opus.create_id_header(channels))
|> Page.finalize(0)

comment_page =
Page.create_subsequent(header_page)
|> Page.append_packet!(Membrane.Ogg.Opus.create_comment_header())
|> Page.finalize(0)

first_audio_data_page = Page.create_subsequent(comment_page)

buffers = [
%Buffer{payload: Page.serialize(header_page)},
%Buffer{payload: Page.serialize(comment_page)}
]

{
[stream_format: {:output, stream_format}, buffer: {:output, buffers}],
%State{state | current_page: first_audio_data_page}
}
end

@impl true
def handle_buffer(
:input,
%Buffer{pts: pts, metadata: %{duration: _duration}} = buffer,
_ctx,
state
)
when not is_nil(pts) do
packets_to_encapsulate =
if pts > state.total_duration do
Membrane.Logger.debug(
"Stream discontiunuity of length #{Membrane.Time.as_milliseconds(pts - state.total_duration, :exact) |> Ratio.to_float()}ms, using Packet Loss Concealment"
)

Membrane.Ogg.Opus.create_plc_packets(pts, pts - state.total_duration) ++ [buffer]
else
[buffer]
end

{buffers, state} = encapsulate_packets(packets_to_encapsulate, state)

{[buffer: {:output, buffers}], state}
end

@impl true
def handle_end_of_stream(:input, _ctx, %State{current_page: current_page} = state) do
payload =
current_page
|> Page.finalize(calculate_granule_position(state.total_duration), true)
|> Page.serialize()

{[buffer: {:output, %Buffer{payload: payload}}, end_of_stream: :output], state}
end

@spec calculate_granule_position(Membrane.Time.t()) :: non_neg_integer()
defp calculate_granule_position(duration) do
(Membrane.Time.as_seconds(duration, :exact) * @fixed_sample_rate)
|> Ratio.trunc()
end

@spec encapsulate_packets([Buffer.t() | Membrane.Ogg.Opus.plc_packet()], State.t(), [Buffer.t()]) ::
{pages :: [Buffer.t()], state :: State.t()}
defp encapsulate_packets(packets, state, page_buffers \\ [])

defp encapsulate_packets([first_packet | rest_packets], state, page_buffers) do
{new_page_buffers, state} =
case Page.append_packet(state.current_page, first_packet.payload) do
{:ok, page} ->
{[], %State{state | current_page: page}}

{:error, :not_enough_space} ->
complete_page =
state.current_page
|> Page.finalize(calculate_granule_position(first_packet.pts))

new_page =
Page.create_subsequent(complete_page)
|> Page.append_packet!(first_packet.payload)

{[%Buffer{payload: Page.serialize(complete_page)}],
%State{state | current_page: new_page}}
end

encapsulate_packets(
rest_packets,
%{state | total_duration: state.total_duration + first_packet.metadata.duration},
page_buffers ++ new_page_buffers
)
end

defp encapsulate_packets([], state, page_buffers) do
{page_buffers, state}
end
end
60 changes: 60 additions & 0 deletions lib/opus.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Membrane.Ogg.Opus do
@moduledoc false

require Membrane.Logger

@id_header_signature "OpusHead"
@version 1
@preskip 0
@sample_rate 48_000
@output_gain 0
@channel_mapping_family 0

@comment_header_signature "OpusTags"
@vendor "membraneframework"
@user_comment_list_length 0

@plc_packet_for_2_5ms_gap <<16::5, 0::1, 0::3>>
@shortest_frame_duration Membrane.Time.microseconds(2_500)

@type plc_packet :: %{
payload: binary(),
pts: Membrane.Time.t(),
metadata: %{duration: Membrane.Time.non_neg()}
}

@spec create_id_header(non_neg_integer()) :: binary()
def create_id_header(channel_count) do
<<@id_header_signature, @version, channel_count, @preskip::little-16, @sample_rate::little-32,
@output_gain::little-16, @channel_mapping_family>>
end

@spec create_comment_header() :: binary()
def create_comment_header() do
<<@comment_header_signature, byte_size(@vendor)::little-32, @vendor,
@user_comment_list_length::little-32>>
end

@spec create_plc_packets(Membrane.Time.t(), Membrane.Time.t()) :: [plc_packet()]
def create_plc_packets(gap_start_timestamp, gap_duration) do
# PLC: Packet Loss Concealment
if rem(gap_duration, @shortest_frame_duration) != 0 do
Membrane.Logger.warning(
"Theoretically impossible gap in Opus stream of #{Membrane.Time.as_milliseconds(gap_duration, :exact) |> Ratio.to_float()}ms"
)
end

# Adding a millisecond margin in case of timestamp innacuracies
packets_to_generate =
div(gap_duration + Membrane.Time.millisecond(), @shortest_frame_duration)

Range.to_list(0..(packets_to_generate - 1))
|> Enum.map(
&%{
payload: @plc_packet_for_2_5ms_gap,
pts: gap_start_timestamp + &1 * @shortest_frame_duration,
metadata: %{duration: @shortest_frame_duration}
}
)
end
end
Loading