Skip to content

Commit

Permalink
clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 3, 2023
1 parent d8c855e commit ae9073c
Show file tree
Hide file tree
Showing 18 changed files with 124 additions and 130 deletions.
2 changes: 0 additions & 2 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ where
G::Timestamp: Lattice,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::KeyOwned: Hashable,
// Tr::Key: Ord+Hashable+Sized,
// Tr::Val: Clone,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut Tr::KeyOwned)+Clone+'static,
D: ExchangeData,
Expand Down
1 change: 0 additions & 1 deletion examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ where
Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
<Tr::Cursor as Cursor>::ValOwned: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
Expand Down
28 changes: 14 additions & 14 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ fn main() {
let (keys_input, keys) = scope.new_collection::<String, isize>();

match mode.as_str() {
// "new" => {
// use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
// let data = data.arrange::<ColKeySpine<_,_,_>>();
// let keys = keys.arrange::<ColKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
// "old" => {
// use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
// let data = data.arrange::<OrdKeySpine<_,_,_>>();
// let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
// keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
// .probe_with(&mut probe);
// },
"new" => {
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
// "rhh" => {
// use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
// let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
Expand Down
10 changes: 5 additions & 5 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
F: for <'b> FnMut(Tr::Key<'b>, Tr::Val<'b>, &G::Timestamp)->TInner+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static,
P: FnMut(&TInner)->Tr::Time+Clone+'static,
{
let logic1 = logic.clone();
Expand Down Expand Up @@ -175,7 +175,7 @@ where
where
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
F: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>)->bool+Clone+'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
{
let logic1 = logic.clone();
let logic2 = logic.clone();
Expand All @@ -192,7 +192,7 @@ where
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
where
Tr::Diff: Semigroup,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> D+'static,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
{
self.flat_map_ref(move |key, val| Some(logic(key,val)))
}
Expand All @@ -206,7 +206,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
{
Self::flat_map_batches(&self.stream, logic)
}
Expand All @@ -223,7 +223,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
{
stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
input.for_each(|time, data| {
Expand Down
13 changes: 7 additions & 6 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord {
}
}

impl<G: Scope, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
impl<G: Scope, T1> CountTotal<G, T1::KeyOwned, T1::Diff> for Arranged<G, T1>
where
G::Timestamp: TotalOrder+Lattice+Ord,
T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
K: ExchangeData,
T1: for<'a> TraceReader<Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
T1::KeyOwned: ExchangeData,
T1::Diff: ExchangeData+Semigroup,
{
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, T1::Diff), R2> {
fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::KeyOwned, T1::Diff), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand All @@ -74,6 +74,7 @@ where

move |input, output| {

use trace::cursor::MyTrait;
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
Expand All @@ -97,14 +98,14 @@ where

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), R2::from(-1i8)));
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(diff));
if count.is_none() { count = Some(diff.clone()); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), R2::from(1i8)));
session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ impl<G, K, V, T1> JoinCore<G, K, V, T1::Diff> for Arranged<G,T1>
G: Scope,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static,
K: Ord+'static + ?Sized,
V: Ord+'static + ?Sized,
K: Ord+'static,
V: Ord+'static,
T1::Diff: Semigroup,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<Tr2::Diff>>::Output>
Expand Down Expand Up @@ -796,7 +796,7 @@ where
}
}

