Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Re-introduce safe implementation (TimelyDataflow#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored May 6, 2024
1 parent 1eb052c commit de6a353
Showing 1 changed file with 51 additions and 67 deletions.
118 changes: 51 additions & 67 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,49 +37,40 @@ pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usi
/// Sorts and consolidates a slice, returning the valid prefix length.
pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_by(|x,y| x.0.cmp(&y.0));

let slice_ptr = slice.as_mut_ptr();

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice_ptr.add(offset);
let ptr2 = slice_ptr.add(index);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1.plus_equals(&(*ptr2).1);
if slice.len() > 1 {

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_by(|x,y| x.0.cmp(&y.0));

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
let mut accum = slice[offset].1.clone();

for index in 1 .. slice.len() {
if slice[index].0 == slice[index-1].0 {
accum.plus_equals(&slice[index].1);
}
else {
if !(*ptr1).1.is_zero() {
if !accum.is_zero() {
slice.swap(offset, index-1);
slice[offset].1.clone_from(&accum);
offset += 1;
}
let ptr1 = slice_ptr.add(offset);
std::ptr::swap(ptr1, ptr2);
accum.clone_from(&slice[index].1);
}
}
if !accum.is_zero() {
slice.swap(offset, slice.len()-1);
slice[offset].1 = accum;
offset += 1;
}

offset
}
if offset < slice.len() && !slice[offset].1.is_zero() {
offset += 1;
else {
slice.iter().filter(|x| !x.1.is_zero()).count()
}

offset
}

/// Sorts and consolidates `vec`.
Expand All @@ -104,50 +95,43 @@ pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D,
/// Sorts and consolidates a slice, returning the valid prefix length.
pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

let slice_ptr = slice.as_mut_ptr();
if slice.len() > 1 {

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {
// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {
// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
let mut accum = slice[offset].2.clone();

// LOOP INVARIANT: offset < index
let ptr1 = slice_ptr.add(offset);
let ptr2 = slice_ptr.add(index);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2.plus_equals(&(*ptr2).2);
for index in 1 .. slice.len() {
if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
accum.plus_equals(&slice[index].2);
}
else {
if !(*ptr1).2.is_zero() {
if !accum.is_zero() {
slice.swap(offset, index-1);
slice[offset].2.clone_from(&accum);
offset += 1;
}
let ptr1 = slice_ptr.add(offset);
std::ptr::swap(ptr1, ptr2);
accum.clone_from(&slice[index].2);
}

}
if !accum.is_zero() {
slice.swap(offset, slice.len()-1);
slice[offset].2 = accum;
offset += 1;
}

offset
}
if offset < slice.len() && !slice[offset].2.is_zero() {
offset += 1;
else {
slice.iter().filter(|x| !x.2.is_zero()).count()
}

offset
}


/// A container builder that consolidates data in-places into fixed-sized containers. Does not
/// maintain FIFO ordering.
#[derive(Default)]
Expand Down Expand Up @@ -332,6 +316,6 @@ mod tests {
for i in 0..1024 {
assert_eq!((i, 0, 2), collected[i]);
}

}
}

0 comments on commit de6a353

Please sign in to comment.