Skip to content

Commit

Permalink
[FEAT] Outer joins for native executor (#2860)
Browse files Browse the repository at this point in the history
Implement outer joins for Swordfish.

(Yes, this PR is a little big. But: 
1. at least tests run in CI now, so you don't need to just take my word
for it now.
2. A lot of the diff is because I moved left/right joins to be together
with the outer join operator. Therefore the HashJoinProbe operator is
now just InnerHashJoinProbeOperator)

Outer join probes (and left/right now) are implemented as a Streaming
Sink.
- During the `execute` phase of the streaming sink, probing is done
concurrently via workers (this is the same implementation as all the
other join types). The only difference is that during probing, workers
will save the indices on the left side that have matches (using a
mutable bitmap).
- During the `finalize` phase, we merge together all the bitmaps across
the concurrent workers (via a bitwise OR) to get a global view of all
the indices that had matches. Then, we take all the indices that
_**didn't**_ get a match and return them (with nulls for the right
side). This is the same logic we currently use for the python runner.
- Why is left/right with outer joins now? In the future, we may want to
choose the build side for left/right/outer joins based on cardinality.
This means that we may need the `used_indices` bitmaps for left/right
joins as well.

Note: I had to make Streaming Sink concurrency-aware to allow this. The
changes in particular are:
- Streaming Sinks can specify `max concurrency`, currently only LIMIT
will have this set to 1.
- `execute` accepts some `mut state` and finalize will consolidate all
of the state, i.e. `Vec<Box<dyn State>>`.
- In order to make sure that all the workers are done, they are spawned
on a Worker Set, and return their state when done. This ensures that the
`finalize` method doesn't get called before the workers are done with
the `executes`.

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
3 people authored Oct 22, 2024
1 parent 4a8244b commit b9b2d72
Show file tree
Hide file tree
Showing 17 changed files with 943 additions and 414 deletions.
5 changes: 1 addition & 4 deletions src/arrow2/src/array/growable/primitive.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use crate::{
array::{Array, PrimitiveArray},
bitmap::MutableBitmap,
datatypes::DataType,
types::NativeType,
array::{Array, PrimitiveArray}, bitmap::MutableBitmap, datatypes::DataType, types::NativeType
};

use super::{
Expand Down
2 changes: 2 additions & 0 deletions src/daft-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//!
//! This module re-exports commonly used items from the Daft core library.
// Re-export arrow2 bitmap
pub use arrow2::bitmap;
// Re-export core series structures
pub use daft_schema::schema::{Schema, SchemaRef};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_core::prelude::SchemaRef;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;
use daft_plan::JoinType;
Expand Down Expand Up @@ -43,14 +44,18 @@ impl IntermediateOperatorState for AntiSemiProbeState {

pub struct AntiSemiProbeOperator {
probe_on: Vec<ExprRef>,
join_type: JoinType,
is_semi: bool,
output_schema: SchemaRef,
}

impl AntiSemiProbeOperator {
pub fn new(probe_on: Vec<ExprRef>, join_type: JoinType) -> Self {
const DEFAULT_GROWABLE_SIZE: usize = 20;

pub fn new(probe_on: Vec<ExprRef>, join_type: &JoinType, output_schema: &SchemaRef) -> Self {
Self {
probe_on,
join_type,
is_semi: *join_type == JoinType::Semi,
output_schema: output_schema.clone(),
}
}

Expand All @@ -65,8 +70,11 @@ impl AntiSemiProbeOperator {

let input_tables = input.get_tables()?;

let mut probe_side_growable =
GrowableTable::new(&input_tables.iter().collect::<Vec<_>>(), false, 20)?;
let mut probe_side_growable = GrowableTable::new(
&input_tables.iter().collect::<Vec<_>>(),
false,
Self::DEFAULT_GROWABLE_SIZE,
)?;

drop(_growables);
{
Expand All @@ -76,7 +84,7 @@ impl AntiSemiProbeOperator {
let iter = probe_set.probe_exists(&join_keys)?;

for (probe_row_idx, matched) in iter.enumerate() {
match (self.join_type == JoinType::Semi, matched) {
match (self.is_semi, matched) {
(true, true) | (false, false) => {
probe_side_growable.extend(probe_side_table_idx, probe_row_idx, 1);
}
Expand Down Expand Up @@ -109,15 +117,16 @@ impl IntermediateOperator for AntiSemiProbeOperator {
.expect("AntiSemiProbeOperator state should be AntiSemiProbeState");

if idx == 0 {
let (probe_table, _) = input.as_probe_table();
state.set_table(probe_table);
let probe_state = input.as_probe_state();
state.set_table(probe_state.get_probeable());
Ok(IntermediateOperatorResult::NeedMoreInput(None))
} else {
let input = input.as_data();
let out = match self.join_type {
JoinType::Semi | JoinType::Anti => self.probe_anti_semi(input, state),
_ => unreachable!("Only Semi and Anti joins are supported"),
}?;
if input.is_empty() {
let empty = Arc::new(MicroPartition::empty(Some(self.output_schema.clone())));
return Ok(IntermediateOperatorResult::NeedMoreInput(Some(empty)));
}
let out = self.probe_anti_semi(input, state)?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
}
Expand Down
268 changes: 0 additions & 268 deletions src/daft-local-execution/src/intermediate_ops/hash_join_probe.rs

This file was deleted.

Loading

0 comments on commit b9b2d72

Please sign in to comment.