Skip to content

Commit

Permalink
Create Ogg.Muxer (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh authored Jun 6, 2024
1 parent ce0c863 commit 266137e
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_ogg_plugin` to your list of dep
```elixir
def deps do
[
{:membrane_ogg_plugin, "~> 0.4.0"}
{:membrane_ogg_plugin, "~> 0.5.0"}
]
end
```
Expand All @@ -26,7 +26,7 @@ end

For an example see `examples/demuxer_example.exs`. To run the example you can use the following command:

```iex examples/demuxer_example.exs```
```elixir examples/demuxer_example.exs```

On macOS you might need to set the opus include path like this:

Expand Down
47 changes: 22 additions & 25 deletions examples/demuxer_example.exs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@

Mix.install([
{:membrane_core, "~> 0.11.0"},
{:membrane_opus_format, "~> 0.3.0"},
{:crc, "~> 0.10"},
{:membrane_file_plugin, "~> 0.13.1"},
{:membrane_portaudio_plugin,
git: "https://github.com/membraneframework/membrane_portaudio_plugin.git",
branch: "bugfix/rename_playback_state_to_playback"},
{:membrane_opus_plugin, "~> 0.16.0"},
{:membrane_file_plugin, "~> 0.16.0"},
{:membrane_portaudio_plugin, "~> 0.18.0"},
{:membrane_opus_plugin, "~> 0.20.0"},
{:membrane_ogg_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()},
])

Expand All @@ -18,28 +12,31 @@ defmodule DemuxerExample do
def handle_init(_context, _opts) do
structure = [
child(:source, %Membrane.File.Source{
location: "./test/fixtures/test_fixtures_1.ogg"
}) |>
child(:ogg_demuxer, Membrane.Ogg.Demuxer)
location: "./test/fixtures/in_opus.ogg"
})
|> child(:ogg_demuxer, Membrane.Ogg.Demuxer)
|> child(:opus, Membrane.Opus.Decoder)
|> child(:portaudio, Membrane.PortAudio.Sink)
]

{[spec: structure, playback: :playing], %{}}
{[spec: structure], %{}}
end

@impl true
def handle_child_notification({:new_track, {track_id, codec}}, :ogg_demuxer, _context, state) do
case codec do
:opus ->
structure = [
get_child(:ogg_demuxer)
|> via_out(Pad.ref(:output, track_id))
|> child(:opus, Membrane.Opus.Decoder)
|> child(:portaudio, Membrane.PortAudio.Sink)
]
def handle_element_end_of_stream(:portaudio, _pad, _ctx, state) do
{[terminate: :normal], state}
end

{[spec: structure, playback: :playing], state}
end
def handle_element_end_of_stream(_element, _pad, _ctx, state) do
{[], state}
end
end

{:ok, sup_pid, pid} = DemuxerExample.start_link()
{:ok, _supervisor_pid, pipeline_pid} = Membrane.Pipeline.start(DemuxerExample)
ref = Process.monitor(pipeline_pid)

# Wait for the pipeline to finish
receive do
{:DOWN, ^ref, :process, _pipeline_pid, _reason} ->
:ok
end
143 changes: 143 additions & 0 deletions lib/muxer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
defmodule Membrane.Ogg.Muxer do
@moduledoc """
A Membrane element for muxing streams into a OGG container.
For now only supports muxing a single Opus track containing one stream (mono or stereo).
The incoming Opus stream needs to have `:duration` field in metadata.
"""
use Membrane.Filter
use Numbers, overload_operators: true

require Membrane.Logger
alias Membrane.{Buffer, Opus}
alias Membrane.Ogg.Page

def_input_pad :input,
accepted_format: %Membrane.Opus{self_delimiting?: false}

def_output_pad :output,
accepted_format: %Membrane.RemoteStream{type: :packetized, content_format: Ogg}

@fixed_sample_rate 48_000

defmodule State do
@moduledoc false
@type t :: %__MODULE__{
current_page: Page.t() | nil,
total_duration: Membrane.Time.t()
}

@enforce_keys []
defstruct @enforce_keys ++
[
current_page: nil,
total_duration: 0
]
end

@impl true
def handle_init(_ctx, _opts) do
{[], %State{}}
end

@impl true
def handle_stream_format(:input, %Opus{channels: channels}, _ctx, state) do
stream_format = %Membrane.RemoteStream{type: :packetized, content_format: Ogg}

header_page =
Page.create_first(0)
|> Page.append_packet!(Membrane.Ogg.Opus.create_id_header(channels))
|> Page.finalize(0)

comment_page =
Page.create_subsequent(header_page)
|> Page.append_packet!(Membrane.Ogg.Opus.create_comment_header())
|> Page.finalize(0)

first_audio_data_page = Page.create_subsequent(comment_page)

buffers = [
%Buffer{payload: Page.serialize(header_page)},
%Buffer{payload: Page.serialize(comment_page)}
]

{
[stream_format: {:output, stream_format}, buffer: {:output, buffers}],
%State{state | current_page: first_audio_data_page}
}
end

@impl true
def handle_buffer(
:input,
%Buffer{pts: pts, metadata: %{duration: _duration}} = buffer,
_ctx,
state
)
when not is_nil(pts) do
packets_to_encapsulate =
if pts > state.total_duration do
Membrane.Logger.debug(
"Stream discontiunuity of length #{Membrane.Time.as_milliseconds(pts - state.total_duration, :exact) |> Ratio.to_float()}ms, using Packet Loss Concealment"
)

Membrane.Ogg.Opus.create_plc_packets(pts, pts - state.total_duration) ++ [buffer]
else
[buffer]
end

{buffers, state} = encapsulate_packets(packets_to_encapsulate, state)

{[buffer: {:output, buffers}], state}
end

@impl true
def handle_end_of_stream(:input, _ctx, %State{current_page: current_page} = state) do
payload =
current_page
|> Page.finalize(calculate_granule_position(state.total_duration), true)
|> Page.serialize()

{[buffer: {:output, %Buffer{payload: payload}}, end_of_stream: :output], state}
end

@spec calculate_granule_position(Membrane.Time.t()) :: non_neg_integer()
defp calculate_granule_position(duration) do
(Membrane.Time.as_seconds(duration, :exact) * @fixed_sample_rate)
|> Ratio.trunc()
end

@spec encapsulate_packets([Buffer.t() | Membrane.Ogg.Opus.plc_packet()], State.t(), [Buffer.t()]) ::
{pages :: [Buffer.t()], state :: State.t()}
defp encapsulate_packets(packets, state, page_buffers \\ [])

defp encapsulate_packets([first_packet | rest_packets], state, page_buffers) do
{new_page_buffers, state} =
case Page.append_packet(state.current_page, first_packet.payload) do
{:ok, page} ->
{[], %State{state | current_page: page}}

{:error, :not_enough_space} ->
complete_page =
state.current_page
|> Page.finalize(calculate_granule_position(first_packet.pts))

new_page =
Page.create_subsequent(complete_page)
|> Page.append_packet!(first_packet.payload)

{[%Buffer{payload: Page.serialize(complete_page)}],
%State{state | current_page: new_page}}
end

encapsulate_packets(
rest_packets,
%{state | total_duration: state.total_duration + first_packet.metadata.duration},
page_buffers ++ new_page_buffers
)
end

defp encapsulate_packets([], state, page_buffers) do
{page_buffers, state}
end
end
60 changes: 60 additions & 0 deletions lib/opus.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Membrane.Ogg.Opus do
@moduledoc false

require Membrane.Logger

@id_header_signature "OpusHead"
@version 1
@preskip 0
@sample_rate 48_000
@output_gain 0
@channel_mapping_family 0

@comment_header_signature "OpusTags"
@vendor "membraneframework"
@user_comment_list_length 0

@plc_packet_for_2_5ms_gap <<16::5, 0::1, 0::3>>
@shortest_frame_duration Membrane.Time.microseconds(2_500)

@type plc_packet :: %{
payload: binary(),
pts: Membrane.Time.t(),
metadata: %{duration: Membrane.Time.non_neg()}
}

@spec create_id_header(non_neg_integer()) :: binary()
def create_id_header(channel_count) do
<<@id_header_signature, @version, channel_count, @preskip::little-16, @sample_rate::little-32,
@output_gain::little-16, @channel_mapping_family>>
end

@spec create_comment_header() :: binary()
def create_comment_header() do
<<@comment_header_signature, byte_size(@vendor)::little-32, @vendor,
@user_comment_list_length::little-32>>
end

@spec create_plc_packets(Membrane.Time.t(), Membrane.Time.t()) :: [plc_packet()]
def create_plc_packets(gap_start_timestamp, gap_duration) do
# PLC: Packet Loss Concealment
if rem(gap_duration, @shortest_frame_duration) != 0 do
Membrane.Logger.warning(
"Theoretically impossible gap in Opus stream of #{Membrane.Time.as_milliseconds(gap_duration, :exact) |> Ratio.to_float()}ms"
)
end

# Adding a millisecond margin in case of timestamp innacuracies
packets_to_generate =
div(gap_duration + Membrane.Time.millisecond(), @shortest_frame_duration)

Range.to_list(0..(packets_to_generate - 1))
|> Enum.map(
&%{
payload: @plc_packet_for_2_5ms_gap,
pts: gap_start_timestamp + &1 * @shortest_frame_duration,
metadata: %{duration: @shortest_frame_duration}
}
)
end
end
Loading

0 comments on commit 266137e

Please sign in to comment.