From 13c4f1ff2f25925977068a6c53122ef645a37882 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 4 Dec 2024 19:40:41 -0500 Subject: [PATCH] Update around container traits --- src/consolidation.rs | 6 +++--- src/trace/implementations/chunker.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index 310628f4a..deb859fa5 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -13,7 +13,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; -use timely::container::{ContainerBuilder, PushInto, SizableContainer}; +use timely::container::{ContainerBuilder, PushInto}; use timely::container::flatcontainer::{FlatStack, Push, Region}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::Data; @@ -156,7 +156,7 @@ where // TODO: Can we replace `multiple` by a bool? #[cold] fn consolidate_and_flush_through(&mut self, multiple: usize) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>(); consolidate_updates(&mut self.current); let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable(); while drain.peek().is_some() { @@ -180,7 +180,7 @@ where /// Precondition: `current` is not allocated or has space for at least one element. #[inline] fn push_into(&mut self, item: P) { - let preferred_capacity = >::preferred_capacity(); + let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>(); if self.current.capacity() < preferred_capacity * 2 { self.current.reserve(preferred_capacity * 2 - self.current.capacity()); } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 662957c82..40553897c 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -264,15 +264,15 @@ where + PushInto>, { fn push_into(&mut self, container: &'a mut Input) { - if self.pending.capacity() < Output::preferred_capacity() { - self.pending.reserve(Output::preferred_capacity() - self.pending.len()); - } + self.pending.ensure_capacity(&mut None); + let form_batch = |this: &mut Self| { - if this.pending.len() == this.pending.capacity() { + if this.pending.at_capacity() { + let starting_len = this.pending.len(); consolidate_container(&mut this.pending, &mut this.empty); std::mem::swap(&mut this.pending, &mut this.empty); this.empty.clear(); - if this.pending.len() > this.pending.capacity() / 2 { + if this.pending.len() > starting_len / 2 { // Note that we're pushing non-full containers, which is a deviation from // other implementation. The reason for this is that we cannot extract // partial data from `this.pending`. We should revisit this in the future.