diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d9c1e85a8fd2..1911fa98b02f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -86,7 +86,7 @@ struct JoinLeftData { hash_map: JoinHashMap, /// The input rows for the build side batch: RecordBatch, - /// The build side expressions values + /// The build side on expressions values values: Vec, /// Shared bitmap builder for visited left indices visited_indices_bitmap: Mutex, @@ -1152,6 +1152,8 @@ impl HashJoinStreamState { struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch, + /// Probe-side on expressions values + values: Vec, /// Starting offset for JoinHashMap lookups offset: JoinHashMapOffset, /// Max joined probe-side index from current batch @@ -1264,18 +1266,12 @@ impl RecordBatchStream for HashJoinStream { fn lookup_join_hashmap( build_hashmap: &JoinHashMap, build_side_values: &[ArrayRef], - probe_batch: &RecordBatch, - probe_on: &[PhysicalExprRef], + probe_side_values: &[ArrayRef], null_equals_null: bool, hashes_buffer: &[u64], limit: usize, offset: JoinHashMapOffset, ) -> Result<(UInt64Array, UInt32Array, Option)> { - let keys_values = probe_on - .iter() - .map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())) - .collect::>>()?; - let (probe_indices, build_indices, next_offset) = build_hashmap .get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset); @@ -1286,7 +1282,7 @@ fn lookup_join_hashmap( &build_indices, &probe_indices, build_side_values, - &keys_values, + probe_side_values, null_equals_null, )?; @@ -1444,6 +1440,7 @@ impl HashJoinStream { self.state = HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch, + values: keys_values, offset: (0, None), joined_probe_idx: None, }); @@ -1469,8 +1466,7 @@ impl HashJoinStream { let (left_indices, right_indices, next_offset) = lookup_join_hashmap( build_side.left_data.hash_map(), build_side.left_data.values(), - &state.batch, - &self.on_right, + &state.values, self.null_equals_null, &self.hashes_buffer, self.batch_size, @@ -3314,13 +3310,16 @@ mod tests { let right_keys_values = key_column.evaluate(&right)?.into_array(right.num_rows())?; let mut hashes_buffer = vec![0; right.num_rows()]; - create_hashes(&[right_keys_values], &random_state, &mut hashes_buffer)?; + create_hashes( + &[Arc::clone(&right_keys_values)], + &random_state, + &mut hashes_buffer, + )?; let (l, r, _) = lookup_join_hashmap( &join_hash_map, &[left_keys_values], - &right, - &[key_column], + &[right_keys_values], false, &hashes_buffer, 8192,