Skip to content

Commit

Permalink
Reuse probe side on expressions values
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 15, 2025
1 parent 3d2d158 commit 2444048
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct JoinLeftData {
hash_map: JoinHashMap,
/// The input rows for the build side
batch: RecordBatch,
/// The build side expressions values
/// The build side on expressions values
values: Vec<ArrayRef>,
/// Shared bitmap builder for visited left indices
visited_indices_bitmap: Mutex<BooleanBufferBuilder>,
Expand Down Expand Up @@ -1152,6 +1152,8 @@ impl HashJoinStreamState {
struct ProcessProbeBatchState {
/// Current probe-side batch
batch: RecordBatch,
/// Probe-side on expressions values
values: Vec<ArrayRef>,
/// Starting offset for JoinHashMap lookups
offset: JoinHashMapOffset,
/// Max joined probe-side index from current batch
Expand Down Expand Up @@ -1264,18 +1266,12 @@ impl RecordBatchStream for HashJoinStream {
fn lookup_join_hashmap(
build_hashmap: &JoinHashMap,
build_side_values: &[ArrayRef],
probe_batch: &RecordBatch,
probe_on: &[PhysicalExprRef],
probe_side_values: &[ArrayRef],
null_equals_null: bool,
hashes_buffer: &[u64],
limit: usize,
offset: JoinHashMapOffset,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
let keys_values = probe_on
.iter()
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
.collect::<Result<Vec<_>>>()?;

let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);

Expand All @@ -1286,7 +1282,7 @@ fn lookup_join_hashmap(
&build_indices,
&probe_indices,
build_side_values,
&keys_values,
probe_side_values,
null_equals_null,
)?;

Expand Down Expand Up @@ -1444,6 +1440,7 @@ impl HashJoinStream {
self.state =
HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState {
batch,
values: keys_values,
offset: (0, None),
joined_probe_idx: None,
});
Expand All @@ -1469,8 +1466,7 @@ impl HashJoinStream {
let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.values(),
&state.batch,
&self.on_right,
&state.values,
self.null_equals_null,
&self.hashes_buffer,
self.batch_size,
Expand Down Expand Up @@ -3314,13 +3310,16 @@ mod tests {
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes(&[right_keys_values], &random_state, &mut hashes_buffer)?;
create_hashes(
&[Arc::clone(&right_keys_values)],
&random_state,
&mut hashes_buffer,
)?;

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&[left_keys_values],
&right,
&[key_column],
&[right_keys_values],
false,
&hashes_buffer,
8192,
Expand Down

0 comments on commit 2444048

Please sign in to comment.