Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Merge branch 'TimelyDataflow:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
antiguru authored Nov 9, 2023
2 parents 2ebd82d + 0673ecd commit 40fbd33
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 82 deletions.
48 changes: 48 additions & 0 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,51 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
Self { vector }
}
}

use timely::container::columnation::{Columnation, Region};
impl<T: Columnation> Columnation for PointStamp<T> {
type InnerRegion = PointStampStack<T::InnerRegion>;
}

/// Stack for PointStamp. Part of Columnation implementation.
pub struct PointStampStack<R: Region>(<Vec<R::Item> as Columnation>::InnerRegion)
where
<R as Region>::Item: Columnation;

impl<R: Region> Default for PointStampStack<R>
where
<R as Region>::Item: Columnation
{
#[inline]
fn default() -> Self {
Self(Default::default())
}
}

impl<R: Region> Region for PointStampStack<R>
where
<R as Region>::Item: Columnation
{
type Item = PointStamp<R::Item>;

#[inline]
unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
Self::Item { vector: self.0.copy(&item.vector) }
}

fn clear(&mut self) {
self.0.clear();
}

fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
self.0.reserve_items(items.map(|x| &x.vector));
}

fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
self.0.reserve_regions(regions.map(|r| &r.0));
}

fn heap_size(&self, callback: impl FnMut(usize, usize)) {
self.0.heap_size(callback);
}
}
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,17 @@ impl Config {
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
if let Some(effort) = options.idle_merge_effort {
config.set("differential/idle_merge_effort".to_string(), effort);
config.set::<trace::ExertionLogic>(
"differential/default_exert_logic".to_string(),
std::sync::Arc::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
}),
);
}
}
20 changes: 8 additions & 12 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};
Expand Down Expand Up @@ -563,15 +563,13 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let (activator, effort) =
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
let activator = Some(self.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

*reader = Some(reader_local);
Expand Down Expand Up @@ -672,9 +670,7 @@ where
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
23 changes: 9 additions & 14 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
use trace::{self, Trace, TraceReader, Batch, Cursor};

use trace::Builder;

Expand Down Expand Up @@ -165,20 +165,17 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Establish compaction effort to apply even without updates.
let (activator, effort) =
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
}
else {
(None, None)
};

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
let activator = Some(stream.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}

let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
// Capture the reader outside the builder scope.
*reader = Some(reader_local.clone());
Expand Down Expand Up @@ -334,9 +331,7 @@ where
reader_local.set_physical_compaction(prev_frontier.borrow());
}

if let Some(mut fuel) = effort.clone() {
writer.exert(&mut fuel);
}
writer.exert();
}
})
};
Expand Down
4 changes: 2 additions & 2 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ where
}

/// Exerts merge effort, even without additional updates.
pub fn exert(&mut self, fuel: &mut isize) {
pub fn exert(&mut self) {
if let Some(trace) = self.trace.upgrade() {
trace.borrow_mut().trace.exert(fuel);
trace.borrow_mut().trace.exert();
}
}

Expand Down
31 changes: 13 additions & 18 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability;

use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
use lattice::Lattice;
use trace::{Batch, BatchReader, Cursor, Trace, Builder};
use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
use trace::cursor::CursorList;
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
Expand Down Expand Up @@ -351,17 +351,14 @@ where
register.get::<::logging::DifferentialEvent>("differential/arrange")
};

// Determine if we should regularly exert the trace maintenance machinery,
// and with what amount of effort each time.
let (activator, effort) =
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = self.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
empty.set_exert_logic(exert_logic);
}
else {
(None, None)
};

let empty = T2::new(operator_info.clone(), logger.clone(), activator);

let mut source_trace = self.trace.clone();

let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
Expand Down Expand Up @@ -498,8 +495,8 @@ where
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {

// Determine the next key we will work on; could be synthetic, could be from a batch.
let key1 = exposed.get(exposed_position).map(|x| x.0.clone());
let key2 = batch_cursor.get_key(&batch_storage).map(|k| k.clone());
let key1 = exposed.get(exposed_position).map(|x| &x.0);
let key2 = batch_cursor.get_key(&batch_storage);
let key = match (key1, key2) {
(Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
(Some(key1), None) => key1,
Expand All @@ -514,7 +511,7 @@ where
interesting_times.clear();

// Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
while exposed.get(exposed_position).map(|x| &x.0) == Some(&key) {
while exposed.get(exposed_position).map(|x| &x.0) == Some(key) {
interesting_times.push(exposed[exposed_position].1.clone());
exposed_position += 1;
}
Expand All @@ -524,7 +521,7 @@ where

// do the per-key computation.
let _counters = thinker.compute(
&key,
key,
(&mut source_cursor, source_storage),
(&mut output_cursor, output_storage),
(&mut batch_cursor, batch_storage),
Expand All @@ -535,7 +532,7 @@ where
&mut new_interesting_times,
);

if batch_cursor.get_key(batch_storage) == Some(&key) {
if batch_cursor.get_key(batch_storage) == Some(key) {
batch_cursor.step_key(batch_storage);
}

Expand Down Expand Up @@ -629,9 +626,7 @@ where
}

// Exert trace maintenance if we have been so requested.
if let Some(mut fuel) = effort.clone() {
output_writer.exert(&mut fuel);
}
output_writer.exert();
}
}
)
Expand Down
15 changes: 3 additions & 12 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,6 @@ where
}
}


#[inline]
unsafe fn push_unchecked<T>(vec: &mut Vec<T>, element: T) {
debug_assert!(vec.len() < vec.capacity());
let idx = vec.len();
vec.set_len(idx + 1);
::std::ptr::write(vec.get_unchecked_mut(idx), element);
}

pub struct MergeSorter<D: Ord, T: Ord, R: Semigroup> {
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<Vec<(D, T, R)>>,
Expand Down Expand Up @@ -242,14 +233,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
match cmp {
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } }
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } }
Ordering::Less => result.push(head1.pop_front().unwrap()),
Ordering::Greater => result.push(head2.pop_front().unwrap()),
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
unsafe { push_unchecked(&mut result, (data1, time1, diff1)); }
result.push((data1, time1, diff1));
}
}
}
Expand Down
Loading

0 comments on commit 40fbd33

Please sign in to comment.