Skip to content

Commit

Permalink
reduce per-slice allocations
Browse files Browse the repository at this point in the history
`slices::Slices::get_range` was too closely following the example of
`http_serve::Entity::get_range`:

The latter is once-per-request, so just boxing is low-cost and makes
sense to easily avoid monomorphization bloat when there are potentially
many types of entity streams in one program. In Moonfire, it's used with
different streams defined in the `moonfire_nvr::web::mp4`,
`moonfire_nvr::bundled_ui`, and `http_serve::file` modules. Putting them
all into a single boxless enum would be a pain. In particular, the last
one is not a nameable type today and would need more generic parameters
to implement the caller-demanded `Entity` definition.

The former is once-per-slice, there are tons of slices per request, and
it's easy to define a two-case enum right where it's needed. So the
trade-off is quite different.

Also fix up some out-of-date comments.
  • Loading branch information
scottlamb committed Mar 8, 2025
1 parent 3cc9603 commit 2985214
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 60 deletions.
9 changes: 5 additions & 4 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ flate2 = "1.0.26"
hyper-util = { version = "0.1.7", features = ["server-graceful", "tokio"] }
http-body = "1.0.1"
http-body-util = "0.1.2"
pin-project = "1.1.10"

[target.'cfg(target_os = "linux")'.dependencies]
libsystemd = "0.7.0"
Expand Down
2 changes: 1 addition & 1 deletion server/db/dir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This mostly includes opening a directory and looking for recordings within it.
//! Updates to the directory happen through [crate::writer].
mod reader;
pub mod reader;

use crate::coding;
use crate::db::CompositeId;
Expand Down
84 changes: 44 additions & 40 deletions server/src/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::BytesMut;
use db::dir;
use db::recording::{self, rescale, TIME_UNITS_PER_SEC};
use futures::stream::{self, TryStreamExt};
use futures::Stream;
use http::header::HeaderValue;
use hyper::body::Buf;
use pin_project::pin_project;
use reffers::ARefss;
use smallvec::SmallVec;
use std::cmp;
Expand Down Expand Up @@ -758,19 +758,37 @@ impl Slice {
}
}

#[pin_project(project = SliceStreamProj)]
enum SliceStream {
Once(Option<Result<Chunk, Error>>),
File(#[pin] db::dir::reader::FileStream),
}

impl futures::stream::Stream for SliceStream {
type Item = Result<Chunk, BoxedError>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.project() {
SliceStreamProj::Once(o) => {
std::task::Poll::Ready(o.take().map(|r| r.map_err(wrap_error)))
}
SliceStreamProj::File(f) => f.poll_next(cx).map_ok(Chunk::from).map_err(wrap_error),
}
}
}

