Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge branch 'TimelyDataflow:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
antiguru authored Nov 21, 2023
2 parents d9896ce + 5ea6fe5 commit b55e5b6
Showing 1 changed file with 138 additions and 34 deletions.
172 changes: 138 additions & 34 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! and should consume fewer resources (computation and memory) when it applies.
use std::rc::Rc;
use timely::container::columnation::TimelyStack;

use trace::implementations::spine_fueled::Spine;

Expand All @@ -18,21 +19,24 @@ use self::val_batch::{OrdValBatch};


/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>>>>;
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>>>;
// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>>>>;
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>;
// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;

mod val_batch {

use std::convert::TryInto;
use std::marker::PhantomData;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::{Antichain, frontier::AntichainRef};

use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
use trace::layers::BatchContainer;

use super::{Layout, Update};
Expand Down Expand Up @@ -69,42 +73,61 @@ mod val_batch {
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
(self.vals_offs[index].try_into().ok().unwrap(), self.vals_offs[index+1].try_into().ok().unwrap())
let mut lower = self.vals_offs[index].try_into().ok().unwrap();
let upper = self.vals_offs[index+1].try_into().ok().unwrap();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
assert!(lower > 0);
lower -= 1;
}
(lower, upper)
}
}

/// An immutable collection of update tuples, from a contiguous interval of logical times.
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
pub struct OrdValBatch<L: Layout, C> {
/// The updates themselves.
pub storage: OrdValStorage<L>,
/// Description of the update times this layer represents.
pub description: Description<<L::Target as Update>::Time>,
/// The number of updates reflected in the batch.
///
/// We track this separately from `storage` because due to the singleton optimization,
/// we may have many more updates than `storage.updates.len()`. It should equal that
/// length, plus the number of singleton optimizations employed.
pub updates: usize,
/// Phantom marker for Rust happiness.
pub phantom: PhantomData<C>,
}

impl<L: Layout> BatchReader for OrdValBatch<L> {
impl<L: Layout, C> BatchReader for OrdValBatch<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdValCursor<L>;
type Cursor = OrdValCursor<L, C>;
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: std::marker::PhantomData,
phantom: PhantomData,
}
}
fn len(&self) -> usize {
// Normally this would be `self.updates.len()`, but we have a clever compact encoding.
// Perhaps we should count such exceptions to the side, to provide a correct accounting.
self.storage.updates.len()
self.updates
}
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}

impl<L: Layout> Batch for OrdValBatch<L> {
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;
Expand All @@ -114,6 +137,23 @@ mod val_batch {
}
}

impl<L: Layout> Batch for OrdValBatch<L, TimelyStack<L::Target>>
where
<L as Layout>::Target: Columnation,
Self::Key: Columnation + 'static,
Self::Val: Columnation + 'static,
Self::Time: Columnation + 'static,
Self::R: Columnation + 'static,
{
type Batcher = ColumnatedMergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}

