diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8886a3ac5588..5e0d8759a7d4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -669,6 +669,7 @@ impl ExecutionPlan for HashJoinExec { state: HashJoinStreamState::WaitBuildSide, build_side: BuildSide::Initial(BuildSideInitialState { left_fut }), batch_size, + hashes_buffer: vec![], })) } @@ -979,6 +980,8 @@ struct HashJoinStream { build_side: BuildSide, /// Maximum output batch size batch_size: usize, + /// Scratch space for computing hashes + hashes_buffer: Vec, } impl RecordBatchStream for HashJoinStream { @@ -1044,6 +1047,7 @@ fn lookup_join_hashmap( probe_on: &[Column], random_state: &RandomState, null_equals_null: bool, + hashes_buffer: &mut Vec, limit: usize, offset: JoinHashMapOffset, ) -> Result<(UInt64Array, UInt32Array, Option)> { @@ -1059,8 +1063,9 @@ fn lookup_join_hashmap( }) .collect::>>()?; - let mut hashes_buffer = vec![0; probe_batch.num_rows()]; - let hash_values = create_hashes(&keys_values, random_state, &mut hashes_buffer)?; + hashes_buffer.clear(); + hashes_buffer.resize(probe_batch.num_rows(), 0); + let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; let (mut probe_builder, mut build_builder, next_offset) = build_hashmap .get_matched_indices_with_limit_offset( @@ -1263,6 +1268,7 @@ impl HashJoinStream { &self.on_right, &self.random_state, self.null_equals_null, + &mut self.hashes_buffer, self.batch_size, state.offset, )?; @@ -2930,6 +2936,7 @@ mod tests { ); let join_hash_map = JoinHashMap::new(hashmap_left, next); + let mut hashes_buffer = vec![0]; let (l, r, _) = lookup_join_hashmap( &join_hash_map, @@ -2939,6 +2946,7 @@ mod tests { &[Column::new("a", 0)], &random_state, false, + &mut hashes_buffer, 8192, (0, None), )?;