fn think<'b, F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&'b mut self, mut results: F) {
fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

// for reasonably sized edits, do the dead-simple thing.
if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
Expand Down
7 changes: 2 additions & 5 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ struct ValueHistory<'storage, C: Cursor> where C::Time: Sized, C::Diff: Sized {

impl<'storage, C: Cursor> ValueHistory<'storage, C>
where
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone,
C::Diff: Semigroup,
{
Expand Down Expand Up @@ -136,7 +135,7 @@ where
}

/// Organizes history based on current contents of edits.
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> where 'storage: 'history {
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {

self.buffer.clear();
self.history.clear();
Expand Down Expand Up @@ -164,7 +163,6 @@ struct HistoryReplay<'storage, 'history, C>
where
'storage: 'history,
C: Cursor,
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone+'history,
C::Diff: Semigroup+'history,
{
Expand All @@ -175,13 +173,12 @@ impl<'storage, 'history, C> HistoryReplay<'storage, 'history, C>
where
'storage: 'history,
C: Cursor,
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone+'history,
C::Diff: Semigroup+'history,
{
fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
fn edit<'s>(&'s self) -> Option<(C::Val<'storage>, &'s C::Time, &'s C::Diff)> {
fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
}

Expand Down
4 changes: 2 additions & 2 deletions src/trace/cursor/cursor_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
self.minimize_keys(storage);
}
#[inline]
fn seek_key<'a>(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'a>) {
fn seek_key(&mut self, storage: &Vec<C::Storage>, key: Self::Key<'_>) {
for index in 0 .. self.cursors.len() {
self.cursors[index].seek_key(&storage[index], key);
}
Expand All @@ -146,7 +146,7 @@ impl<C: Cursor> Cursor for CursorList<C> {
self.minimize_vals(storage);
}
#[inline]
fn seek_val<'a>(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'a>) {
fn seek_val(&mut self, storage: &Vec<C::Storage>, val: Self::Val<'_>) {
for &index in self.min_key.iter() {
self.cursors[index].seek_val(&storage[index], val);
}
Expand Down
4 changes: 2 additions & 2 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub trait Cursor {
/// Advances the cursor to the next key.
fn step_key(&mut self, storage: &Self::Storage);
/// Advances the cursor to the specified key.
fn seek_key<'a>(&mut self, storage: &Self::Storage, key: Self::Key<'a>);
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>);
/// Convenience method to get access by reference to an owned key.
fn seek_key_owned<'a>(&mut self, storage: &Self::Storage, key: &'a Self::KeyOwned) {
self.seek_key(storage, <Self::Key<'a> as MyTrait<'a>>::borrow_as(key));
Expand All @@ -111,7 +111,7 @@ pub trait Cursor {
/// Advances the cursor to the next value.
fn step_val(&mut self, storage: &Self::Storage);
/// Advances the cursor to the specified value.
fn seek_val<'a>(&mut self, storage: &Self::Storage, val: Self::Val<'a>);
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>);

/// Rewinds the cursor to the first key.
fn rewind_keys(&mut self, storage: &Self::Storage);
Expand Down
2 changes: 1 addition & 1 deletion src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ where
fn step_key(&mut self, storage: &OrdValBatch<L>){ self.cursor.step(&storage.layer); }
fn seek_key(&mut self, storage: &OrdValBatch<L>, key: &Self::Key) { self.cursor.seek(&storage.layer, key); }
fn step_val(&mut self, storage: &OrdValBatch<L>) { self.cursor.child.step(&storage.layer.vals); }
fn seek_val<'a>(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'a>) { self.cursor.child.seek(&storage.layer.vals, val); }
fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'a>) { self.cursor.child.seek(&storage.layer.vals, val); }
fn rewind_keys(&mut self, storage: &OrdValBatch<L>) { self.cursor.rewind(&storage.layer); }
fn rewind_vals(&mut self, storage: &OrdValBatch<L>) { self.cursor.child.rewind(&storage.layer.vals); }
}
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ mod val_batch {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key<'a>(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'a>) {
fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
if self.key_valid(storage) {
self.rewind_vals(storage);
Expand All @@ -482,7 +482,7 @@ mod val_batch {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
}
}
fn seek_val<'a>(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'a>) {
fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
}
fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
Expand Down Expand Up @@ -940,7 +940,7 @@ mod key_batch {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key<'a>(&mut self, storage: &Self::Storage, key: Self::Key<'a>) {
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key));
if self.key_valid(storage) {
self.rewind_vals(storage);
Expand All @@ -949,7 +949,7 @@ mod key_batch {
fn step_val(&mut self, _storage: &Self::Storage) {
self.val_stepped = true;
}
fn seek_val<'a>(&mut self, _storage: &Self::Storage, _val: Self::Val<'a>) { }
fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.key_cursor = 0;
if self.key_valid(storage) {
Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ mod val_batch {
self.key_cursor = storage.storage.keys.len();
}
}
fn seek_key<'a>(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'a>) {
fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
// self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
let desired = storage.storage.desired_location(&key);
// Advance the cursor, if `desired` is ahead of it.
Expand All @@ -659,7 +659,7 @@ mod val_batch {
self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
}
}
fn seek_val<'a>(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'a>) {
fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val));
}
fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
Expand Down
Loading

0 comments on commit ae9073c

Please sign in to comment.