/// State for an in-progress merge.
pub struct OrdValMerger<L: Layout> {
/// Key position to merge next in the first batch.
Expand All @@ -130,10 +170,15 @@ mod val_batch {
/// We could emulate a `ChangeBatch` here, with related compaction smarts.
/// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
/// Counts the number of singleton-optimized entries, that we may correctly count the updates.
singletons: usize,
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
impl<L: Layout, C> Merger<OrdValBatch<L, C>> for OrdValMerger<L>
where
OrdValBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());
use lattice::Lattice;
Expand Down Expand Up @@ -162,15 +207,18 @@ mod val_batch {
result: storage,
description,
update_stash: Vec::new(),
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
fn done(self) -> OrdValBatch<L, C> {
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: self.description,
phantom: PhantomData
}
}
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdValBatch<L, C>, source2: &OrdValBatch<L, C>, fuel: &mut isize) {

// An (incomplete) indication of the amount of work we've done so far.
let starting_updates = self.result.updates.len();
Expand Down Expand Up @@ -348,8 +396,18 @@ mod val_batch {
use consolidation;
consolidation::consolidate(&mut self.update_stash);
if !self.update_stash.is_empty() {
for item in self.update_stash.drain(..) {
self.result.updates.push(item);
// If there is a single element, equal to a just-prior recorded update,
// we push nothing and report an unincremented offset to encode this case.
if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
// Just clear out update_stash, as we won't drain it here.
self.update_stash.clear();
self.singletons += 1;
}
else {
// Conventional; move `update_stash` into `updates`.
for item in self.update_stash.drain(..) {
self.result.updates.push(item);
}
}
Some(self.result.updates.len().try_into().ok().unwrap())
} else {
Expand All @@ -359,25 +417,25 @@ mod val_batch {
}

/// A cursor for navigating a single layer.
pub struct OrdValCursor<L: Layout> {
pub struct OrdValCursor<L: Layout, C> {
/// Absolute position of the current key.
key_cursor: usize,
/// Absolute position of the current value.
val_cursor: usize,
/// Phantom marker for Rust happiness.
phantom: std::marker::PhantomData<L>,
phantom: PhantomData<(L, C)>,
}

impl<L: Layout> Cursor for OrdValCursor<L> {
impl<L: Layout, C> Cursor for OrdValCursor<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Storage = OrdValBatch<L>;
type Storage = OrdValBatch<L, C>;

fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) }
fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
Expand Down Expand Up @@ -425,9 +483,46 @@ mod val_batch {
/// A builder for creating layers from unsorted update tuples.
pub struct OrdValBuilder<L: Layout> {
result: OrdValStorage<L>,
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
/// Counts the number of singleton optimizations we performed.
///
/// This number allows us to correctly gauge the total number of updates reflected in a batch,
/// even though `updates.len()` may be much shorter than this amount.
singletons: usize,
}

impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {
impl<L: Layout> OrdValBuilder<L> {
/// Pushes a single update, which may set `self.singleton` rather than push.
///
/// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
/// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
/// to encode a singleton update with an "absert" update: repeating the most recent offset.
/// This otherwise invalid state encodes "look back one element".
///
/// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
/// previously pushed update exactly. In that case, we do not push the update into `updates`.
/// The update tuple is retained in `self.singleton` in case we see another update and need
/// to recover the singleton to push it into `updates` to join the second update.
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
// If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
assert!(self.singleton.is_none());
self.singleton = Some((time, diff));
}
else {
// If we have pushed a single element, we need to copy it out to meet this one.
if let Some(time_diff) = self.singleton.take() {
self.result.updates.push(time_diff);
}
self.result.updates.push((time, diff));
}
}
}

impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
where
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{

fn new() -> Self { Self::with_capacity(0) }
fn with_capacity(cap: usize) -> Self {
Expand All @@ -439,7 +534,9 @@ mod val_batch {
vals: L::ValContainer::with_capacity(cap),
vals_offs: Vec::with_capacity(cap),
updates: L::UpdContainer::with_capacity(cap),
}
},
singleton: None,
singletons: 0,
}
}

Expand All @@ -450,20 +547,20 @@ mod val_batch {
if self.result.keys.last() == Some(&key) {
// Perhaps this is a continuation of an already received value.
if self.result.vals.last() == Some(&val) {
// TODO: here we could look for repetition, and not push the update in that case.
// More logic (and state) would be required to correctly wrangle this.
self.result.updates.push((time, diff));
self.push_update(time, diff);
} else {
// New value; complete representation of prior value.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.updates.push((time, diff));
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.updates.push((time, diff));
self.push_update(time, diff);
self.result.vals.push(val);
self.result.keys.push(key);
}
Expand All @@ -478,32 +575,39 @@ mod val_batch {
if self.result.vals.last() == Some(val) {
// TODO: here we could look for repetition, and not push the update in that case.
// More logic (and state) would be required to correctly wrangle this.
self.result.updates.push((time.clone(), diff.clone()));
self.push_update(time.clone(), diff.clone());
} else {
// New value; complete representation of prior value.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time.clone(), diff.clone());
self.result.vals.copy(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
self.push_update(time.clone(), diff.clone());
self.result.vals.copy(val);
self.result.keys.copy(key);
}
}

#[inline(never)]
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L> {
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L, C> {
// Record the final offsets
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());

// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: Description::new(lower, upper, since),
phantom: PhantomData,
}
}
}
Expand Down

0 comments on commit b55e5b6

Please sign in to comment.