Skip to content

Commit

Permalink
add LogicalPlan Join and CrossJoin (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
iajoiner authored Mar 1, 2023
1 parent 8ec98a8 commit 6cd74a9
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 0 deletions.
8 changes: 8 additions & 0 deletions datafusion/tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
Aggregate,
Sort,
Analyze,
Join,
JoinType,
JoinConstraint,
CrossJoin,
Union,
Like,
ILike,
Expand Down Expand Up @@ -106,6 +110,10 @@ def test_class_module_is_datafusion():
Limit,
Filter,
Analyze,
Join,
JoinType,
JoinConstraint,
CrossJoin,
Union,
Like,
ILike,
Expand Down
6 changes: 6 additions & 0 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ pub mod bool_expr;
pub mod case;
pub mod cast;
pub mod column;
pub mod cross_join;
pub mod empty_relation;
pub mod exists;
pub mod filter;
pub mod grouping_set;
pub mod in_list;
pub mod in_subquery;
pub mod indexed_field;
pub mod join;
pub mod like;
pub mod limit;
pub mod literal;
Expand Down Expand Up @@ -267,6 +269,10 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
m.add_class::<empty_relation::PyEmptyRelation>()?;
m.add_class::<join::PyJoin>()?;
m.add_class::<join::PyJoinType>()?;
m.add_class::<join::PyJoinConstraint>()?;
m.add_class::<cross_join::PyCrossJoin>()?;
m.add_class::<union::PyUnion>()?;
Ok(())
}
90 changes: 90 additions & 0 deletions src/expr/cross_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::logical_plan::CrossJoin;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};

use crate::common::df_schema::PyDFSchema;
use crate::expr::logical_node::LogicalNode;
use crate::sql::logical::PyLogicalPlan;

#[pyclass(name = "CrossJoin", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyCrossJoin {
cross_join: CrossJoin,
}

impl From<CrossJoin> for PyCrossJoin {
fn from(cross_join: CrossJoin) -> PyCrossJoin {
PyCrossJoin { cross_join }
}
}

impl From<PyCrossJoin> for CrossJoin {
fn from(cross_join: PyCrossJoin) -> Self {
cross_join.cross_join
}
}

impl Display for PyCrossJoin {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"CrossJoin
\nLeft: {:?}
\nRight: {:?}
\nSchema: {:?}",
&self.cross_join.left, &self.cross_join.right, &self.cross_join.schema
)
}
}

#[pymethods]
impl PyCrossJoin {
/// Retrieves the left input `LogicalPlan` to this `CrossJoin` node
fn left(&self) -> PyResult<PyLogicalPlan> {
Ok(self.cross_join.left.as_ref().clone().into())
}

/// Retrieves the right input `LogicalPlan` to this `CrossJoin` node
fn right(&self) -> PyResult<PyLogicalPlan> {
Ok(self.cross_join.right.as_ref().clone().into())
}

/// Resulting Schema for this `CrossJoin` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok(self.cross_join.schema.as_ref().clone().into())
}

fn __repr__(&self) -> PyResult<String> {
Ok(format!("CrossJoin({})", self))
}

fn __name__(&self) -> PyResult<String> {
Ok("CrossJoin".to_string())
}
}

impl LogicalNode for PyCrossJoin {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![
PyLogicalPlan::from((*self.cross_join.left).clone()),
PyLogicalPlan::from((*self.cross_join.right).clone()),
]
}
}
181 changes: 181 additions & 0 deletions src/expr/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::logical_plan::{Join, JoinConstraint, JoinType};
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};

use crate::common::df_schema::PyDFSchema;
use crate::expr::{logical_node::LogicalNode, PyExpr};
use crate::sql::logical::PyLogicalPlan;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[pyclass(name = "JoinType", module = "datafusion.expr")]
pub struct PyJoinType {
join_type: JoinType,
}

impl From<JoinType> for PyJoinType {
fn from(join_type: JoinType) -> PyJoinType {
PyJoinType { join_type }
}
}

