diff --git a/examples/spines.rs b/examples/spines.rs index f18ca5ad9..81fa438b2 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -51,6 +51,23 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, + "slc" => { + + use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; + use differential_dataflow::operators::reduce::ReduceCore; + + let data = + data.map(|x| (x.clone().into_bytes(), x.into_bytes())) + .arrange::>() + .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1))); + let keys = + keys.map(|x| (x.clone().into_bytes(), 7)) + .arrange::>() + .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1))); + + keys.join_core(&data, |_k,&(),&()| Option::<()>::None) + .probe_with(&mut probe); + }, _ => { println!("unreconized mode: {:?}", mode) } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index f4406ad5e..7373dc188 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -434,27 +434,27 @@ where } } -/// A type that can be arranged into a trace of type `T`. +/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`. /// -/// This trait is implemented for appropriately typed collections and all traces that might accommodate them, -/// as well as by arranged data for their corresponding trace type. -pub trait Arrange +/// This trait is primarily implemented by `Collection`. +/// +/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained. +/// This allows e.g. for `Vec` inputs to present as `&[u8]` when read, but that relationship is not +/// constrained by this trait. +pub trait Arrange where + G: Scope, G::Timestamp: Lattice, - K: Data, - V: Data, { - /// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type. + /// Arranges a stream of `(Key, Val)` updates by `Key`. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. - /// This trace is current for all times marked completed in the output stream, and probing this stream - /// is the correct way to determine that times in the shared trace are committed. fn arrange(&self) -> Arranged> where - K: ExchangeData+Hashable, + Tr: Trace + 'static, + K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, @@ -462,17 +462,15 @@ where self.arrange_named("Arrange") } - /// Arranges a stream of `(Key, Val)` updates by `Key`. Accepts an empty instance of the trace type. + /// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. - /// This trace is current for all times marked completed in the output stream, and probing this stream - /// is the correct way to determine that times in the shared trace are committed. fn arrange_named(&self, name: &str) -> Arranged> where - K: ExchangeData+Hashable, + Tr: Trace + 'static, + K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData, - Tr: Trace+TraceReader+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, @@ -481,15 +479,18 @@ where self.arrange_core(exchange, name) } - /// Arranges a stream of `(Key, Val)` updates by `Key`. 
Accepts an empty instance of the trace type. + /// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. - /// This trace is current for all times marked completed in the output stream, and probing this stream - /// is the correct way to determine that times in the shared trace are committed. + /// It uses the supplied parallelization contract to distribute the data, which does not need to + /// be consistently by key (though this is the most common). fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + K: Clone, + V: Clone, + R: Clone, + Tr: Trace+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, @@ -499,40 +500,15 @@ where impl Arrange for Collection where G: Scope, - G::Timestamp: Lattice+Ord, - K: Data, - V: Data, + G::Timestamp: Lattice, + K: Clone + 'static, + V: Clone + 'static, R: Semigroup, { - fn arrange(&self) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, - { - self.arrange_named("Arrange") - } - - fn arrange_named(&self, name: &str) -> Arranged> - where - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData, - Tr: Trace + TraceReader + 'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, - Tr::Builder: Builder, - { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, @@ -697,7 +673,7 @@ where fn arrange_core(&self, pact: P, name: &str) -> Arranged> where P: ParallelizationContract, - Tr: Trace+TraceReader+'static, + Tr: Trace+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, diff --git a/src/operators/join.rs b/src/operators/join.rs index c1708a759..48e508ab6 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -3,7 +3,6 @@ //! The various `join` implementations require that the units of each collection can be multiplied, and that //! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c) //! + (b * c), and if this is not equal to the former term, little is known about the actual output. -use std::fmt::Debug; use std::cmp::Ordering; use timely::order::PartialOrder; @@ -215,7 +214,7 @@ where /// This method is used by the various `join` implementations, but it can also be used /// directly in the event that one has a handle to an `Arranged`, perhaps because /// the arrangement is available for re-use, or from the output of a `reduce` operator. -pub trait JoinCore where G::Timestamp: Lattice+Ord { +pub trait JoinCore where G::Timestamp: Lattice+Ord { /// Joins two arranged collections with the same key type. 
/// @@ -255,7 +254,7 @@ pub trait JoinCore where G::Time fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2::R: Semigroup, R: Multiply, >::Output: Semigroup, @@ -305,7 +304,7 @@ pub trait JoinCore where G::Time fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2::R: Semigroup, D: Data, ROut: Semigroup, @@ -326,7 +325,7 @@ where fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2::R: Semigroup, R: Multiply, >::Output: Semigroup, @@ -341,7 +340,7 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2::R: Semigroup, R: Semigroup, D: Data, @@ -357,15 +356,15 @@ where impl JoinCore for Arranged where G: Scope, - G::Timestamp: Lattice+Ord+Debug, + G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Key: Ord+Debug+'static, - T1::Val: Ord+Clone+Debug+'static, + T1::Key: Ord+'static, + T1::Val: Ord+'static, T1::R: Semigroup, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2: TraceReader+Clone+'static, Tr2::R: Semigroup, T1::R: Multiply, @@ -382,270 +381,297 @@ impl JoinCore for Arranged self.join_core_internal_unsafe(other, result) } - fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> Collection + fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection where Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+Clone+Debug+'static, + Tr2::Val: Ord+'static, Tr2::R: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static, { - // Rename traces for symmetry from here on out. - let mut trace1 = self.trace.clone(); - let mut trace2 = other.trace.clone(); - - self.stream.binary_frontier(&other.stream, Pipeline, Pipeline, "Join", move |capability, info| { - - // Acquire an activator to reschedule the operator when it has unfinished work. - use timely::scheduling::Activator; - let activations = self.stream.scope().activations().clone(); - let activator = Activator::new(&info.address[..], activations); - - // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. - // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as - // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the - // initial work for the two traces, and before the operator is constructed. - - // Acknowledged frontier for each input. - // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`. - // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond - // the physical compaction frontier of their corresponding trace. - // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used. 
- use timely::progress::frontier::Antichain; - let mut acknowledged1 = Antichain::from_elem(::minimum()); - let mut acknowledged2 = Antichain::from_elem(::minimum()); - - // deferred work of batches from each input. - let mut todo1 = std::collections::VecDeque::new(); - let mut todo2 = std::collections::VecDeque::new(); - - // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start. - trace1.map_batches(|batch1| { - acknowledged1.clone_from(batch1.upper()); - // No `todo1` work here, because we haven't accepted anything into `batches2` yet. - // It is effectively "empty", because we choose to drain `trace1` before `trace2`. - // Once we start streaming batches in, we will need to respond to new batches from - // `input1` with logic that would have otherwise been here. Check out the next loop - // for the structure. - }); - // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by - // iterating through batches and capturing the upper bound. This is a great moment to assert that - // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`. - // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. - assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow())); - - // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock - // on both traces at the same time, as they could be the same trace and this would panic. - let mut batch2_cursors = Vec::new(); - trace2.map_batches(|batch2| { - acknowledged2.clone_from(batch2.upper()); - batch2_cursors.push((batch2.cursor(), batch2.clone())); - }); - // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by - // iterating through batches and capturing the upper bound. This is a great moment to assert that - // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`. - // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. - assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow())); - - // Load up deferred work using trace2 cursors and batches captured just above. - for (batch2_cursor, batch2) in batch2_cursors.into_iter() { - // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`. - let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); - // We could downgrade the capability here, but doing so is a bit complicated mathematically. - // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not - // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have - // that property. - todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); - } + join_traces(self, other, result) + } +} + +/// An equijoin of two traces, sharing a common key type. +/// +/// This method exists to provide join functionality without opinions on the specific input types, keys and values, +/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and +/// even potentially unrelated to the input collection data. 
Importantly, the key and value types could be generic +/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments. +/// +/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function. +pub fn join_traces(arranged1: &Arranged, arranged2: &Arranged, mut result: L) -> Collection +where + G: Scope, + G::Timestamp: Lattice+Ord, + T1: TraceReader+Clone+'static, + T1::Key: Ord, + T1::Val: Ord, + T1::R: Semigroup, + T2: TraceReader+Clone+'static, + T2::Val: Ord, + T2::R: Semigroup, + D: Data, + R: Semigroup, + I: IntoIterator, + L: FnMut(&T1::Key,&T1::Val,&T2::Val,&G::Timestamp,&T1::R,&T2::R)->I+'static, +{ + // Rename traces for symmetry from here on out. + let mut trace1 = arranged1.trace.clone(); + let mut trace2 = arranged2.trace.clone(); + + arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| { + + // Acquire an activator to reschedule the operator when it has unfinished work. + use timely::scheduling::Activator; + let activations = arranged1.stream.scope().activations().clone(); + let activator = Activator::new(&info.address[..], activations); + + // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound. + // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as + // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the + // initial work for the two traces, and before the operator is constructed. + + // Acknowledged frontier for each input. + // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`. + // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond + // the physical compaction frontier of their corresponding trace. + // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used. + use timely::progress::frontier::Antichain; + let mut acknowledged1 = Antichain::from_elem(::minimum()); + let mut acknowledged2 = Antichain::from_elem(::minimum()); + + // deferred work of batches from each input. + let mut todo1 = std::collections::VecDeque::new(); + let mut todo2 = std::collections::VecDeque::new(); + + // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start. + trace1.map_batches(|batch1| { + acknowledged1.clone_from(batch1.upper()); + // No `todo1` work here, because we haven't accepted anything into `batches2` yet. + // It is effectively "empty", because we choose to drain `trace1` before `trace2`. + // Once we start streaming batches in, we will need to respond to new batches from + // `input1` with logic that would have otherwise been here. Check out the next loop + // for the structure. + }); + // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by + // iterating through batches and capturing the upper bound. This is a great moment to assert that + // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`. + // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. 
+ assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow())); + + // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock + // on both traces at the same time, as they could be the same trace and this would panic. + let mut batch2_cursors = Vec::new(); + trace2.map_batches(|batch2| { + acknowledged2.clone_from(batch2.upper()); + batch2_cursors.push((batch2.cursor(), batch2.clone())); + }); + // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by + // iterating through batches and capturing the upper bound. This is a great moment to assert that + // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`. + // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier. + assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow())); + + // Load up deferred work using trace2 cursors and batches captured just above. + for (batch2_cursor, batch2) in batch2_cursors.into_iter() { + // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`. + let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); + // We could downgrade the capability here, but doing so is a bit complicated mathematically. + // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not + // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have + // that property. + todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); + } - // Droppable handles to shared trace data structures. - let mut trace1_option = Some(trace1); - let mut trace2_option = Some(trace2); - - // Swappable buffers for input extraction. - let mut input1_buffer = Vec::new(); - let mut input2_buffer = Vec::new(); - - move |input1, input2, output| { - - // 1. Consuming input. - // - // The join computation repeatedly accepts batches of updates from each of its inputs. - // - // For each accepted batch, it prepares a work-item to join the batch against previously "accepted" - // updates from its other input. It is important to track which updates have been accepted, because - // we use a shared trace and there may be updates present that are in advance of this accepted bound. - // - // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream, - // and 3. if the trace can confirm a region of empty space directly following our accepted bound. - // This last case is a consequence of our inability to transmit empty batches, as they may be formed - // in the absence of timely dataflow capabilities. - - // Drain input 1, prepare work. - input1.for_each(|capability, data| { - // This test *should* always pass, as we only drop a trace in response to the other input emptying. - if let Some(ref mut trace2) = trace2_option { - let capability = capability.retain(); - data.swap(&mut input1_buffer); - for batch1 in input1_buffer.drain(..) { - // Ignore any pre-loaded data. - if PartialOrder::less_equal(&acknowledged1, &batch1.lower()) { - if !batch1.is_empty() { - // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()` - // at start-up, and have held back physical compaction ever since. 
- let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap(); - let batch1_cursor = batch1.cursor(); - todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone())); - } - - // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we - // may have skipped over empty batches. Still, the batches are in-order, and we should be - // able to just assume the most recent `batch1.upper` - debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper())); - acknowledged1.clone_from(batch1.upper()); + // Droppable handles to shared trace data structures. + let mut trace1_option = Some(trace1); + let mut trace2_option = Some(trace2); + + // Swappable buffers for input extraction. + let mut input1_buffer = Vec::new(); + let mut input2_buffer = Vec::new(); + + move |input1, input2, output| { + + // 1. Consuming input. + // + // The join computation repeatedly accepts batches of updates from each of its inputs. + // + // For each accepted batch, it prepares a work-item to join the batch against previously "accepted" + // updates from its other input. It is important to track which updates have been accepted, because + // we use a shared trace and there may be updates present that are in advance of this accepted bound. + // + // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream, + // and 3. if the trace can confirm a region of empty space directly following our accepted bound. + // This last case is a consequence of our inability to transmit empty batches, as they may be formed + // in the absence of timely dataflow capabilities. + + // Drain input 1, prepare work. + input1.for_each(|capability, data| { + // This test *should* always pass, as we only drop a trace in response to the other input emptying. + if let Some(ref mut trace2) = trace2_option { + let capability = capability.retain(); + data.swap(&mut input1_buffer); + for batch1 in input1_buffer.drain(..) { + // Ignore any pre-loaded data. + if PartialOrder::less_equal(&acknowledged1, &batch1.lower()) { + if !batch1.is_empty() { + // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()` + // at start-up, and have held back physical compaction ever since. + let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap(); + let batch1_cursor = batch1.cursor(); + todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone())); } + + // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we + // may have skipped over empty batches. Still, the batches are in-order, and we should be + // able to just assume the most recent `batch1.upper` + debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper())); + acknowledged1.clone_from(batch1.upper()); } } - else { panic!("`trace2_option` dropped before `input1` emptied!"); } - }); - - // Drain input 2, prepare work. - input2.for_each(|capability, data| { - // This test *should* always pass, as we only drop a trace in response to the other input emptying. - if let Some(ref mut trace1) = trace1_option { - let capability = capability.retain(); - data.swap(&mut input2_buffer); - for batch2 in input2_buffer.drain(..) { - // Ignore any pre-loaded data. 
- if PartialOrder::less_equal(&acknowledged2, &batch2.lower()) { - if !batch2.is_empty() { - // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()` - // at start-up, and have held back physical compaction ever since. - let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); - let batch2_cursor = batch2.cursor(); - todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); - } - - // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we - // may have skipped over empty batches. Still, the batches are in-order, and we should be - // able to just assume the most recent `batch2.upper` - debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper())); - acknowledged2.clone_from(batch2.upper()); + } + else { panic!("`trace2_option` dropped before `input1` emptied!"); } + }); + + // Drain input 2, prepare work. + input2.for_each(|capability, data| { + // This test *should* always pass, as we only drop a trace in response to the other input emptying. + if let Some(ref mut trace1) = trace1_option { + let capability = capability.retain(); + data.swap(&mut input2_buffer); + for batch2 in input2_buffer.drain(..) { + // Ignore any pre-loaded data. + if PartialOrder::less_equal(&acknowledged2, &batch2.lower()) { + if !batch2.is_empty() { + // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()` + // at start-up, and have held back physical compaction ever since. + let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap(); + let batch2_cursor = batch2.cursor(); + todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone())); } + + // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we + // may have skipped over empty batches. Still, the batches are in-order, and we should be + // able to just assume the most recent `batch2.upper` + debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper())); + acknowledged2.clone_from(batch2.upper()); } } - else { panic!("`trace1_option` dropped before `input2` emptied!"); } - }); - - // Advance acknowledged frontiers through any empty regions that we may not receive as batches. - if let Some(trace1) = trace1_option.as_mut() { - trace1.advance_upper(&mut acknowledged1); - } - if let Some(trace2) = trace2_option.as_mut() { - trace2.advance_upper(&mut acknowledged2); } + else { panic!("`trace1_option` dropped before `input2` emptied!"); } + }); - // 2. Join computation. - // - // For each of the inputs, we do some amount of work (measured in terms of number - // of output records produced). This is meant to yield control to allow downstream - // operators to consume and reduce the output, but it it also means to provide some - // degree of responsiveness. There is a potential risk here that if we fall behind - // then the increasing queues hold back physical compaction of the underlying traces - // which results in unintentionally quadratic processing time (each batch of either - // input must scan all batches from the other input). - - // Perform some amount of outstanding work. 
- let mut fuel = 1_000_000; - while !todo1.is_empty() && fuel > 0 { - todo1.front_mut().unwrap().work( - output, - |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), - &mut fuel - ); - if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } - } + // Advance acknowledged frontiers through any empty regions that we may not receive as batches. + if let Some(trace1) = trace1_option.as_mut() { + trace1.advance_upper(&mut acknowledged1); + } + if let Some(trace2) = trace2_option.as_mut() { + trace2.advance_upper(&mut acknowledged2); + } - // Perform some amount of outstanding work. - let mut fuel = 1_000_000; - while !todo2.is_empty() && fuel > 0 { - todo2.front_mut().unwrap().work( - output, - |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), - &mut fuel - ); - if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } - } + // 2. Join computation. + // + // For each of the inputs, we do some amount of work (measured in terms of number + // of output records produced). This is meant to yield control to allow downstream + // operators to consume and reduce the output, but it it also means to provide some + // degree of responsiveness. There is a potential risk here that if we fall behind + // then the increasing queues hold back physical compaction of the underlying traces + // which results in unintentionally quadratic processing time (each batch of either + // input must scan all batches from the other input). + + // Perform some amount of outstanding work. + let mut fuel = 1_000_000; + while !todo1.is_empty() && fuel > 0 { + todo1.front_mut().unwrap().work( + output, + |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2), + &mut fuel + ); + if !todo1.front().unwrap().work_remains() { todo1.pop_front(); } + } - // Re-activate operator if work remains. - if !todo1.is_empty() || !todo2.is_empty() { - activator.activate(); - } + // Perform some amount of outstanding work. + let mut fuel = 1_000_000; + while !todo2.is_empty() && fuel > 0 { + todo2.front_mut().unwrap().work( + output, + |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2), + &mut fuel + ); + if !todo2.front().unwrap().work_remains() { todo2.pop_front(); } + } - // 3. Trace maintenance. - // - // Importantly, we use `input.frontier()` here rather than `acknowledged` to track - // the progress of an input, because should we ever drop one of the traces we will - // lose the ability to extract information from anything other than the input. - // For example, if we dropped `trace2` we would not be able to use `advance_upper` - // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical - // compaction of `trace1`. - - // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs. - if let Some(trace1) = trace1_option.as_mut() { - if input2.frontier().is_empty() { trace1_option = None; } - else { - // Allow `trace1` to compact logically up to the frontier we may yet receive, - // in the opposing input (`input2`). All `input2` times will be beyond this - // frontier, and joined times only need to be accurate when advanced to it. - trace1.set_logical_compaction(input2.frontier().frontier()); - // Allow `trace1` to compact physically up to the upper bound of batches we - // have received in its input (`input1`). We will not require a cursor that - // is not beyond this bound. - trace1.set_physical_compaction(acknowledged1.borrow()); - } + // Re-activate operator if work remains. + if !todo1.is_empty() || !todo2.is_empty() { + activator.activate(); + } + + // 3. Trace maintenance. 
+ // + // Importantly, we use `input.frontier()` here rather than `acknowledged` to track + // the progress of an input, because should we ever drop one of the traces we will + // lose the ability to extract information from anything other than the input. + // For example, if we dropped `trace2` we would not be able to use `advance_upper` + // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical + // compaction of `trace1`. + + // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs. + if let Some(trace1) = trace1_option.as_mut() { + if input2.frontier().is_empty() { trace1_option = None; } + else { + // Allow `trace1` to compact logically up to the frontier we may yet receive, + // in the opposing input (`input2`). All `input2` times will be beyond this + // frontier, and joined times only need to be accurate when advanced to it. + trace1.set_logical_compaction(input2.frontier().frontier()); + // Allow `trace1` to compact physically up to the upper bound of batches we + // have received in its input (`input1`). We will not require a cursor that + // is not beyond this bound. + trace1.set_physical_compaction(acknowledged1.borrow()); } + } - // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs. - if let Some(trace2) = trace2_option.as_mut() { - if input1.frontier().is_empty() { trace2_option = None;} - else { - // Allow `trace2` to compact logically up to the frontier we may yet receive, - // in the opposing input (`input1`). All `input1` times will be beyond this - // frontier, and joined times only need to be accurate when advanced to it. - trace2.set_logical_compaction(input1.frontier().frontier()); - // Allow `trace2` to compact physically up to the upper bound of batches we - // have received in its input (`input2`). We will not require a cursor that - // is not beyond this bound. - trace2.set_physical_compaction(acknowledged2.borrow()); - } + // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs. + if let Some(trace2) = trace2_option.as_mut() { + if input1.frontier().is_empty() { trace2_option = None;} + else { + // Allow `trace2` to compact logically up to the frontier we may yet receive, + // in the opposing input (`input1`). All `input1` times will be beyond this + // frontier, and joined times only need to be accurate when advanced to it. + trace2.set_logical_compaction(input1.frontier().frontier()); + // Allow `trace2` to compact physically up to the upper bound of batches we + // have received in its input (`input2`). We will not require a cursor that + // is not beyond this bound. + trace2.set_physical_compaction(acknowledged2.borrow()); } } - }) - .as_collection() - } + } + }) + .as_collection() } + /// Deferred join computation. /// /// The structure wraps cursors which allow us to play out join computation at whatever rate we like. /// This allows us to avoid producing and buffering massive amounts of data, without giving the timely /// dataflow system a chance to run operators that can consume and aggregate the data. 
-struct Deferred +struct Deferred where - T: Timestamp+Lattice+Ord+Debug, + T: Timestamp+Lattice+Ord, R: Semigroup, - C1: Cursor, - C2: Cursor, - C1::Val: Ord+Clone, - C2::Val: Ord+Clone, + C1: Cursor, + C2: Cursor, + C1::Val: Ord, + C2::Val: Ord, C1::R: Semigroup, C2::R: Semigroup, D: Ord+Clone+Data, { - phant: ::std::marker::PhantomData, trace: C1, trace_storage: S1, batch: C2, @@ -655,22 +681,21 @@ where temp: Vec<((D, T), R)>, } -impl Deferred +impl Deferred where - K: Ord+Debug+Eq, - C1: Cursor, - C2: Cursor, - C1::Val: Ord+Clone+Debug, - C2::Val: Ord+Clone+Debug, + C1::Key: Ord+Eq, + C1: Cursor, + C2: Cursor, + C1::Val: Ord, + C2::Val: Ord, C1::R: Semigroup, C2::R: Semigroup, - T: Timestamp+Lattice+Ord+Debug, + T: Timestamp+Lattice+Ord, R: Semigroup, D: Clone+Data, { fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability) -> Self { Deferred { - phant: ::std::marker::PhantomData, trace, trace_storage, batch, @@ -688,7 +713,7 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) - where I: IntoIterator, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I { + where I: IntoIterator, L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I { let meet = self.capability.time(); @@ -752,14 +777,12 @@ where } } -struct JoinThinker<'a, V1: Ord+Clone+'a, V2: Ord+Clone+'a, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> { +struct JoinThinker<'a, V1: Ord+'a + ?Sized, V2: Ord+'a + ?Sized, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> { pub history1: ValueHistory<'a, V1, T, R1>, pub history2: ValueHistory<'a, V2, T, R2>, } -impl<'a, V1: Ord+Clone, V2: Ord+Clone, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> JoinThinker<'a, V1, V2, T, R1, R2> -where V1: Debug, V2: Debug, T: Debug -{ +impl<'a, V1: Ord + ?Sized, V2: Ord + ?Sized, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup> JoinThinker<'a, V1, V2, T, R1, R2> { fn new() -> Self { JoinThinker { history1: ValueHistory::new(), diff --git a/src/operators/mod.rs b/src/operators/mod.rs index e0040443d..1dd8cb516 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -23,12 +23,12 @@ use lattice::Lattice; use trace::Cursor; /// An accumulation of (value, time, diff) updates. -struct EditList<'a, V: 'a, T, R> { +struct EditList<'a, V: 'a + ?Sized, T, R> { values: Vec<(&'a V, usize)>, edits: Vec<(T, R)>, } -impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { +impl<'a, V:'a + ?Sized, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { /// Creates an empty list of edits. #[inline] fn new() -> Self { @@ -39,7 +39,7 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { } /// Loads the contents of a cursor. 
fn load(&mut self, cursor: &mut C, storage: &'a S, logic: L) - where V: Clone, C: Cursor, C::Key: Eq, L: Fn(&T)->T { + where C: Cursor, C::Key: Eq, L: Fn(&T)->T { self.clear(); while cursor.val_valid(storage) { cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone())); @@ -80,7 +80,7 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup { } } -struct ValueHistory<'storage, V: 'storage, T, R> { +struct ValueHistory<'storage, V: 'storage + ?Sized, T, R> { edits: EditList<'storage, V, T, R>, history: Vec<(T, T, usize, usize)>, // (time, meet, value_index, edit_offset) @@ -88,7 +88,7 @@ struct ValueHistory<'storage, V: 'storage, T, R> { buffer: Vec<((&'storage V, T), R)>, // where we accumulate / collapse updates. } -impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueHistory<'storage, V, T, R> { +impl<'storage, V: Ord+'storage + ?Sized, T: Lattice+Ord+Clone, R: Semigroup> ValueHistory<'storage, V, T, R> { fn new() -> Self { ValueHistory { edits: EditList::new(), @@ -154,7 +154,7 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH struct HistoryReplay<'storage, 'history, V, T, R> where 'storage: 'history, - V: Ord+'storage, + V: Ord+'storage + ?Sized, T: Lattice+Ord+Clone+'history, R: Semigroup+'history, { @@ -164,7 +164,7 @@ where impl<'storage, 'history, V, T, R> HistoryReplay<'storage, 'history, V, T, R> where 'storage: 'history, - V: Ord+'storage, + V: Ord+'storage + ?Sized, T: Lattice+Ord+Clone+'history, R: Semigroup+'history, { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index e53ead081..4fbc04904 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -239,7 +239,7 @@ where } /// Extension trait for the `reduce_core` differential dataflow method. -pub trait ReduceCore where G::Timestamp: Lattice+Ord { +pub trait ReduceCore where G::Timestamp: Lattice+Ord { /// Applies `reduce` to arranged data, and returns an arrangement of output data. 
/// /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although @@ -276,7 +276,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -299,7 +299,7 @@ pub trait ReduceCore where G::Timestam T2::Val: Data, T2::R: Semigroup, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -309,6 +309,7 @@ where G: Scope, G::Timestamp: Lattice+Ord, K: ExchangeData+Hashable, + K: ToOwned, V: ExchangeData, R: ExchangeData+Semigroup, { @@ -318,7 +319,7 @@ where T2::R: Semigroup, T2: Trace+TraceReader+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -326,319 +327,346 @@ where } } -impl ReduceCore for Arranged +impl ReduceCore for Arranged where + K: ToOwned + Ord + ?Sized, + K::Owned: Data, + V: ToOwned + Ord + ?Sized, G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, { - fn reduce_core(&self, name: &str, mut logic: L) -> Arranged> + fn reduce_core(&self, name: &str, logic: L) -> Arranged> where T2: Trace+TraceReader+'static, - T2::Val: Data, + T2::Val: Ord + ToOwned, + ::Owned: Data, T2::R: Semigroup, T2::Batch: Batch, - T2::Builder: Builder, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { + T2::Builder: Builder::Owned), T2::Time, T2::R)>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(::Owned,T2::R)>, &mut Vec<(::Owned, T2::R)>)+'static, + { + reduce_trace(self, name, logic) + } +} - let mut result_trace = None; +fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +where + G: Scope, + G::Timestamp: Lattice+Ord, + T1: TraceReader + Clone + 'static, + T1::Key: Ord + ToOwned, + ::Owned: Ord, + T1::Val: Ord, + T1::R: Semigroup, + T2: Trace+TraceReader + 'static, + T2::Val: Ord + ToOwned, + ::Owned: Data, + T2::R: Semigroup, + T2::Batch: Batch, + T2::Builder: Builder::Owned, ::Owned), T2::Time, T2::R)>, + L: FnMut(&T1::Key, &[(&T1::Val, T1::R)], &mut Vec<(::Owned,T2::R)>, &mut Vec<(::Owned, T2::R)>)+'static, +{ + let mut result_trace = None; - // fabricate a data-parallel operator using the `unary_notify` pattern. - let stream = { + // fabricate a data-parallel operator using the `unary_notify` pattern. + let stream = { - let result_trace = &mut result_trace; - self.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { + let result_trace = &mut result_trace; + trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { - let logger = { - let scope = self.stream.scope(); - let register = scope.log_register(); - register.get::<::logging::DifferentialEvent>("differential/arrange") - }; + let logger = { + let scope = trace.stream.scope(); + let register = scope.log_register(); + register.get::<::logging::DifferentialEvent>("differential/arrange") + }; - let activator = Some(self.stream.scope().activator_for(&operator_info.address[..])); - let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); - // If there is default exert logic set, install it. 
- if let Some(exert_logic) = self.stream.scope().config().get::("differential/default_exert_logic").cloned() { - empty.set_exert_logic(exert_logic); - } + let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..])); + let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); + // If there is default exert logic set, install it. + if let Some(exert_logic) = trace.stream.scope().config().get::("differential/default_exert_logic").cloned() { + empty.set_exert_logic(exert_logic); + } - let mut source_trace = self.trace.clone(); + let mut source_trace = trace.trace.clone(); - let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); + let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); - // let mut output_trace = TraceRc::make_from(agent).0; - *result_trace = Some(output_reader.clone()); + // let mut output_trace = TraceRc::make_from(agent).0; + *result_trace = Some(output_reader.clone()); - // let mut thinker1 = history_replay_prior::HistoryReplayer::::new(); - // let mut thinker = history_replay::HistoryReplayer::::new(); - let mut new_interesting_times = Vec::::new(); + // let mut thinker1 = history_replay_prior::HistoryReplayer::::new(); + // let mut thinker = history_replay::HistoryReplayer::::new(); + let mut new_interesting_times = Vec::::new(); - // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, - // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(K, G::Timestamp)>::new(); - let mut capabilities = Vec::>::new(); + // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, + // as well as capabilities for these times (or their lower envelope, at least). + let mut interesting = Vec::<(::Owned, G::Timestamp)>::new(); + let mut capabilities = Vec::>::new(); - // buffers and logic for computing per-key interesting times "efficiently". - let mut interesting_times = Vec::::new(); + // buffers and logic for computing per-key interesting times "efficiently". + let mut interesting_times = Vec::::new(); - // Upper and lower frontiers for the pending input and output batches to process. - let mut upper_limit = Antichain::from_elem(::minimum()); - let mut lower_limit = Antichain::from_elem(::minimum()); + // Upper and lower frontiers for the pending input and output batches to process. + let mut upper_limit = Antichain::from_elem(::minimum()); + let mut lower_limit = Antichain::from_elem(::minimum()); - // Output batches may need to be built piecemeal, and these temp storage help there. - let mut output_upper = Antichain::from_elem(::minimum()); - let mut output_lower = Antichain::from_elem(::minimum()); + // Output batches may need to be built piecemeal, and these temp storage help there. + let mut output_upper = Antichain::from_elem(::minimum()); + let mut output_lower = Antichain::from_elem(::minimum()); - let mut input_buffer = Vec::new(); + let mut input_buffer = Vec::new(); - let id = self.stream.scope().index(); + let id = trace.stream.scope().index(); - move |input, output| { + move |input, output| { - // The `reduce` operator receives fully formed batches, which each serve as an indication - // that the frontier has advanced to the upper bound of their description. - // - // Although we could act on each individually, several may have been sent, and it makes - // sense to accumulate them first to coordinate their re-evaluation. 
We will need to pay - // attention to which times need to be collected under which capability, so that we can - // assemble output batches correctly. We will maintain several builders concurrently, and - // place output updates into the appropriate builder. - // - // It turns out we must use notificators, as we cannot await empty batches from arrange to - // indicate progress, as the arrange may not hold the capability to send such. Instead, we - // must watch for progress here (and the upper bound of received batches) to tell us how - // far we can process work. - // - // We really want to retire all batches we receive, so we want a frontier which reflects - // both information from batches as well as progress information. I think this means that - // we keep times that are greater than or equal to a time in the other frontier, deduplicated. - - let mut batch_cursors = Vec::new(); - let mut batch_storage = Vec::new(); - - // Downgrate previous upper limit to be current lower limit. - lower_limit.clear(); - lower_limit.extend(upper_limit.borrow().iter().cloned()); - - // Drain the input stream of batches, validating the contiguity of the batch descriptions and - // capturing a cursor for each of the batches as well as ensuring we hold a capability for the - // times in the batch. - input.for_each(|capability, batches| { - - batches.swap(&mut input_buffer); - for batch in input_buffer.drain(..) { - upper_limit.clone_from(batch.upper()); - batch_cursors.push(batch.cursor()); - batch_storage.push(batch); - } + // The `reduce` operator receives fully formed batches, which each serve as an indication + // that the frontier has advanced to the upper bound of their description. + // + // Although we could act on each individually, several may have been sent, and it makes + // sense to accumulate them first to coordinate their re-evaluation. We will need to pay + // attention to which times need to be collected under which capability, so that we can + // assemble output batches correctly. We will maintain several builders concurrently, and + // place output updates into the appropriate builder. + // + // It turns out we must use notificators, as we cannot await empty batches from arrange to + // indicate progress, as the arrange may not hold the capability to send such. Instead, we + // must watch for progress here (and the upper bound of received batches) to tell us how + // far we can process work. + // + // We really want to retire all batches we receive, so we want a frontier which reflects + // both information from batches as well as progress information. I think this means that + // we keep times that are greater than or equal to a time in the other frontier, deduplicated. + + let mut batch_cursors = Vec::new(); + let mut batch_storage = Vec::new(); + + // Downgrate previous upper limit to be current lower limit. + lower_limit.clear(); + lower_limit.extend(upper_limit.borrow().iter().cloned()); + + // Drain the input stream of batches, validating the contiguity of the batch descriptions and + // capturing a cursor for each of the batches as well as ensuring we hold a capability for the + // times in the batch. + input.for_each(|capability, batches| { + + batches.swap(&mut input_buffer); + for batch in input_buffer.drain(..) { + upper_limit.clone_from(batch.upper()); + batch_cursors.push(batch.cursor()); + batch_storage.push(batch); + } + + // Ensure that `capabilities` covers the capability of the batch. 
+ capabilities.retain(|cap| !capability.time().less_than(&cap.time())); + if !capabilities.iter().any(|cap| cap.time().less_equal(&capability.time())) { + capabilities.push(capability.retain()); + } + }); + + // Pull in any subsequent empty batches we believe to exist. + source_trace.advance_upper(&mut upper_limit); - // Ensure that `capabilities` covers the capability of the batch. - capabilities.retain(|cap| !capability.time().less_than(&cap.time())); - if !capabilities.iter().any(|cap| cap.time().less_equal(&capability.time())) { - capabilities.push(capability.retain()); + // Only if our upper limit has advanced should we do work. + if upper_limit != lower_limit { + + // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send + // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches + // to indicate forward progress, and must hope that downstream operators look at progress frontiers + // as well as batch descriptions. + // + // We can (and should) advance source and output traces if `upper_limit` indicates this is possible. + if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { + + // `interesting` contains "warnings" about keys and times that may need to be re-considered. + // We first extract those times from this list that lie in the interval we will process. + sort_dedup(&mut interesting); + // `exposed` contains interesting (key, time)s now below `upper_limit` + let exposed = { + let (exposed, new_interesting) = interesting.drain(..).partition(|&(_, ref time)| !upper_limit.less_equal(time)); + interesting = new_interesting; + exposed + }; + + // Prepare an output buffer and builder for each capability. + // + // We buffer and build separately, as outputs are produced grouped by time, whereas the + // builder wants to see outputs grouped by value. While the per-key computation could + // do the re-sorting itself, buffering per-key outputs lets us double check the results + // against other implementations for accuracy. + // + // TODO: It would be better if all updates went into one batch, but timely dataflow prevents + // this as long as it requires that there is only one capability for each message. + let mut buffers = Vec::<(G::Timestamp, Vec<(::Owned, G::Timestamp, T2::R)>)>::new(); + let mut builders = Vec::new(); + for i in 0 .. capabilities.len() { + buffers.push((capabilities[i].time().clone(), Vec::new())); + builders.push(T2::Builder::new()); } - }); - // Pull in any subsequent empty batches we believe to exist. - source_trace.advance_upper(&mut upper_limit); + // cursors for navigating input and output traces. + let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); + let source_storage = &source_storage; + let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); + let output_storage = &output_storage; + let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); + let batch_storage = &batch_storage; - // Only if our upper limit has advanced should we do work. - if upper_limit != lower_limit { + let mut thinker = history_replay::HistoryReplayer::::new(); - // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send - // any produced outputs even if they were (incorrectly) produced. 
We cannot even send empty batches - // to indicate forward progress, and must hope that downstream operators look at progress frontiers - // as well as batch descriptions. + // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`. // - // We can (and should) advance source and output traces if `upper_limit` indicates this is possible. - if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { - - // `interesting` contains "warnings" about keys and times that may need to be re-considered. - // We first extract those times from this list that lie in the interval we will process. - sort_dedup(&mut interesting); - // `exposed` contains interesting (key, time)s now below `upper_limit` - let exposed = { - let (exposed, new_interesting) = interesting.drain(..).partition(|&(_, ref time)| !upper_limit.less_equal(time)); - interesting = new_interesting; - exposed + // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length + // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`. + // There could perhaps be a less provocative variable name. + let mut exposed_position = 0; + while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { + + use std::borrow::Borrow; + + // Determine the next key we will work on; could be synthetic, could be from a batch. + let key1 = exposed.get(exposed_position).map(|x| &x.0); + let key2 = batch_cursor.get_key(&batch_storage); + let key = match (key1, key2) { + (Some(key1), Some(key2)) => ::std::cmp::min(key1.borrow(), key2), + (Some(key1), None) => key1.borrow(), + (None, Some(key2)) => key2, + (None, None) => unreachable!(), }; - // Prepare an output buffer and builder for each capability. - // - // We buffer and build separately, as outputs are produced grouped by time, whereas the - // builder wants to see outputs grouped by value. While the per-key computation could - // do the re-sorting itself, buffering per-key outputs lets us double check the results - // against other implementations for accuracy. - // - // TODO: It would be better if all updates went into one batch, but timely dataflow prevents - // this as long as it requires that there is only one capability for each message. - let mut buffers = Vec::<(G::Timestamp, Vec<(T2::Val, G::Timestamp, T2::R)>)>::new(); - let mut builders = Vec::new(); - for i in 0 .. capabilities.len() { - buffers.push((capabilities[i].time().clone(), Vec::new())); - builders.push(T2::Builder::new()); - } + // `interesting_times` contains those times between `lower_issued` and `upper_limit` + // that we need to re-consider. We now populate it, but perhaps this should be left + // to the per-key computation, which may be able to avoid examining the times of some + // values (for example, in the case of min/max/topk). + interesting_times.clear(); - // cursors for navigating input and output traces. 
- let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); - let source_storage = &source_storage; - let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); - let output_storage = &output_storage; - let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); - let batch_storage = &batch_storage; - - let mut thinker = history_replay::HistoryReplayer::::new(); - - // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`. - // - // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length - // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`. - // There could perhaps be a less provocative variable name. - let mut exposed_position = 0; - while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { - - // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| &x.0); - let key2 = batch_cursor.get_key(&batch_storage); - let key = match (key1, key2) { - (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), - (Some(key1), None) => key1, - (None, Some(key2)) => key2, - (None, None) => unreachable!(), - }; - - // `interesting_times` contains those times between `lower_issued` and `upper_limit` - // that we need to re-consider. We now populate it, but perhaps this should be left - // to the per-key computation, which may be able to avoid examining the times of some - // values (for example, in the case of min/max/topk). - interesting_times.clear(); - - // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| &x.0) == Some(key) { - interesting_times.push(exposed[exposed_position].1.clone()); - exposed_position += 1; - } + // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. + while exposed.get(exposed_position).map(|x| x.0.borrow()) == Some(key) { + interesting_times.push(exposed[exposed_position].1.clone()); + exposed_position += 1; + } - // tidy up times, removing redundancy. - sort_dedup(&mut interesting_times); - - // do the per-key computation. - let _counters = thinker.compute( - key, - (&mut source_cursor, source_storage), - (&mut output_cursor, output_storage), - (&mut batch_cursor, batch_storage), - &mut interesting_times, - &mut logic, - &upper_limit, - &mut buffers[..], - &mut new_interesting_times, - ); - - if batch_cursor.get_key(batch_storage) == Some(key) { - batch_cursor.step_key(batch_storage); - } + // tidy up times, removing redundancy. + sort_dedup(&mut interesting_times); + + // do the per-key computation. + let _counters = thinker.compute( + key, + (&mut source_cursor, source_storage), + (&mut output_cursor, output_storage), + (&mut batch_cursor, batch_storage), + &mut interesting_times, + &mut logic, + &upper_limit, + &mut buffers[..], + &mut new_interesting_times, + ); + + if batch_cursor.get_key(batch_storage) == Some(key) { + batch_cursor.step_key(batch_storage); + } - // Record future warnings about interesting times (and assert they should be "future"). - for time in new_interesting_times.drain(..) 
{ - debug_assert!(upper_limit.less_equal(&time)); - interesting.push((key.clone(), time)); - } + // Record future warnings about interesting times (and assert they should be "future"). + for time in new_interesting_times.drain(..) { + debug_assert!(upper_limit.less_equal(&time)); + interesting.push((key.to_owned(), time)); + } - // Sort each buffer by value and move into the corresponding builder. - // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, - // (ii) that the buffers are time-ordered, and (iii) that the builders accept - // arbitrarily ordered times. - for index in 0 .. buffers.len() { - buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); - for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.clone(), val), time, diff)); - } + // Sort each buffer by value and move into the corresponding builder. + // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`, + // (ii) that the buffers are time-ordered, and (iii) that the builders accept + // arbitrarily ordered times. + for index in 0 .. buffers.len() { + buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); + for (val, time, diff) in buffers[index].1.drain(..) { + builders[index].push(((key.to_owned(), val), time, diff)); } } + } - // We start sealing output batches from the lower limit (previous upper limit). - // In principle, we could update `lower_limit` itself, and it should arrive at - // `upper_limit` by the end of the process. - output_lower.clear(); - output_lower.extend(lower_limit.borrow().iter().cloned()); - - // build and ship each batch (because only one capability per message). - for (index, builder) in builders.drain(..).enumerate() { - - // Form the upper limit of the next batch, which includes all times greater - // than the input batch, or the capabilities from i + 1 onward. - output_upper.clear(); - output_upper.extend(upper_limit.borrow().iter().cloned()); - for capability in &capabilities[index + 1 ..] { - output_upper.insert(capability.time().clone()); - } + // We start sealing output batches from the lower limit (previous upper limit). + // In principle, we could update `lower_limit` itself, and it should arrive at + // `upper_limit` by the end of the process. + output_lower.clear(); + output_lower.extend(lower_limit.borrow().iter().cloned()); + + // build and ship each batch (because only one capability per message). + for (index, builder) in builders.drain(..).enumerate() { + + // Form the upper limit of the next batch, which includes all times greater + // than the input batch, or the capabilities from i + 1 onward. + output_upper.clear(); + output_upper.extend(upper_limit.borrow().iter().cloned()); + for capability in &capabilities[index + 1 ..] { + output_upper.insert(capability.time().clone()); + } - if output_upper.borrow() != output_lower.borrow() { + if output_upper.borrow() != output_lower.borrow() { - let batch = builder.done(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); + let batch = builder.done(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); - // ship batch to the output, and commit to the output trace. - output.session(&capabilities[index]).give(batch.clone()); - output_writer.insert(batch, Some(capabilities[index].time().clone())); + // ship batch to the output, and commit to the output trace. 
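// [Aside: a std-only sketch of how the per-capability batch bounds above chain
//  together, not code from this patch. With a totally ordered time (`u64` here)
//  each antichain collapses to a single value: batch `i` spans from the previous
//  upper bound to the least of `upper_limit` and the later capabilities, and the
//  next batch starts where this one ends. `batch_bounds` is a placeholder name.]
fn batch_bounds(lower_limit: u64, caps: &[u64], upper_limit: u64) -> Vec<(u64, u64)> {
    let mut bounds = Vec::new();
    let mut lower = lower_limit;
    for index in 0..caps.len() {
        // Times this batch must not cover: later capabilities, or the overall upper limit.
        let upper = caps[index + 1..]
            .iter()
            .copied()
            .chain(std::iter::once(upper_limit))
            .min()
            .unwrap();
        // Only record a batch when the interval is non-empty (mirrors the check above).
        if upper != lower {
            bounds.push((lower, upper));
            lower = upper;
        }
    }
    bounds
}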
+ output.session(&capabilities[index]).give(batch.clone()); + output_writer.insert(batch, Some(capabilities[index].time().clone())); - output_lower.clear(); - output_lower.extend(output_upper.borrow().iter().cloned()); - } + output_lower.clear(); + output_lower.extend(output_upper.borrow().iter().cloned()); } + } - // This should be true, as the final iteration introduces no capabilities, and - // uses exactly `upper_limit` to determine the upper bound. Good to check though. - assert!(output_upper.borrow() == upper_limit.borrow()); + // This should be true, as the final iteration introduces no capabilities, and + // uses exactly `upper_limit` to determine the upper bound. Good to check though. + assert!(output_upper.borrow() == upper_limit.borrow()); - // Determine the frontier of our interesting times. - let mut frontier = Antichain::::new(); - for &(_, ref time) in &interesting { - frontier.insert(time.clone()); - } + // Determine the frontier of our interesting times. + let mut frontier = Antichain::::new(); + for &(_, ref time) in &interesting { + frontier.insert(time.clone()); + } - // Update `capabilities` to reflect interesting pairs described by `frontier`. - let mut new_capabilities = Vec::new(); - for time in frontier.borrow().iter() { - if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) { - new_capabilities.push(cap.delayed(time)); - } - else { - println!("{}:\tfailed to find capability less than new frontier time:", id); - println!("{}:\t time: {:?}", id, time); - println!("{}:\t caps: {:?}", id, capabilities); - println!("{}:\t uppr: {:?}", id, upper_limit); - } + // Update `capabilities` to reflect interesting pairs described by `frontier`. + let mut new_capabilities = Vec::new(); + for time in frontier.borrow().iter() { + if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) { + new_capabilities.push(cap.delayed(time)); + } + else { + println!("{}:\tfailed to find capability less than new frontier time:", id); + println!("{}:\t time: {:?}", id, time); + println!("{}:\t caps: {:?}", id, capabilities); + println!("{}:\t uppr: {:?}", id, upper_limit); } - capabilities = new_capabilities; - - // ensure that observed progres is reflected in the output. - output_writer.seal(upper_limit.clone()); - } - else { - output_writer.seal(upper_limit.clone()); } + capabilities = new_capabilities; - // We only anticipate future times in advance of `upper_limit`. - source_trace.set_logical_compaction(upper_limit.borrow()); - output_reader.set_logical_compaction(upper_limit.borrow()); - - // We will only slice the data between future batches. - source_trace.set_physical_compaction(upper_limit.borrow()); - output_reader.set_physical_compaction(upper_limit.borrow()); + // ensure that observed progres is reflected in the output. + output_writer.seal(upper_limit.clone()); } + else { + output_writer.seal(upper_limit.clone()); + } + + // We only anticipate future times in advance of `upper_limit`. + source_trace.set_logical_compaction(upper_limit.borrow()); + output_reader.set_logical_compaction(upper_limit.borrow()); - // Exert trace maintenance if we have been so requested. - output_writer.exert(); + // We will only slice the data between future batches. + source_trace.set_physical_compaction(upper_limit.borrow()); + output_reader.set_physical_compaction(upper_limit.borrow()); } + + // Exert trace maintenance if we have been so requested. 
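// [Aside: a std-only sketch of the "frontier of interesting times" computed above,
//  not code from this patch. The frontier keeps only the minimal elements under the
//  partial order; `timely`'s `Antichain::insert` plays the role sketched here.
//  Pairs under the product order stand in for timestamps.]
fn less_equal(a: (u32, u32), b: (u32, u32)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

fn frontier_insert(frontier: &mut Vec<(u32, u32)>, time: (u32, u32)) {
    // Skip `time` if an existing frontier element is already less or equal to it.
    if frontier.iter().any(|&f| less_equal(f, time)) {
        return;
    }
    // Otherwise drop anything `time` dominates, and add it.
    frontier.retain(|&f| !less_equal(time, f));
    frontier.push(time);
}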
+ output_writer.exert(); } - ) - }; + } + ) + }; - Arranged { stream: stream, trace: result_trace.unwrap() } - } + Arranged { stream: stream, trace: result_trace.unwrap() } } + #[inline(never)] fn sort_dedup(list: &mut Vec) { list.dedup(); @@ -648,8 +676,9 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, V1, V2, T, R1, R2> where - V1: Ord+Clone+'a, - V2: Ord+Clone+'a, + V1: Ord + ?Sized, + V2: Ord + ToOwned+'a + ?Sized, + V2::Owned: Ord + Clone + 'a, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup, @@ -664,14 +693,14 @@ where times: &mut Vec, logic: &mut L, upper_limit: &Antichain, - outputs: &mut [(T, Vec<(V2, T, R2)>)], + outputs: &mut [(T, Vec<(V2::Owned, T, R2)>)], new_interesting: &mut Vec) -> (usize, usize) where - K: Eq+Clone, + K: Eq + ?Sized, C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2::Owned, R2)>, &mut Vec<(V2::Owned, R2)>); } @@ -690,8 +719,9 @@ mod history_replay { /// time order, maintaining consolidated representations of updates with respect to future interesting times. pub struct HistoryReplayer<'a, V1, V2, T, R1, R2> where - V1: Ord+Clone+'a, - V2: Ord+Clone+'a, + V1: Ord+'a + ?Sized, + V2: Ord+'a + ToOwned + ?Sized, + V2::Owned: Ord + 'a, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup, @@ -700,9 +730,9 @@ mod history_replay { input_history: ValueHistory<'a, V1, T, R1>, output_history: ValueHistory<'a, V2, T, R2>, input_buffer: Vec<(&'a V1, R1)>, - output_buffer: Vec<(V2, R2)>, - update_buffer: Vec<(V2, R2)>, - output_produced: Vec<((V2, T), R2)>, + output_buffer: Vec<(V2::Owned, R2)>, + update_buffer: Vec<(V2::Owned, R2)>, + output_produced: Vec<((V2::Owned, T), R2)>, synth_times: Vec, meets: Vec, times_current: Vec, @@ -711,8 +741,9 @@ mod history_replay { impl<'a, V1, V2, T, R1, R2> PerKeyCompute<'a, V1, V2, T, R1, R2> for HistoryReplayer<'a, V1, V2, T, R1, R2> where - V1: Ord+Clone, - V2: Ord+Clone, + V1: Ord + ?Sized, + V2: Ord + ToOwned + ?Sized, + V2::Owned: Ord + Clone + 'a, T: Lattice+Ord+Clone, R1: Semigroup, R2: Semigroup, @@ -742,14 +773,14 @@ mod history_replay { times: &mut Vec, logic: &mut L, upper_limit: &Antichain, - outputs: &mut [(T, Vec<(V2, T, R2)>)], + outputs: &mut [(T, Vec<(V2::Owned, T, R2)>)], new_interesting: &mut Vec) -> (usize, usize) where - K: Eq+Clone, + K: Eq + ?Sized, C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2::Owned, R2)>, &mut Vec<(V2::Owned, R2)>) { // The work we need to perform is at times defined principally by the contents of `batch_cursor` @@ -911,7 +942,7 @@ mod history_replay { meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet)); for &((ref value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - self.output_buffer.push(((*value).clone(), diff.clone())); + self.output_buffer.push(((*value).to_owned(), diff.clone())); } else { self.temporary.push(next_time.join(time)); @@ -919,7 +950,7 @@ mod history_replay { } for &((ref value, ref time), ref diff) in self.output_produced.iter() { if time.less_equal(&next_time) { - self.output_buffer.push(((*value).clone(), diff.clone())); + self.output_buffer.push(((*value).to_owned(), diff.clone())); } else { self.temporary.push(next_time.join(&time)); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 6ffd1059d..2bf547925 100644 --- 
a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -12,14 +12,14 @@ use trace::implementations::Update; /// Creates batches from unordered tuples. pub struct MergeBatcher { - sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>, + sorter: MergeSorter<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>, lower: Antichain, frontier: Antichain, phantom: ::std::marker::PhantomData, } impl Batcher for MergeBatcher { - type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff); type Time = U::Time; fn new() -> Self { @@ -126,7 +126,7 @@ impl Batcher for MergeBatcher { let mut buffer = Vec::new(); self.sorter.push(&mut buffer); // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 { + while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::ValOwned),U::Time,U::Diff)>() > 0 { buffer = Vec::new(); self.sorter.push(&mut buffer); } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 5d4fad18a..4780f8485 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -14,12 +14,12 @@ use trace::implementations::Update; /// Creates batches from unordered tuples. pub struct ColumnatedMergeBatcher where - U::Key: Columnation, - U::Val: Columnation, + U::KeyOwned: Columnation, + U::ValOwned: Columnation, U::Time: Columnation, U::Diff: Columnation, { - sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>, + sorter: MergeSorterColumnation<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>, lower: Antichain, frontier: Antichain, phantom: PhantomData, @@ -27,12 +27,12 @@ where impl Batcher for ColumnatedMergeBatcher where - U::Key: Columnation + 'static, - U::Val: Columnation + 'static, + U::KeyOwned: Columnation + 'static, + U::ValOwned: Columnation + 'static, U::Time: Columnation + 'static, U::Diff: Columnation + 'static, { - type Item = ((U::Key,U::Val),U::Time,U::Diff); + type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff); type Time = U::Time; fn new() -> Self { @@ -106,7 +106,7 @@ where if upper.less_equal(time) { self.frontier.insert(time.clone()); if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(U::Key, U::Val), U::Time, U::Diff>::buffer_size() { + if keep.capacity() != MergeSorterColumnation::<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>::buffer_size() { keep = self.sorter.empty(); } } else if keep.len() == keep.capacity() { diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index bfcc0e37c..1f3923bc1 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -55,6 +55,7 @@ pub use self::ord::OrdKeySpine as KeySpine; use std::ops::{Add, Sub}; use std::convert::{TryInto, TryFrom}; +use std::borrow::{Borrow, ToOwned}; use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; @@ -62,10 +63,14 @@ use difference::Semigroup; /// A type that names constituent update types. pub trait Update { + /// We will be able to read out references to this type, and must supply `Key::Owned` as input. + type Key: Ord + ToOwned + ?Sized; /// Key by which data are grouped. - type Key: Ord+Clone; + type KeyOwned: Ord+Clone + Borrow; /// Values associated with the key. 
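// [Aside: a minimal parallel of the owned/borrowed split that `Update` gains above,
//  not the crate's actual trait. Readers see `&Key` (possibly unsized), writers and
//  batchers supply `KeyOwned`, and `Borrow` ties the two together. `Schema` and
//  `Bytes` are placeholder names.]
use std::borrow::Borrow;

trait Schema {
    /// Read out by reference; may be unsized, e.g. `[u8]` or `str`.
    type Key: Ord + ToOwned + ?Sized;
    /// Supplied as input; must present itself as a `&Key` on demand.
    type KeyOwned: Ord + Clone + Borrow<Self::Key>;
}

struct Bytes;
impl Schema for Bytes {
    type Key = [u8];
    type KeyOwned = Vec<u8>;
}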
- type Val: Ord+Clone; + type Val: Ord + ToOwned + ?Sized; + /// Values associated with the key, in owned form + type ValOwned: Ord+Clone + Borrow; /// Time at which updates occur. type Time: Ord+Lattice+timely::progress::Timestamp+Clone; /// Way in which updates occur. @@ -80,7 +85,9 @@ where R: Semigroup+Clone, { type Key = K; + type KeyOwned = K; type Val = V; + type ValOwned = V; type Time = T; type Diff = R; } @@ -88,7 +95,7 @@ where /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. - type Target: Update; + type Target: Update + ?Sized; /// Offsets to use from keys into vals. type KeyOffset: OrdOffset; /// Offsets to use from vals into updates. @@ -111,7 +118,11 @@ pub struct Vector { phantom: std::marker::PhantomData<(U, O)>, } -impl Layout for Vector { +impl Layout for Vector +where + U::Key: ToOwned + Sized + Clone, + U::Val: ToOwned + Sized + Clone, +{ type Target = U; type KeyOffset = O; type ValOffset = O; @@ -127,8 +138,8 @@ pub struct TStack { impl Layout for TStack where - U::Key: Columnation, - U::Val: Columnation, + U::Key: Columnation + ToOwned, + U::Val: Columnation + ToOwned, U::Time: Columnation, U::Diff: Columnation, { @@ -140,8 +151,66 @@ where type UpdContainer = TimelyStack<(U::Time, U::Diff)>; } +/// A type with a preferred container. +/// +/// Examples include types that implement `Clone` who prefer +pub trait PreferredContainer : ToOwned { + /// The preferred container for the type. + type Container: BatchContainer + RetainFrom; +} + +impl PreferredContainer for T { + type Container = Vec; +} + +impl PreferredContainer for [T] { + type Container = SliceContainer; +} + +/// An update and layout description based on preferred containers. +pub struct Preferred { + phantom: std::marker::PhantomData<(Box, Box, T, D, O)>, +} + +impl Update for Preferred +where + K: Ord+ToOwned + ?Sized, + K::Owned: Ord+Clone, + V: Ord+ToOwned + ?Sized, + V::Owned: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + R: Semigroup+Clone, + O: OrdOffset, +{ + type Key = K; + type KeyOwned = K::Owned; + type Val = V; + type ValOwned = V::Owned; + type Time = T; + type Diff = R; +} + +impl Layout for Preferred +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + D: Semigroup+Clone, + O: OrdOffset, +{ + type Target = Preferred; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = K::Container; + type ValContainer = V::Container; + type UpdContainer = Vec<(T, D)>; +} + + /// A container that can retain/discard from some offset onward. -pub trait RetainFrom { +pub trait RetainFrom { /// Retains elements from an index onwards that satisfy a predicate. fn retain_frombool>(&mut self, index: usize, predicate: P); } @@ -350,12 +419,12 @@ pub mod containers { /// /// The length will be one greater than the number of contained slices, /// starting with zero and ending with `self.inner.len()`. - pub offsets: Vec, + offsets: Vec, /// An inner container for sequences of `B` that dereferences to a slice. 
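// [Aside: a std-only sketch of the offset convention used by `SliceContainer`
//  above, not code from this patch. `offsets` always begins with 0 and holds one
//  more entry than there are stored slices, so slice `i` is
//  `inner[offsets[i] .. offsets[i + 1]]`; the `with_capacity` changes below keep
//  that invariant when pre-allocating. `FlatSlices` is a placeholder name.]
struct FlatSlices<B> {
    offsets: Vec<usize>,
    inner: Vec<B>,
}

impl<B: Clone> FlatSlices<B> {
    fn new() -> Self {
        Self { offsets: vec![0], inner: Vec::new() }
    }
    fn push(&mut self, slice: &[B]) {
        self.inner.extend_from_slice(slice);
        self.offsets.push(self.inner.len());
    }
    fn index(&self, i: usize) -> &[B] {
        &self.inner[self.offsets[i] .. self.offsets[i + 1]]
    }
    fn len(&self) -> usize {
        self.offsets.len() - 1
    }
}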
- pub inner: Vec, + inner: Vec, } - impl BatchContainer for SliceContainer + impl BatchContainer for SliceContainer where B: Clone + Sized, [B]: ToOwned>, @@ -384,16 +453,20 @@ pub mod containers { } } fn with_capacity(size: usize) -> Self { + let mut offsets = Vec::with_capacity(size + 1); + offsets.push(0); Self { - offsets: Vec::with_capacity(size), + offsets, inner: Vec::with_capacity(size), } } fn reserve(&mut self, _additional: usize) { } fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); + offsets.push(0); Self { - offsets: Vec::with_capacity(cont1.offsets.len() + cont2.offsets.len()), + offsets, inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), } } @@ -408,7 +481,7 @@ pub mod containers { } /// Default implementation introduces a first offset. - impl Default for SliceContainer { + impl Default for SliceContainer { fn default() -> Self { Self { offsets: vec![0], @@ -416,4 +489,13 @@ pub mod containers { } } } + + use trace::implementations::RetainFrom; + /// A container that can retain/discard from some offset onward. + impl RetainFrom<[B]> for SliceContainer { + /// Retains elements from an index onwards that satisfy a predicate. + fn retain_frombool>(&mut self, _index: usize, _predicate: P) { + unimplemented!() + } + } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index b5607484b..ae99c893f 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -85,7 +85,11 @@ pub type ColKeySpine = Spine< /// The `L` parameter captures the updates should be laid out, and `C` determines which /// merge batcher to select. #[derive(Abomonation)] -pub struct OrdValBatch { +pub struct OrdValBatch +where + ::Key: Sized, + ::Val: Sized, +{ /// Where all the dataz is. pub layer: KVTDLayer, /// Description of the update times this layer represents. @@ -102,7 +106,11 @@ type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBu type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; -impl BatchReader for OrdValBatch { +impl BatchReader for OrdValBatch +where + ::Key: Sized + Clone, + ::Val: Sized + Clone, +{ type Key = ::Key; type Val = ::Val; type Time = ::Time; @@ -115,6 +123,9 @@ impl BatchReader for OrdValBatch { } impl Batch for OrdValBatch +where + ::Key: Sized + Clone, + ::Val: Sized + Clone, { type Merger = OrdValMerger; @@ -124,7 +135,11 @@ impl Batch for OrdValBatch } -impl OrdValBatch { +impl OrdValBatch +where + ::Key: Sized, + ::Val: Sized, +{ fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -214,7 +229,11 @@ impl OrdValBatch { } /// State for an in-progress merge. -pub struct OrdValMerger { +pub struct OrdValMerger +where + ::Key: Sized + Clone, + ::Val: Sized + Clone, +{ // first batch, and position therein. lower1: usize, upper1: usize, @@ -228,6 +247,8 @@ pub struct OrdValMerger { impl Merger> for OrdValMerger where + ::Key: Sized + Clone, + ::Val: Sized + Clone, OrdValBatch: Batch::Time> { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef< as BatchReader>::Time>) -> Self { @@ -307,12 +328,20 @@ where } /// A cursor for navigating a single layer. 
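// [Aside: a std-only sketch of the `RetainFrom` contract, not code from this patch.
//  The `SliceContainer` impl above is left `unimplemented!()`; for a plain `Vec` the
//  operation is the usual in-place compaction from `index` onward. The exact
//  predicate signature here is a guess for illustration.]
fn retain_from<T, P: FnMut(usize, &T) -> bool>(list: &mut Vec<T>, index: usize, mut predicate: P) {
    let mut write = index;
    for read in index..list.len() {
        if predicate(read, &list[read]) {
            // Keep this element by compacting it down to the write position.
            list.swap(write, read);
            write += 1;
        }
    }
    list.truncate(write);
}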
-pub struct OrdValCursor { +pub struct OrdValCursor +where + ::Key: Sized + Clone, + ::Val: Sized + Clone, +{ phantom: std::marker::PhantomData, cursor: OrderedCursor>, } -impl Cursor> for OrdValCursor { +impl Cursor> for OrdValCursor +where + ::Key: Sized + Clone, + ::Val: Sized + Clone, +{ type Key = ::Key; type Val = ::Val; type Time = ::Time; @@ -340,13 +369,19 @@ impl Cursor> for OrdValCursor { } /// A builder for creating layers from unsorted update tuples. -pub struct OrdValBuilder { +pub struct OrdValBuilder +where + ::Key: Sized, + ::Val: Sized, +{ builder: KVTDBuilder, } impl Builder for OrdValBuilder where + ::Key: Sized + Clone, + ::Val: Sized + Clone, OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { type Item = ((::Key, ::Val), ::Time, ::Diff); @@ -382,14 +417,20 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdKeyBatch { +pub struct OrdKeyBatch +where + ::Key: Sized, +{ /// Where all the dataz is. pub layer: KTDLayer, /// Description of the update times this layer represents. pub desc: Description<::Time>, } -impl BatchReader for OrdKeyBatch { +impl BatchReader for OrdKeyBatch +where + ::Key: Sized + Clone, +{ type Key = ::Key; type Val = (); type Time = ::Time; @@ -406,7 +447,11 @@ impl BatchReader for OrdKeyBatch { fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch where L::Target: Update { +impl Batch for OrdKeyBatch +where + ::Key: Sized + Clone, + L::Target: Update, +{ type Merger = OrdKeyMerger; fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { @@ -414,7 +459,10 @@ impl Batch for OrdKeyBatch where L::Target: Update { } } -impl OrdKeyBatch { +impl OrdKeyBatch +where + ::Key: Sized + Clone, +{ fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -478,7 +526,10 @@ impl OrdKeyBatch { } /// State for an in-progress merge. -pub struct OrdKeyMerger { +pub struct OrdKeyMerger +where + ::Key: Sized + Clone, +{ // first batch, and position therein. lower1: usize, upper1: usize, @@ -492,7 +543,8 @@ pub struct OrdKeyMerger { impl Merger> for OrdKeyMerger where - OrdKeyBatch: Batch::Time> + ::Key: Sized + Clone, + OrdKeyBatch: Batch::Time>, { fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -584,7 +636,10 @@ pub struct OrdKeyCursor { cursor: OrderedCursor::Time, ::Diff>>, } -impl Cursor> for OrdKeyCursor { +impl Cursor> for OrdKeyCursor +where + ::Key: Sized, +{ type Key = ::Key; type Val = (); type Time = ::Time; @@ -611,12 +666,16 @@ impl Cursor> for OrdKeyCursor { /// A builder for creating layers from unsorted update tuples. 
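// [Aside: a minimal illustration of why the `Sized` bounds above appear, not code
//  from this patch. Once a trait's associated type may be unsized, any container
//  that stores it by value has to re-assert `Sized` at the use site, which is what
//  the added `where` clauses do. `Schema2` and `ByValue` are placeholder names.]
trait Schema2 {
    type Key: ?Sized;
}

struct ByValue<S: Schema2>
where
    S::Key: Sized,
{
    key: S::Key,
}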
-pub struct OrdKeyBuilder { +pub struct OrdKeyBuilder +where + ::Key: Sized + Clone, +{ builder: KTDBuilder, } impl Builder for OrdKeyBuilder where + ::Key: Sized + Clone, OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> { type Item = ((::Key, ()), ::Time, ::Diff); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ffbec0792..d34682338 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -15,7 +15,7 @@ use trace::implementations::merge_batcher::MergeBatcher; use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; use trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack}; +use super::{Update, Layout, Vector, TStack, Preferred}; use self::val_batch::{OrdValBatch, OrdValBuilder}; @@ -34,11 +34,21 @@ pub type ColValSpine = Spine< ColumnatedMergeBatcher<((K,V),T,R)>, RcBuilder>>, >; + +/// A trace implementation backed by columnar storage. +pub type PreferredSpine = Spine< + Rc>>, + ColumnatedMergeBatcher>, + RcBuilder>>, +>; + + // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; mod val_batch { + use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -503,9 +513,10 @@ mod val_batch { impl Builder for OrdValBuilder where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff>, + ::KeyOwned: Borrow<::Key>, { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::KeyOwned, ::ValOwned), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -528,9 +539,9 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(&key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(&val) { + if self.result.vals.last() == Some(val.borrow()) { self.push_update(time, diff); } else { // New value; complete representation of prior value. @@ -554,9 +565,9 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val) { + if self.result.vals.last() == Some(val.borrow()) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. self.push_update(time.clone(), diff.clone()); @@ -566,7 +577,7 @@ mod val_batch { // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val); + self.result.vals.copy(val.borrow()); } } else { // New key; complete representation of prior key. 
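// [Aside: a std-only sketch of the `key.borrow()` comparisons above, not code from
//  this patch. The builder now receives owned keys (e.g. `Vec<u8>`) while the key
//  column stores and compares them as `&[u8]`; `ToOwned` already guarantees
//  `Owned: Borrow<Self>`, which is what lets the two meet without cloning.
//  `push_key` is a placeholder name.]
use std::borrow::Borrow;

fn push_key<K: Ord + ToOwned + ?Sized>(keys: &mut Vec<K::Owned>, key: K::Owned) {
    // Compare the incoming owned key against the last stored key, both viewed as `&K`.
    let last: Option<&K> = keys.last().map(|k| k.borrow());
    let this: &K = key.borrow();
    if last != Some(this) {
        keys.push(key);
    }
}

// For example, with `keys: Vec<Vec<u8>>`, `push_key::<[u8]>(&mut keys, b"key".to_vec())`
// pushes only when the previous key differs.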
@@ -575,8 +586,8 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val); - self.result.keys.copy(key); + self.result.vals.copy(val.borrow()); + self.result.keys.copy(key.borrow()); } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index de8f16814..542e6651f 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -71,6 +71,7 @@ impl Hashable for HashWrapper { mod val_batch { + use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -97,7 +98,8 @@ mod val_batch { #[derive(Abomonation, Debug)] pub struct RhhValStorage where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// The requested capacity for `keys`. We use this when determining where a key with a certain hash @@ -131,7 +133,8 @@ mod val_batch { impl RhhValStorage where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { @@ -226,7 +229,8 @@ mod val_batch { #[derive(Abomonation)] pub struct RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// The updates themselves. pub storage: RhhValStorage, @@ -242,7 +246,8 @@ mod val_batch { impl BatchReader for RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Key = ::Key; type Val = ::Val; @@ -269,7 +274,8 @@ mod val_batch { impl Batch for RhhValBatch where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Merger = RhhValMerger; @@ -281,7 +287,8 @@ mod val_batch { /// State for an in-progress merge. pub struct RhhValMerger where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Key position to merge next in the first batch. key_cursor1: usize, @@ -303,8 +310,9 @@ mod val_batch { impl Merger> for RhhValMerger where - ::Key: Default + HashOrdered, - RhhValBatch: Batch::Time> + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, + RhhValBatch: Batch::Time>, { fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -393,7 +401,8 @@ mod val_batch { // Helper methods in support of merging batches. impl RhhValMerger where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Copy the next key in `source`. @@ -571,7 +580,8 @@ mod val_batch { /// the cursor, rather than internal state. pub struct RhhValCursor where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Absolute position of the current key. key_cursor: usize, @@ -583,7 +593,8 @@ mod val_batch { impl Cursor> for RhhValCursor where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { type Key = ::Key; type Val = ::Val; @@ -660,7 +671,8 @@ mod val_batch { /// A builder for creating layers from unsorted update tuples. 
pub struct RhhValBuilder where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { result: RhhValStorage, singleton: Option<(::Time, ::Diff)>, @@ -673,7 +685,8 @@ mod val_batch { impl RhhValBuilder where - ::Key: Default + HashOrdered + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, { /// Pushes a single update, which may set `self.singleton` rather than push. /// @@ -704,10 +717,12 @@ mod val_batch { impl Builder for RhhValBuilder where - ::Key: Default + HashOrdered, + ::Key: HashOrdered, + ::KeyOwned: Default + HashOrdered, + ::KeyOwned: Borrow<::Key>, RhhValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::KeyOwned, ::ValOwned), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; @@ -742,9 +757,9 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(&key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(&val) { + if self.result.vals.last() == Some(val.borrow()) { self.push_update(time, diff); } else { // New value; complete representation of prior value. @@ -761,7 +776,7 @@ mod val_batch { self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. - self.result.insert_key(&key, None); + self.result.insert_key(key.borrow(), None); } } @@ -769,9 +784,9 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key) { + if self.result.keys.last() == Some(key.borrow()) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val) { + if self.result.vals.last() == Some(val.borrow()) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. self.push_update(time.clone(), diff.clone()); @@ -781,7 +796,7 @@ mod val_batch { // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val); + self.result.vals.copy(val.borrow()); } } else { // New key; complete representation of prior key. @@ -790,9 +805,9 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val); + self.result.vals.copy(val.borrow()); // Insert the key, but with no specified offset. - self.result.insert_key(key, None); + self.result.insert_key(key.borrow(), None); } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 7e4ee2d66..4d464d324 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,8 +112,8 @@ where impl TraceReader for Spine where B: Batch+Clone+'static, - B::Key: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). - B::Val: Ord+Clone, // Clone is required by `batch::advance_*` (in-place could remove). + B::Key: Ord, // Clone is required by `batch::advance_*` (in-place could remove). 
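// [Aside: a std-only sketch, not code from this patch, of the kind of key handling
//  the relaxed bounds above permit: comparing and navigating keys purely by
//  reference requires `Ord` but not `Clone`. `keys_sorted` is a placeholder name.]
fn keys_sorted<K: Ord + ?Sized>(keys: &[&K]) -> bool {
    keys.windows(2).all(|pair| pair[0] <= pair[1])
}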
+ B::Val: Ord, // Clone is required by `batch::advance_*` (in-place could remove). B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, { @@ -260,8 +260,8 @@ where impl Trace for Spine where B: Batch+Clone+'static, - B::Key: Ord+Clone, - B::Val: Ord+Clone, + B::Key: Ord, + B::Val: Ord, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::R: Semigroup, BA: Batcher