Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Introduce and integrate IntoOwned trait (TimelyDataflow#495)
Browse files Browse the repository at this point in the history
* Introduce and integrate IntoOwned trait

* Respond to feedback
  • Loading branch information
frankmcsherry authored May 24, 2024
1 parent 85b126c commit 2de0cbd
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 182 deletions.
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ where
// Use TOTAL ORDER to allow the release of `time`.
yielded = yielded || yield_function(timer, work);
if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(key));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(key));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) {
while let Some(val2) = cursor.get_val(&storage) {
cursor.map_times(&storage, |t, d| {
if comparison(t, initial) {
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ where
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
use differential_dataflow::trace::cursor::MyTrait;
cursor.seek_key(&storage, MyTrait::borrow_as(&key1));
if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) {
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
Expand Down
6 changes: 3 additions & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ where
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;
use trace::cursor::IntoOwned;

// The prior value associated with the key.
let mut prev_value: Option<V> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key_owned(&trace_storage, &key);
if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) {
trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
self.map(|k| (k, ()))
.arrange_named::<Tr>(name)
.as_collection(|d, _| d.into_owned())
Expand Down
2 changes: 1 addition & 1 deletion src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where

move |input, output| {

use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
let mut session = output.session(&capability);
Expand Down
6 changes: 3 additions & 3 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ where
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {

use std::borrow::Borrow;
use crate::trace::cursor::MyTrait;
use crate::trace::cursor::IntoOwned;

// Determine the next key we will work on; could be synthetic, could be from a batch.
let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0));
let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0));
let key2 = batch_cursor.get_key(batch_storage);
let key = match (key1, key2) {
(Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
Expand All @@ -497,7 +497,7 @@ where
interesting_times.clear();

// Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) {
while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&<T1::Key<'_> as IntoOwned>::borrow_as(&k))).unwrap_or(false) {
interesting_times.push(exposed[exposed_position].1.clone());
exposed_position += 1;
}
Expand Down
46 changes: 13 additions & 33 deletions src/trace/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,40 @@ pub mod cursor_list;
pub use self::cursor_list::CursorList;

use std::borrow::Borrow;
use std::cmp::Ordering;

/// A type that may be converted into and compared with another type.
/// A reference type corresponding to an owned type, supporting conversion in each direction.
///
/// The type must also be comparable with itself, and follow the same
/// order as if converting instances to `T` and comparing the results.
pub trait MyTrait<'a> : Ord {
/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT.
/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the
/// requirement that the other trait implement `Borrow`, for which a borrow must result in a
/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type.
pub trait IntoOwned<'a> {
/// Owned type into which this type can be converted.
type Owned;
/// Conversion from an instance of this type to the owned type.
fn into_owned(self) -> Self::Owned;
///
/// Clones `self` onto an existing instance of the owned type.
fn clone_onto(&self, other: &mut Self::Owned);
/// Indicates that `self <= other`; used for sorting.
fn compare(&self, other: &Self::Owned) -> Ordering;
/// `self <= other`
fn less_equals(&self, other: &Self::Owned) -> bool {
self.compare(other) != Ordering::Greater
}
/// `self == other`
fn equals(&self, other: &Self::Owned) -> bool {
self.compare(other) == Ordering::Equal
}
/// `self < other`
fn less_than(&self, other: &Self::Owned) -> bool {
self.compare(other) == Ordering::Less
}
/// Borrows an owned instance as onesself.
fn borrow_as(other: &'a Self::Owned) -> Self;
/// Borrows an owned instance as oneself.
fn borrow_as(owned: &'a Self::Owned) -> Self;
}

impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T {
impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T {
type Owned = T::Owned;
fn into_owned(self) -> Self::Owned { self.to_owned() }
fn clone_onto(&self, other: &mut Self::Owned) { <T as ToOwned>::clone_into(self, other) }
fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) }
fn borrow_as(other: &'a Self::Owned) -> Self {
other.borrow()
}
fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() }
}

/// A cursor for navigating ordered `(key, val, time, diff)` updates.
pub trait Cursor {

/// Key by which updates are indexed.
type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>;
type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>;
/// Owned version of the above.
type KeyOwned: Ord + Clone;
/// Values associated with keys.
type Val<'a>: Copy + Clone + MyTrait<'a> + for<'b> PartialOrd<Self::Val<'b>>;
type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd<Self::Val<'b>>;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Associated update.
Expand Down Expand Up @@ -103,10 +87,6 @@ pub trait Cursor {
fn step_key(&mut self, storage: &Self::Storage);
/// Advances the cursor to the specified key.
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>);
/// Convenience method to get access by reference to an owned key.
fn seek_key_owned(&mut self, storage: &Self::Storage, key: &Self::KeyOwned) {
self.seek_key(storage, MyTrait::borrow_as(key));
}

/// Advances the cursor to the next value.
fn step_val(&mut self, storage: &Self::Storage);
Expand Down
32 changes: 12 additions & 20 deletions src/trace/implementations/huffman_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,8 @@ where
}
}

impl<B> BatchContainer for HuffmanContainer<B>
where
B: Ord + Clone + Sized + 'static,
{
type PushItem = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;
use crate::trace::implementations::containers::Push;
impl<B: Ord + Clone + 'static> Push<Vec<B>> for HuffmanContainer<B> {
fn push(&mut self, item: Vec<B>) {
for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
match &mut self.inner {
Expand All @@ -51,10 +47,12 @@ where
}
}
}
fn copy_push(&mut self, item: &Vec<B>) {
use crate::trace::MyTrait;
self.copy(<_ as MyTrait>::borrow_as(item));
}
}

impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
type OwnedItem = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;

fn copy(&mut self, item: Self::ReadItem<'_>) {
match item.decode() {
Ok(decoded) => {
Expand Down Expand Up @@ -152,7 +150,7 @@ impl<B: Ord+Clone> Default for HuffmanContainer<B> {

mod wrapper {

use crate::trace::MyTrait;
use crate::trace::IntoOwned;
use super::Encoded;

pub struct Wrapped<'a, B: Ord> {
Expand Down Expand Up @@ -205,7 +203,7 @@ mod wrapper {
self.partial_cmp(other).unwrap()
}
}
impl<'a, B: Ord+Clone> MyTrait<'a> for Wrapped<'a, B> {
impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> {
type Owned = Vec<B>;
fn into_owned(self) -> Self::Owned {
match self.decode() {
Expand All @@ -220,14 +218,8 @@ mod wrapper {
Err(bytes) => other.extend_from_slice(bytes),
}
}
fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering {
match self.decode() {
Ok(decode) => decode.partial_cmp(other.iter()).unwrap(),
Err(bytes) => bytes.cmp(&other[..]),
}
}
fn borrow_as(other: &'a Self::Owned) -> Self {
Self { inner: Err(&other[..]) }
fn borrow_as(owned: &'a Self::Owned) -> Self {
Self { inner: Err(&owned[..]) }
}
}
}
Expand Down
Loading

0 comments on commit 2de0cbd

Please sign in to comment.