impl From<PyJoinType> for JoinType {
fn from(join_type: PyJoinType) -> Self {
join_type.join_type
}
}

#[pymethods]
impl PyJoinType {
pub fn is_outer(&self) -> bool {
self.join_type.is_outer()
}
}

impl Display for PyJoinType {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.join_type)
}
}

#[derive(Debug, Clone, Copy)]
#[pyclass(name = "JoinConstraint", module = "datafusion.expr")]
pub struct PyJoinConstraint {
join_constraint: JoinConstraint,
}

impl From<JoinConstraint> for PyJoinConstraint {
fn from(join_constraint: JoinConstraint) -> PyJoinConstraint {
PyJoinConstraint { join_constraint }
}
}

impl From<PyJoinConstraint> for JoinConstraint {
fn from(join_constraint: PyJoinConstraint) -> Self {
join_constraint.join_constraint
}
}

#[pyclass(name = "Join", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyJoin {
join: Join,
}

impl From<Join> for PyJoin {
fn from(join: Join) -> PyJoin {
PyJoin { join }
}
}

impl From<PyJoin> for Join {
fn from(join: PyJoin) -> Self {
join.join
}
}

impl Display for PyJoin {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Join
\nLeft: {:?}
\nRight: {:?}
\nOn: {:?}
\nFilter: {:?}
\nJoinType: {:?}
\nJoinConstraint: {:?}
\nSchema: {:?}
\nNullEqualsNull: {:?}",
&self.join.left,
&self.join.right,
&self.join.on,
&self.join.filter,
&self.join.join_type,
&self.join.join_constraint,
&self.join.schema,
&self.join.null_equals_null,
)
}
}

#[pymethods]
impl PyJoin {
/// Retrieves the left input `LogicalPlan` to this `Join` node
fn left(&self) -> PyResult<PyLogicalPlan> {
Ok(self.join.left.as_ref().clone().into())
}

/// Retrieves the right input `LogicalPlan` to this `Join` node
fn right(&self) -> PyResult<PyLogicalPlan> {
Ok(self.join.right.as_ref().clone().into())
}

/// Retrieves the right input `LogicalPlan` to this `Join` node
fn on(&self) -> PyResult<Vec<(PyExpr, PyExpr)>> {
Ok(self
.join
.on
.iter()
.map(|(l, r)| (PyExpr::from(l.clone()), PyExpr::from(r.clone())))
.collect())
}

/// Retrieves the filter `Option<PyExpr>` of this `Join` node
fn filter(&self) -> PyResult<Option<PyExpr>> {
Ok(self.join.filter.clone().map(Into::into))
}

/// Retrieves the `JoinType` to this `Join` node
fn join_type(&self) -> PyResult<PyJoinType> {
Ok(self.join.join_type.into())
}

/// Retrieves the `JoinConstraint` to this `Join` node
fn join_constraint(&self) -> PyResult<PyJoinConstraint> {
Ok(self.join.join_constraint.into())
}

/// Resulting Schema for this `Join` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok(self.join.schema.as_ref().clone().into())
}

/// If null_equals_null is true, null == null else null != null
fn null_equals_null(&self) -> PyResult<bool> {
Ok(self.join.null_equals_null)
}

fn __repr__(&self) -> PyResult<String> {
Ok(format!("Join({})", self))
}

fn __name__(&self) -> PyResult<String> {
Ok("Join".to_string())
}
}

impl LogicalNode for PyJoin {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![
PyLogicalPlan::from((*self.join.left).clone()),
PyLogicalPlan::from((*self.join.right).clone()),
]
}
}
1 change: 1 addition & 0 deletions src/expr/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use datafusion_expr::{TypeSignature, Volatility};
use pyo3::prelude::*;

#[allow(dead_code)]
#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
#[allow(dead_code)]
#[derive(Clone)]
Expand Down

0 comments on commit 6cd74a9

Please sign in to comment.