impl slices::Slice for Slice {
type Ctx = File;
type Chunk = Chunk;
type Stream = SliceStream;

fn end(&self) -> u64 {
self.0 & 0xFF_FF_FF_FF_FF
}
fn get_range(
&self,
f: &File,
range: Range<u64>,
len: u64,
) -> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Send + Sync> {
fn get_range(&self, f: &File, range: Range<u64>, len: u64) -> SliceStream {
trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len);
let p = self.p();
let res = match self.t() {
Expand Down Expand Up @@ -802,22 +820,20 @@ impl slices::Slice for Slice {
SliceType::SubtitleSampleData => f.0.get_subtitle_sample_data(p, range.clone(), len),
SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize),
};
Box::new(stream::once(futures::future::ready(
res.map_err(wrap_error).and_then(move |c| {
if c.remaining() != (range.end - range.start) as usize {
return Err(wrap_error(err!(
Internal,
msg(
"{:?} range {:?} produced incorrect len {}",
self,
range,
c.remaining()
)
)));
}
Ok(c)
}),
)))
SliceStream::Once(Some(res.and_then(move |c| {
if c.remaining() != (range.end - range.start) as usize {
bail!(
Internal,
msg(
"{:?} range {:?} produced incorrect len {}",
self,
range,
c.remaining()
)
);
}
Ok(c)
})))
}

fn get_slices(ctx: &File) -> &Slices<Self> {
Expand Down Expand Up @@ -1796,32 +1812,20 @@ impl FileInner {
.into())
}

/// Gets a `Chunk` of video sample data from disk.
/// This works by `mmap()`ing in the data. There are a couple caveats:
///
/// * The thread which reads the resulting slice is likely to experience major page faults.
/// Eventually this will likely be rewritten to `mmap()` the memory in another thread, and
/// `mlock()` and send chunks of it to be read and `munlock()`ed to avoid this problem.
///
/// * If the backing file is truncated, the program will crash with `SIGBUS`. This shouldn't
/// happen because nothing should be touching Moonfire NVR's files but itself.
fn get_video_sample_data(
&self,
i: usize,
r: Range<u64>,
) -> Box<dyn Stream<Item = Result<Chunk, BoxedError>> + Send + Sync> {
/// Gets a stream representing a range of segment `i`'s sample data from disk.
fn get_video_sample_data(&self, i: usize, r: Range<u64>) -> SliceStream {
let s = &self.segments[i];
let sr = s.s.sample_file_range();
let f = match self.dirs_by_stream_id.get(&s.s.id.stream()) {
None => {
return Box::new(stream::iter(std::iter::once(Err(wrap_error(err!(
return SliceStream::Once(Some(Err(err!(
NotFound,
msg("{}: stream not found", s.s.id)
))))))
))))
}
Some(d) => d.open_file(s.s.id, (r.start + sr.start)..(r.end + sr.start)),
};
Box::new(f.map_ok(Chunk::from).map_err(wrap_error))
SliceStream::File(f)
}

fn get_subtitle_sample_data(&self, i: usize, r: Range<u64>, len: u64) -> Result<Chunk, Error> {
Expand Down
26 changes: 13 additions & 13 deletions server/src/slices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use tracing_futures::Instrument;
/// Each `Slice` instance belongs to a single `Slices`.
pub trait Slice: fmt::Debug + Sized + Sync + 'static {
type Ctx: Send + Sync + Clone;
type Chunk: Send + Sync;
type Chunk: Send + Sync + 'static;
type Stream: Stream<Item = Result<Self::Chunk, BoxedError>> + Send + Sync;

/// The byte position (relative to the start of the `Slices`) of the end of this slice,
/// exclusive. Note the starting position (and thus length) are inferred from the previous
Expand All @@ -27,12 +28,10 @@ pub trait Slice: fmt::Debug + Sized + Sync + 'static {
/// Gets the body bytes indicated by `r`, which is relative to this slice's start.
/// The additional argument `ctx` is as supplied to the `Slices`.
/// The additional argument `l` is the length of this slice, as determined by the `Slices`.
fn get_range(
&self,
ctx: &Self::Ctx,
r: Range<u64>,
len: u64,
) -> Box<dyn Stream<Item = Result<Self::Chunk, BoxedError>> + Sync + Send>;
///
/// Note that unlike [`http_entity::Entity::get_range`], this is called many times per request,
/// so it's worth defining a custom stream type to avoid allocation overhead.
fn get_range(&self, ctx: &Self::Ctx, r: Range<u64>, len: u64) -> Self::Stream;

fn get_slices(ctx: &Self::Ctx) -> &Slices<Self>;
}
Expand Down Expand Up @@ -127,7 +126,7 @@ where
}

/// Writes `range` to `out`.
/// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument.
/// This interface mirrors `http_serve::Entity::get_range`, with the additional `ctx` argument.
pub fn get_range(
&self,
ctx: &S::Ctx,
Expand Down Expand Up @@ -170,7 +169,7 @@ where
let l = s_end - slice_start;
body = s.get_range(&c, start_pos..min_end - slice_start, l);
};
futures::future::ready(Some((Pin::from(body), (c, i + 1, 0, min_end))))
futures::future::ready(Some((body, (c, i + 1, 0, min_end))))
},
);
Box::pin(bodies.flatten().in_current_span())
Expand All @@ -182,7 +181,7 @@ mod tests {
use super::{Slice, Slices};
use crate::body::BoxedError;
use db::testutil;
use futures::stream::{self, Stream, TryStreamExt};
use futures::stream::{self, TryStreamExt};
use std::ops::Range;
use std::pin::Pin;

Expand All @@ -201,6 +200,7 @@ mod tests {
impl Slice for FakeSlice {
type Ctx = &'static Slices<FakeSlice>;
type Chunk = FakeChunk;
type Stream = stream::Once<futures::future::Ready<Result<FakeChunk, BoxedError>>>;

fn end(&self) -> u64 {
self.end
Expand All @@ -211,11 +211,11 @@ mod tests {
_ctx: &&'static Slices<FakeSlice>,
r: Range<u64>,
_l: u64,
) -> Box<dyn Stream<Item = Result<FakeChunk, BoxedError>> + Send + Sync> {
Box::new(stream::once(futures::future::ok(FakeChunk {
) -> Self::Stream {
stream::once(futures::future::ok(FakeChunk {
slice: self.name,
range: r,
})))
}))
}

fn get_slices(ctx: &&'static Slices<FakeSlice>) -> &'static Slices<Self> {
Expand Down
1 change: 0 additions & 1 deletion server/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use futures::StreamExt;
use retina::client::Demuxed;
use retina::codec::CodecItem;
use std::pin::Pin;
use std::result::Result;
use tracing::Instrument;
use url::Url;

Expand Down
1 change: 0 additions & 1 deletion server/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::stream;
use base::clock::{Clocks, TimerGuard};
use base::{bail, err, Error};
use db::{dir, recording, writer, Camera, Database, Stream};
use std::result::Result;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{debug, info, trace, warn, Instrument};
Expand Down

0 comments on commit 2985214

Please sign in to comment.