Datastore revamp 6: sunset LogMsg storage + save store to disk #1795

Merged 1 commit on Apr 12, 2023
77 changes: 30 additions & 47 deletions crates/re_data_store/src/log_db.rs
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use nohash_hasher::IntMap;

use re_arrow_store::{DataStoreConfig, TimeInt};
@@ -159,33 +161,31 @@ impl EntityDb {
/// An in-memory database built from a stream of [`LogMsg`]es.
#[derive(Default)]
pub struct LogDb {
/// Messages in the order they arrived
chronological_row_ids: Vec<RowId>,
log_messages: ahash::HashMap<RowId, LogMsg>,

/// Data that was logged with [`TimePoint::timeless`].
/// We need to re-insert those in any new timelines
/// that are created after they were logged.
timeless_row_ids: Vec<RowId>,
/// All [`EntityPathOpMsg`]s ever received.
entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,

/// Set by whoever created this [`LogDb`].
pub data_source: Option<re_smart_channel::Source>,

/// Comes in a special message, [`LogMsg::BeginRecordingMsg`].
recording_info: Option<RecordingInfo>,
recording_msg: Option<BeginRecordingMsg>,

/// Where we store the entities.
pub entity_db: EntityDb,
}

impl LogDb {
pub fn recording_msg(&self) -> Option<&BeginRecordingMsg> {
self.recording_msg.as_ref()
}

pub fn recording_info(&self) -> Option<&RecordingInfo> {
self.recording_info.as_ref()
self.recording_msg().map(|msg| &msg.info)
}

pub fn recording_id(&self) -> RecordingId {
if let Some(info) = &self.recording_info {
info.recording_id
if let Some(msg) = &self.recording_msg {
msg.info.recording_id
} else {
RecordingId::ZERO
}
@@ -203,11 +203,16 @@ impl LogDb {
self.entity_db.tree.num_timeless_messages()
}

pub fn num_rows(&self) -> usize {
self.entity_db.data_store.total_timeless_rows() as usize
+ self.entity_db.data_store.total_temporal_rows() as usize
}

pub fn is_empty(&self) -> bool {
self.log_messages.is_empty()
self.num_rows() == 0
}

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
@@ -218,38 +223,27 @@
time_point,
path_op,
} = msg;
self.entity_op_msgs.insert(*row_id, msg.clone());
self.entity_db.add_path_op(*row_id, time_point, path_op);
}
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
self.chronological_row_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

fn add_begin_recording_msg(&mut self, msg: &BeginRecordingMsg) {
self.recording_info = Some(msg.info.clone());
self.recording_msg = Some(msg.clone());
}

pub fn len(&self) -> usize {
self.log_messages.len()
/// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `LogDb`.
pub fn iter_entity_op_msgs(&self) -> impl Iterator<Item = &EntityPathOpMsg> {
self.entity_op_msgs.values()
}

/// In the order they arrived
pub fn chronological_log_messages(&self) -> impl Iterator<Item = &LogMsg> {
self.chronological_row_ids
.iter()
.filter_map(|id| self.get_log_msg(id))
}

pub fn get_log_msg(&self, row_id: &RowId) -> Option<&LogMsg> {
self.log_messages.get(row_id)
pub fn get_entity_op_msg(&self, row_id: &RowId) -> Option<&EntityPathOpMsg> {
self.entity_op_msgs.get(row_id)
}

/// Free up some RAM by forgetting the older parts of all timelines.
@@ -263,26 +257,15 @@ impl LogDb {
let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline();

let Self {
chronological_row_ids,
log_messages,
timeless_row_ids,
entity_op_msgs,
data_source: _,
recording_info: _,
recording_msg: _,
entity_db,
} = self;

{
crate::profile_scope!("chronological_row_ids");
chronological_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
}

{
crate::profile_scope!("log_messages");
log_messages.retain(|row_id, _| !drop_row_ids.contains(row_id));
}
{
crate::profile_scope!("timeless_row_ids");
timeless_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
crate::profile_scope!("entity_op_msgs");
entity_op_msgs.retain(|row_id, _| !drop_row_ids.contains(row_id));
}

entity_db.purge(&cutoff_times, &drop_row_ids);
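Taken together, `LogDb` no longer stores raw `LogMsg`es at all: row counts come straight from the Arrow store, and only the `EntityPathOpMsg`es are retained (in a `BTreeMap`, so iteration is ordered by `RowId`). A minimal sketch of the reworked read-side API, assuming a populated `log_db` and that the printed fields implement `Debug`:

```rust
// Hedged sketch of the reworked LogDb surface from this diff; `inspect` and
// the printed strings are illustrative assumptions.
fn inspect(log_db: &re_data_store::LogDb) {
    // Row counts are now derived from the datastore, not from stored LogMsgs:
    println!("rows: {} (empty: {})", log_db.num_rows(), log_db.is_empty());

    // Path operations are the only messages still kept around, ordered by RowId:
    for msg in log_db.iter_entity_op_msgs() {
        println!("path op at {:?}: {:?}", msg.row_id, msg.path_op);
    }
}
```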
11 changes: 11 additions & 0 deletions crates/re_log_encoding/src/encoder.rs
@@ -98,3 +98,14 @@ pub fn encode<'a>(
}
encoder.finish()
}

pub fn encode_owned(
messages: impl Iterator<Item = LogMsg>,
write: impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(&message)?;
}
encoder.finish()
}
2 changes: 1 addition & 1 deletion crates/re_log_types/src/data_table.rs
@@ -555,7 +555,7 @@ impl DataTable {
/// Internally, time columns are (de)serialized separately from the rest of the control
/// columns for efficiency/QOL concerns: that doesn't change the fact that they are control
/// columns all the same!
/// * Data columns are the one that hold component data.
/// * Data columns are the ones that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
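For illustration, a hedged sketch of what a caller of `serialize` gets back per the signature above, assuming a `table: DataTable` in scope: one arrow2 `Schema` plus one `Chunk`, where control and data columns alike map one field to one column.

```rust
// Minimal sketch (assumed call shape): every schema field — control or data —
// corresponds to exactly one column in the returned chunk.
let (schema, chunk) = table.serialize()?;
assert_eq!(schema.fields.len(), chunk.columns().len()); // one column per field
```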
113 changes: 51 additions & 62 deletions crates/re_viewer/src/app.rs
@@ -527,7 +527,7 @@ impl eframe::App for App {
log_db,
)
.selection_state
.on_frame_start(log_db, blueprint);
.on_frame_start(blueprint);

{
// TODO(andreas): store the re_renderer somewhere else.
@@ -704,7 +704,7 @@ impl App {
log_db.data_source = Some(self.rx.source().clone());
}

if let Err(err) = log_db.add(msg) {
if let Err(err) = log_db.add(&msg) {
re_log::error!("Failed to add incoming msg: {err}");
};

@@ -916,8 +916,6 @@ fn preview_files_being_dropped(egui_ctx: &egui::Context) {
enum PanelSelection {
#[default]
Viewport,

EventLog,
[Review comment from the author]
Same here: keeping the whole panel system in place even though we're back to one panel... for now

}

#[derive(Default, serde::Deserialize, serde::Serialize)]
@@ -940,8 +938,6 @@ struct AppState {
/// Which view panel is currently being shown
panel_selection: PanelSelection,

event_log_view: crate::event_log_view::EventLogView,

selection_panel: crate::selection_panel::SelectionPanel,
time_panel: crate::time_panel::TimePanel,

@@ -969,7 +965,6 @@ impl AppState {
selected_rec_id,
recording_configs,
panel_selection,
event_log_view,
blueprints,
selection_panel,
time_panel,
@@ -1014,7 +1009,6 @@ impl AppState {
.entry(selected_app_id)
.or_insert_with(|| Blueprint::new(ui.ctx()))
.blueprint_panel_and_viewport(&mut ctx, ui),
PanelSelection::EventLog => event_log_view.ui(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
@@ -1523,7 +1517,13 @@ fn save(app: &mut App, loop_selection: Option<(re_data_store::Timeline, TimeRang
.set_title(title)
.save_file()
{
let f = save_database_to_file(app.log_db(), path, loop_selection);
let f = match save_database_to_file(app.log_db(), path, loop_selection) {
Ok(f) => f,
Err(err) => {
re_log::error!("File saving failed: {err}");
return;
}
};
if let Err(err) = app.spawn_threaded_promise(FILE_SAVER_PROMISE, f) {
// NOTE: Shouldn't even be possible as the "Save" button is already
// grayed out at this point... better safe than sorry though.
@@ -1546,16 +1546,6 @@ fn main_view_selector_ui(ui: &mut egui::Ui, app: &mut App) {
{
ui.close_menu();
}
if ui
.selectable_value(
&mut app.state.panel_selection,
PanelSelection::EventLog,
"Event Log",
)
.clicked()
{
ui.close_menu();
}
});
}
}
@@ -1760,57 +1750,56 @@ fn save_database_to_file(
log_db: &LogDb,
path: std::path::PathBuf,
time_selection: Option<(re_data_store::Timeline, TimeRangeF)>,
) -> impl FnOnce() -> anyhow::Result<std::path::PathBuf> {
use re_log_types::{EntityPathOpMsg, TimeInt};

let msgs = match time_selection {
// Fast path: no query, just dump everything.
None => log_db
.chronological_log_messages()
.cloned()
.collect::<Vec<_>>(),

// Query path: time to filter!
Some((timeline, range)) => {
use std::ops::RangeInclusive;
let range: RangeInclusive<TimeInt> = range.min.floor()..=range.max.ceil();
log_db
.chronological_log_messages()
.filter(|msg| {
match msg {
LogMsg::BeginRecordingMsg(_) | LogMsg::Goodbye(_) => {
true // timeless
}
LogMsg::EntityPathOpMsg(_, EntityPathOpMsg { time_point, .. }) => {
time_point.is_timeless() || {
let is_within_range = time_point
.get(&timeline)
.map_or(false, |t| range.contains(t));
is_within_range
}
}
LogMsg::ArrowMsg(_, _) => {
// TODO(john)
false
}
}
})
.cloned()
.collect::<Vec<_>>()
}
};
) -> anyhow::Result<impl FnOnce() -> anyhow::Result<std::path::PathBuf>> {
use re_arrow_store::TimeRange;

crate::profile_scope!("dump_messages");

move || {
let begin_rec_msg = log_db
.recording_msg()
.map(|msg| LogMsg::BeginRecordingMsg(msg.clone()));

let ent_op_msgs = log_db
.iter_entity_op_msgs()
.map(|msg| LogMsg::EntityPathOpMsg(log_db.recording_id(), msg.clone()))
.collect_vec();

let time_filter = time_selection.map(|(timeline, range)| {
(
timeline,
TimeRange::new(range.min.floor(), range.max.ceil()),
)
});
let data_msgs: Result<Vec<_>, _> = log_db
.entity_db
.data_store
.to_data_tables(time_filter)
.map(|table| {
table
.to_arrow_msg()
.map(|msg| LogMsg::ArrowMsg(log_db.recording_id(), msg))
})
.collect();

use anyhow::Context as _;
let data_msgs = data_msgs.with_context(|| "Failed to export to data tables")?;

let msgs = std::iter::once(begin_rec_msg)
.flatten() // option
.chain(ent_op_msgs)
.chain(data_msgs);

Ok(move || {
crate::profile_scope!("save_to_file");

use anyhow::Context as _;
let file = std::fs::File::create(path.as_path())
.with_context(|| format!("Failed to create file at {path:?}"))?;

re_log_encoding::encoder::encode(msgs.iter(), file)
re_log_encoding::encoder::encode_owned(msgs, file)
.map(|_| path)
.context("Message encode")
}
})
}
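Note that `save_database_to_file` now returns `anyhow::Result<impl FnOnce() -> …>`: rebuilding the message stream from the store (one `BeginRecordingMsg`, every retained `EntityPathOpMsg`, then fresh `ArrowMsg`es from `to_data_tables`) can fail, and that failure now surfaces before the file-saver thread is ever spawned. A hedged round-trip sketch using the two functions from this diff; the path and the `None` time selection (dump everything) are assumptions:

```rust
// Hypothetical round trip: dump a LogDb to an .rrd file, then load it back.
fn roundtrip(log_db: &LogDb) -> anyhow::Result<LogDb> {
    let path = std::path::PathBuf::from("/tmp/recording.rrd");
    // Rebuilding the message stream can fail (e.g. Arrow serialization),
    // which is why the writer closure is behind a Result:
    let save = save_database_to_file(log_db, path, None)?;
    let saved_path = save()?; // in the app this runs on the file-saver thread
    load_rrd_to_log_db(std::fs::File::open(saved_path)?)
}
```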

#[allow(unused_mut)]
@@ -1821,7 +1810,7 @@ fn load_rrd_to_log_db(mut read: impl std::io::Read) -> anyhow::Result<LogDb> {

let mut log_db = LogDb::default();
for msg in decoder {
log_db.add(msg?)?;
log_db.add(&msg?)?;
}
Ok(log_db)
}
2 changes: 1 addition & 1 deletion crates/re_viewer/src/lib.rs
@@ -13,7 +13,7 @@ mod viewer_analytics;

pub(crate) use misc::{mesh_loader, Item, TimeControl, TimeView, ViewerContext};
use re_log_types::PythonVersion;
pub(crate) use ui::{event_log_view, memory_panel, selection_panel, time_panel, UiVerbosity};
pub(crate) use ui::{memory_panel, selection_panel, time_panel, UiVerbosity};

pub use app::{App, StartupOptions};
pub use remote_viewer_app::RemoteViewerApp;