Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Update to latest Timely (#519) #17

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ graph_map = "0.1"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
abomonation = "0.7"
abomonation_derive = "0.5"
fnv="1.0.2"
timely = {workspace = true}

Expand Down
2 changes: 0 additions & 2 deletions dogsdogsdogs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ license = "MIT"
edition = "2021"

[dependencies]
abomonation = "0.7"
abomonation_derive = "0.5"
timely = { workspace = true }
differential-dataflow = { path = "../", default-features = false }
serde = "1"
Expand Down
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/altneu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
//! element of the second lattice, if neither first element equals
//! the join.

use abomonation_derive::Abomonation;
use serde_derive::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct AltNeu<T> {
pub time: T,
pub neu: bool, // alt < neu in timestamp comparisons.
Expand Down
4 changes: 1 addition & 3 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ where
stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);
let activator = stream.scope().activator_for(info.address);

move |input1, input2, output| {

Expand Down
2 changes: 1 addition & 1 deletion examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn main() {
// create a source operator which will produce random edges and delete them.
timely::dataflow::operators::generic::source(scope, "RandomGraph", |mut capability, info| {

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);

let seed: &[_] = &[1, 2, 3, index];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
Expand Down
3 changes: 1 addition & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use abomonation_derive::Abomonation;
use rand::{Rng, SeedableRng, StdRng};
use serde::{Deserialize, Serialize};

Expand All @@ -13,7 +12,7 @@ use differential_dataflow::lattice::Lattice;
type Node = u32;
type Edge = (Node, Node);

#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
pub struct MinSum {
value: u32,
}
Expand Down
6 changes: 2 additions & 4 deletions examples/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn main() {
mod pair {

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
Expand Down Expand Up @@ -203,7 +203,6 @@ mod pair {
}

use std::fmt::{Formatter, Error, Debug};
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Debug implementation to avoid seeing fully qualified path names.
Expand All @@ -221,11 +220,10 @@ mod pair {
/// from the rest of the library other than the traits it needs to implement. With this
/// type and its implementations, you can use it as a timestamp type.
mod vector {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct Vector<T> {
pub vector: Vec<T>,
}
Expand Down
2 changes: 0 additions & 2 deletions experiments/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ edition = "2021"
[dependencies]
core_affinity = "0.5.9"
rand="0.3.13"
abomonation = "0.7"
abomonation_derive = "0.5"
#timely = "0.7"
timely = { workspace = true }
differential-dataflow = { path = "../" }
Expand Down
6 changes: 2 additions & 4 deletions experiments/src/bin/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ fn main() {
mod pair {

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
Expand Down Expand Up @@ -231,7 +231,6 @@ mod pair {
}

use std::fmt::{Formatter, Error, Debug};
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Debug implementation to avoid seeing fully qualified path names.
Expand All @@ -249,11 +248,10 @@ mod pair {
/// from the rest of the library other than the traits it needs to implement. With this
/// type and its implementations, you can use it as a timestamp type.
mod vector {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation, Debug, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct Vector<T> {
pub vector: Vec<T>,
}
Expand Down
2 changes: 1 addition & 1 deletion server/dataflows/random_graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn build((dataflow, handles, probe, timer, args): Environment) -> Result<(),
let mut trace =
source(dataflow, "RandomGraph", |cap, info| {

let activator = dataflow.activator_for(&info.address[..]);
let activator = dataflow.activator_for(info.address);
let mut hist = hdrhist::HDRHist::new();

let probe2 = probe.clone();
Expand Down
19 changes: 9 additions & 10 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
//! this file.

use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A message in the CDC V2 protocol.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum Message<D, T, R> {
/// A batch of updates that are certain to occur.
///
Expand All @@ -32,7 +31,7 @@ pub enum Message<D, T, R> {
/// Each element of `counts` is an irrevocable statement about the exact number of
/// distinct updates that occur at that time.
/// Times not present in `counts` have a count of zero.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct Progress<T> {
/// The lower bound of times contained in this statement.
pub lower: Vec<T>,
Expand Down Expand Up @@ -310,9 +309,9 @@ pub mod source {
// Step 1: The MESSAGES operator.
let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
let address = messages_op.operator_info().address;
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
let activator = scope.sync_activator_for(address.to_vec());
let activator2 = scope.activator_for(Rc::clone(&address));
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
Expand Down Expand Up @@ -582,13 +581,13 @@ pub mod sink {
// We can simply record all updates, under the presumption that they have been consolidated
// and so any record we see is in fact guaranteed to happen.
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(stream, Pipeline);
let (mut updates_out, updates) = builder.new_output();

builder.build_reschedule(
move |_capability| {
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
let mut send_queue = std::collections::VecDeque::new();
move |_frontiers| {
let mut output = updates_out.activate();
Expand Down Expand Up @@ -636,15 +635,15 @@ pub mod sink {

// We use a lower-level builder here to get access to the operator address, for rescheduling.
let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let reactivator = stream.scope().activator_for(builder.operator_info().address);
let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();

// We now record the numbers of updates at each timestamp between lower and upper bounds.
// Track the advancing frontier, to know when to produce utterances.
let mut frontier = Antichain::from_elem(T::minimum());
// Track accumulated counts for timestamps.
let mut timestamps = ChangeBatch::new();
let mut timestamps = <ChangeBatch<_>>::new();
// Stash for serialized data yet to send.
let mut send_queue = std::collections::VecDeque::new();
let mut retain = Vec::new();
Expand Down
3 changes: 1 addition & 2 deletions src/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ wrapping_implementation!(std::num::Wrapping<isize>);

pub use self::present::Present;
mod present {
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A zero-sized difference that indicates the presence of a record.
Expand All @@ -168,7 +167,7 @@ mod present {
/// The primary feature of this type is that it has zero size, which reduces the overhead
/// of differential dataflow's representations for settings where collections either do
/// not change, or for which records are only added (for example, derived facts in Datalog).
#[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
pub struct Present;

impl<T: Clone> super::Multiply<T> for Present {
Expand Down
9 changes: 2 additions & 7 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
//! default coordinates (which is effectively just *setting* the coordinate).

use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStamp<T> {
/// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
vector: Vec<T>,
Expand Down Expand Up @@ -118,9 +115,7 @@ impl<T: Timestamp> Refines<()> for PointStamp<T> {
use timely::progress::PathSummary;

/// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
/// Number of leading coordinates to retain.
///
Expand Down
16 changes: 8 additions & 8 deletions src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Loggers and logging events for differential dataflow.

use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};

/// Logger for differential dataflow events.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
Expand All @@ -19,7 +19,7 @@ where
}

/// Possible different differential events.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub enum DifferentialEvent {
/// Batch creation.
Batch(BatchEvent),
Expand All @@ -36,7 +36,7 @@ pub enum DifferentialEvent {
}

/// A batch creation event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatchEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -48,7 +48,7 @@ impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { D


/// A batcher event, reporting changes to a batcher's contents.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -65,7 +65,7 @@ pub struct BatcherEvent {
impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }

/// A batch drop event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct DropEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -76,7 +76,7 @@ pub struct DropEvent {
impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeEvent {
/// Operator identifier.
pub operator: usize,
Expand All @@ -93,7 +93,7 @@ pub struct MergeEvent {
impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }

/// A merge failed to complete in time.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeShortfall {
/// Operator identifier.
pub operator: usize,
Expand All @@ -106,7 +106,7 @@ pub struct MergeShortfall {
impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }

/// A trace share event, reporting changes to the number of shares of a trace.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceShare {
/// Operator identifier.
pub operator: usize,
Expand Down
8 changes: 4 additions & 4 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down Expand Up @@ -439,10 +439,10 @@ where

let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(&info.address[..]);
let activator = scope.activator_for(info.address);
*shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use timely::dataflow::operators::{Enter, Map};
use timely::order::PartialOrder;
use timely::dataflow::{Scope, Stream, StreamCore};

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 22 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::dataflow::StreamCore`
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
use timely::progress::Timestamp;
Expand Down Expand Up @@ -75,8 +75,8 @@

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::Container`

Check failure on line 78 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::Container`
use timely::container::PushInto;

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::container`

Check failure on line 79 in src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::container`

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -439,7 +439,7 @@
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let activator = Some(scope.activator_for(&info.address[..]));
let activator = Some(scope.activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If there is default exertion logic set, install it.
if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(&info.address[..]));
let activator = Some(stream.scope().activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
use std::cmp::Ordering;
use timely::Container;

Check failure on line 7 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::Container`

Check failure on line 7 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::Container`

Check failure on line 7 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::Container`

Check failure on line 7 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::Container`

Check failure on line 7 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::Container`

use timely::container::{ContainerBuilder, PushInto};

Check failure on line 9 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::container`

Check failure on line 9 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::container`

Check failure on line 9 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::container`

Check failure on line 9 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::container`

Check failure on line 9 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::container`
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};

Check failure on line 12 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::dataflow::StreamCore`

Check failure on line 12 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::dataflow::StreamCore`

Check failure on line 12 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 12 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::dataflow::StreamCore`

Check failure on line 12 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::dataflow::StreamCore`
use timely::dataflow::operators::generic::{Operator, OutputHandleCore};

Check failure on line 13 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::dataflow::operators::generic::OutputHandleCore`

Check failure on line 13 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::dataflow::operators::generic::OutputHandleCore`

Check failure on line 13 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::dataflow::operators::generic::OutputHandleCore`

Check failure on line 13 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::dataflow::operators::generic::OutputHandleCore`

Check failure on line 13 in src/operators/join.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::dataflow::operators::generic::OutputHandleCore`
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::Counter;
Expand Down Expand Up @@ -381,7 +381,7 @@
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = arranged1.stream.scope().activations().clone();
let activator = Activator::new(&info.address[..], activations);
let activator = Activator::new(info.address, activations);

// Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
// These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
Expand Down
2 changes: 1 addition & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
//! to the key and the list of values.
//! The function is expected to populate a list of output values.

use timely::Container;

Check failure on line 8 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::Container`

Check failure on line 8 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::Container`

Check failure on line 8 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::Container`

Check failure on line 8 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::Container`

Check failure on line 8 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::Container`
use timely::container::PushInto;

Check failure on line 9 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / test mdBook

unresolved import `timely::container`

Check failure on line 9 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

unresolved import `timely::container`

Check failure on line 9 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unresolved import `timely::container`

Check failure on line 9 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust stable

unresolved import `timely::container`

Check failure on line 9 in src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / cargo test on macos, rust 1.78

unresolved import `timely::container`
use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian};
Expand Down Expand Up @@ -338,7 +338,7 @@
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..]));
let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If there is default exert logic set, install it.
if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
Expand Down
Loading
Loading