From 173577b093d919dd56dae68537d8230981429ade Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 29 Aug 2023 14:13:31 -0700 Subject: [PATCH] fix(7181): improve performance by using hasher on tuple (unqiue sliced record batch) --- datafusion/core/src/physical_plan/sorts/stream.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index d61922481e47..f7b88f9f7c30 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -19,6 +19,7 @@ use crate::physical_plan::sorts::builder::SortOrder; use crate::physical_plan::sorts::cursor::{Cursor, FieldArray, FieldCursor, RowCursor}; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr}; +use ahash::RandomState; use arrow::array::Array; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; @@ -286,7 +287,7 @@ impl std::fmt::Debug for OffsetCursorStream { pub struct BatchTrackingStream { /// Write once, read many [`RecordBatch`]s - batches: HashMap>, + batches: HashMap, RandomState>, /// Input streams yielding [`Cursor`]s and [`RecordBatch`]es streams: BatchCursorStream, /// Accounts for memory used by buffered batches @@ -296,7 +297,7 @@ pub struct BatchTrackingStream { impl BatchTrackingStream { pub fn new(streams: BatchCursorStream, reservation: MemoryReservation) -> Self { Self { - batches: HashMap::new(), + batches: HashMap::with_hasher(RandomState::new()), streams, reservation, } @@ -383,8 +384,8 @@ impl YieldedCursorStream { cursors: Vec<(C, Uuid, BatchOffset)>, sort_order: Vec, ) -> Result<()> { - let mut cursors_per_batch: HashMap<(Uuid, BatchOffset), C> = - HashMap::with_capacity(cursors.len()); + let mut cursors_per_batch: HashMap<(Uuid, BatchOffset), C, RandomState> = + HashMap::with_capacity_and_hasher(cursors.len(), RandomState::new()); for (cursor, batch_id, batch_offset) in cursors { cursors_per_batch.insert((batch_id, batch_offset), cursor); }