supports bloom filter join (#532)
fix decimal issue

supports spark351

Co-authored-by: zhangli20 <[email protected]>
richox and zhangli20 authored Jul 30, 2024
1 parent 6f27604 commit c2cc15f
Showing 29 changed files with 1,625 additions and 351 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tpcds-reusable.yml
@@ -78,7 +78,7 @@ jobs:
          cd spark-bin-${{ inputs.sparkver }} && tar -xf ../spark-*.tgz --strip-component=1

  run-tpcds-test:
-    name: Run test ${{ inputs.querygroup }}
+    name: Run TPC-DS test ${{ matrix.query }}
    needs: [build-validator, build-blaze-jar, setup-spark]
    runs-on: ubuntu-latest
    strategy:
7 changes: 7 additions & 0 deletions .github/workflows/tpcds.yml
@@ -25,3 +25,10 @@ jobs:
    with:
      sparkver: spark333
      sparkurl: https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
+
+  test-spark351:
+    name: Test Spark351
+    uses: ./.github/workflows/tpcds-reusable.yml
+    with:
+      sparkver: spark351
+      sparkurl: https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion README.md
@@ -72,7 +72,7 @@ _You could either build Blaze in dev mode for debugging or in release mode to un
Blaze._

```shell
-SHIM=spark333 # or spark303/spark324
+SHIM=spark333 # or spark303/spark324/spark351
MODE=release # or pre
mvn package -P"${SHIM}" -P"${MODE}"
```
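For the shim added in this commit, a spark351 release build uses the same recipe, just selecting the new profile:

```shell
SHIM=spark351
MODE=release
mvn package -P"${SHIM}" -P"${MODE}"
```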
9 changes: 9 additions & 0 deletions native-engine/blaze-serde/proto/blaze.proto
@@ -115,6 +115,9 @@ message PhysicalExprNode {

    // RowNum
    RowNumExprNode row_num_expr = 20100;
+
+    // BloomFilterMightContain
+    BloomFilterMightContainExprNode bloom_filter_might_contain_expr = 20200;
  }
}

@@ -134,6 +137,7 @@ enum AggFunction {
  COLLECT_SET = 6;
  FIRST = 7;
  FIRST_IGNORES_NULL = 8;
+  BLOOM_FILTER = 9;
  BRICKHOUSE_COLLECT = 1000;
  BRICKHOUSE_COMBINE_UNIQUE = 1001;
}
@@ -341,6 +345,11 @@ message StringContainsExprNode {
message RowNumExprNode {
}

+message BloomFilterMightContainExprNode {
+  PhysicalExprNode bloom_filter_expr = 1;
+  PhysicalExprNode value_expr = 2;
+}
+
message FilterExecNode {
  PhysicalPlanNode input = 1;
  repeated PhysicalExprNode expr = 2;
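The new message pairs a serialized bloom filter (produced by the `BLOOM_FILTER` aggregate on the build side) with a probe-side value expression. For reference, the membership test such an expression performs can be sketched as below. This is illustrative only, not Blaze's actual evaluator: it mirrors the double-hashing probe used by Spark's `org.apache.spark.util.sketch.BloomFilterImpl`, with `spark_compatible_murmur3_hash_long` being the hash added in `hash/mur.rs` further down (module visibility assumed):

```rust
use datafusion_ext_commons::hash::mur::spark_compatible_murmur3_hash_long;

/// Illustrative sketch of a Spark-style bloom filter probe.
/// `bits` is the filter's bit array (assumed non-empty),
/// `num_hash_functions` its k parameter.
fn might_contain(bits: &[u64], num_hash_functions: u32, item: i64) -> bool {
    let bit_size = (bits.len() as i64) * 64;
    // two base hashes, combined by double hashing: h1 + i * h2
    let h1 = spark_compatible_murmur3_hash_long(item, 0);
    let h2 = spark_compatible_murmur3_hash_long(item, h1);
    for i in 1..=num_hash_functions as i32 {
        let mut combined = h1.wrapping_add(i.wrapping_mul(h2));
        if combined < 0 {
            combined = !combined; // flip to non-negative, as Spark does
        }
        let bit_index = (combined as i64) % bit_size;
        if bits[(bit_index / 64) as usize] & (1u64 << (bit_index % 64)) == 0 {
            return false; // some probe bit unset: definitely not in the set
        }
    }
    true // all probe bits set: possibly in the set
}
```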
10 changes: 9 additions & 1 deletion native-engine/blaze-serde/src/from_proto.rs
@@ -51,7 +51,8 @@ use datafusion::{
};
use datafusion_ext_commons::downcast_any;
use datafusion_ext_exprs::{
-    cast::TryCastExpr, get_indexed_field::GetIndexedFieldExpr, get_map_value::GetMapValueExpr,
+    bloom_filter_might_contain::BloomFilterMightContainExpr, cast::TryCastExpr,
+    get_indexed_field::GetIndexedFieldExpr, get_map_value::GetMapValueExpr,
    named_struct::NamedStructExpr, row_num::RowNumExpr,
    spark_scalar_subquery_wrapper::SparkScalarSubqueryWrapperExpr,
    spark_udf_wrapper::SparkUDFWrapperExpr, string_contains::StringContainsExpr,
@@ -619,6 +620,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                protobuf::AggFunction::FirstIgnoresNull => {
                    WindowFunction::Agg(AggFunction::FirstIgnoresNull)
                }
+                protobuf::AggFunction::BloomFilter => {
+                    WindowFunction::Agg(AggFunction::BloomFilter)
+                }
                protobuf::AggFunction::BrickhouseCollect => {
                    WindowFunction::Agg(AggFunction::BrickhouseCollect)
                }
@@ -1029,6 +1033,10 @@ fn try_parse_physical_expr(
            Arc::new(StringContainsExpr::new(expr, e.infix.clone()))
        }
        ExprType::RowNumExpr(_) => Arc::new(RowNumExpr::default()),
+        ExprType::BloomFilterMightContainExpr(e) => Arc::new(BloomFilterMightContainExpr::new(
+            try_parse_physical_expr_box_required(&e.bloom_filter_expr, input_schema)?,
+            try_parse_physical_expr_box_required(&e.value_expr, input_schema)?,
+        )),
        ExprType::ScAndExpr(e) => {
            let l = try_parse_physical_expr_box_required(&e.left, input_schema)?;
            let r = try_parse_physical_expr_box_required(&e.right, input_schema)?;
1 change: 1 addition & 0 deletions native-engine/blaze-serde/src/lib.rs
@@ -135,6 +135,7 @@ impl From<protobuf::AggFunction> for AggFunction {
            protobuf::AggFunction::CollectSet => AggFunction::CollectSet,
            protobuf::AggFunction::First => AggFunction::First,
            protobuf::AggFunction::FirstIgnoresNull => AggFunction::FirstIgnoresNull,
+            protobuf::AggFunction::BloomFilter => AggFunction::BloomFilter,
            protobuf::AggFunction::BrickhouseCollect => AggFunction::BrickhouseCollect,
            protobuf::AggFunction::BrickhouseCombineUnique => AggFunction::BrickhouseCombineUnique,
        }
1 change: 1 addition & 0 deletions native-engine/datafusion-ext-commons/Cargo.toml
@@ -14,6 +14,7 @@ async-trait = "0.1.81"
bitvec = "1.0.1"
blaze-jni-bridge = { workspace = true }
bigdecimal = "0.4.5"
+byteorder = "1.5.0"
bytes = "1.6.1"
datafusion = { workspace = true }
futures = "0.3"
38 changes: 38 additions & 0 deletions native-engine/datafusion-ext-commons/src/hash/mod.rs
@@ -0,0 +1,38 @@
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod mur;
pub mod xxhash;

fn read32(data: &[u8], offset: usize) -> u32 {
    let v = unsafe {
        // safety: boundary check is done by caller
        std::ptr::read_unaligned(data.as_ptr().add(offset) as *const u32)
    };
    if cfg!(target_endian = "big") {
        return v.swap_bytes();
    }
    v
}

fn read64(data: &[u8], offset: usize) -> u64 {
    let v = unsafe {
        // safety: boundary check is done by caller
        std::ptr::read_unaligned(data.as_ptr().add(offset) as *const u64)
    };
    if cfg!(target_endian = "big") {
        return v.swap_bytes();
    }
    v
}
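These helpers always interpret bytes little-endian regardless of host endianness, and leave bounds checking to the caller. A small illustrative test (not part of the commit) that could live inside this module, since the functions are private:

```rust
#[cfg(test)]
mod read_tests {
    use super::*;

    #[test]
    fn test_reads_are_little_endian() {
        let data = [0x01u8, 0x00, 0x00, 0x00, 0xff];
        assert_eq!(read32(&data, 0), 1); // bytes 01 00 00 00
        assert_eq!(read32(&data, 1), 0xff00_0000); // bytes 00 00 00 ff
        // caller must guarantee offset + 4 (read32) or offset + 8 (read64)
        // does not exceed data.len()
        let wide = [2u8, 0, 0, 0, 0, 0, 0, 0];
        assert_eq!(read64(&wide, 0), 2);
    }
}
```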
103 changes: 103 additions & 0 deletions native-engine/datafusion-ext-commons/src/hash/mur.rs
@@ -0,0 +1,103 @@
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::hash::read32;

#[inline]
pub fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: i32) -> i32 {
    let data = data.as_ref();
    let len = data.len();
    let (data_aligned, data_trailing) = data.split_at(len - len % 4);

    let mut h1 = hash_bytes_by_int(data_aligned, seed);
    for &b in data_trailing {
        // sign-extend each trailing byte, matching Java/Spark byte-to-int semantics
        let half_word = b as i8 as i32;
        h1 = mix_h1(h1, mix_k1(half_word));
    }
    fmix(h1, len as i32)
}

#[inline]
pub fn spark_compatible_murmur3_hash_long(value: i64, seed: i32) -> i32 {
    hash_long(value, seed)
}

#[inline]
fn mix_k1(mut k1: i32) -> i32 {
    // multiplications rely on two's-complement wrapping as in Java
    // (integer overflow checks are off in release builds)
    k1 *= 0xcc9e2d51u32 as i32;
    k1 = k1.rotate_left(15);
    k1 *= 0x1b873593u32 as i32;
    k1
}

#[inline]
fn mix_h1(mut h1: i32, k1: i32) -> i32 {
    h1 ^= k1;
    h1 = h1.rotate_left(13);
    h1 = h1 * 5 + 0xe6546b64u32 as i32;
    h1
}

#[inline]
fn fmix(mut h1: i32, len: i32) -> i32 {
    h1 ^= len;
    h1 ^= ((h1 as u32) >> 16) as i32;
    h1 *= 0x85ebca6bu32 as i32;
    h1 ^= ((h1 as u32) >> 13) as i32;
    h1 *= 0xc2b2ae35u32 as i32;
    h1 ^= ((h1 as u32) >> 16) as i32;
    h1
}

#[inline]
fn hash_bytes_by_int(data: &[u8], seed: i32) -> i32 {
    // safety: data length must be aligned to 4 bytes
    let mut h1 = seed;
    for i in (0..data.len()).step_by(4) {
        let half_word = read32(data, i) as i32;
        h1 = mix_h1(h1, mix_k1(half_word));
    }
    h1
}

#[inline]
fn hash_long(input: i64, seed: i32) -> i32 {
    let low = input as i32;
    let high = (input >> 32) as i32;

    let k1 = mix_k1(low);
    let h1 = mix_h1(seed, k1);

    let k1 = mix_k1(high);
    let h1 = mix_h1(h1, k1);

    fmix(h1, 8)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_murmur3() {
        let hashes = ["", "a", "ab", "abc", "abcd", "abcde"]
            .into_iter()
            .map(|s| spark_compatible_murmur3_hash(s.as_bytes(), 42))
            .collect::<Vec<_>>();
        let expected = vec![
            142593372, 1485273170, -97053317, 1322437556, -396302900, 814637928,
        ];
        assert_eq!(hashes, expected);
    }
}
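A short usage sketch (the string value comes from the unit test above; the h1/h2 pair is the derivation a Spark-compatible bloom filter would start from, as in the sketch after `blaze.proto` — illustrative only, with module visibility assumed):

```rust
use datafusion_ext_commons::hash::mur::{
    spark_compatible_murmur3_hash, spark_compatible_murmur3_hash_long,
};

fn main() {
    // seed 42 is the default seed of Spark's Murmur3Hash expression
    assert_eq!(spark_compatible_murmur3_hash("abc".as_bytes(), 42), -97053317);

    // base hashes for a bloom-filter probe of an i64 item
    let item: i64 = 12345;
    let h1 = spark_compatible_murmur3_hash_long(item, 0);
    let h2 = spark_compatible_murmur3_hash_long(item, h1);
    println!("h1 = {h1}, h2 = {h2}");
}
```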