Skip to content

Commit

Permalink
Implement From to allow into syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
jc4x4 committed Aug 17, 2024
1 parent 909ef10 commit d8d6e68
Showing 1 changed file with 35 additions and 37 deletions.
72 changes: 35 additions & 37 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,6 @@ pub struct LogicalPlanBuilder {
}

impl LogicalPlanBuilder {
// TODO: implement from
/// Create a builder from an existing plan
pub fn from(plan: LogicalPlan) -> Self {
Self {
plan: Arc::new(plan),
}
}

/// Create a builder from an existing plan
pub fn new(plan: LogicalPlan) -> Self {
Self {
Expand All @@ -126,7 +118,7 @@ impl LogicalPlanBuilder {
///
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
pub fn empty(produce_one_row: bool) -> Self {
Self::from(LogicalPlan::EmptyRelation(EmptyRelation {
Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema: DFSchemaRef::new(DFSchema::empty()),
}))
Expand Down Expand Up @@ -158,7 +150,7 @@ impl LogicalPlanBuilder {
// Ensure that the recursive term has the same field types as the static term
let coerced_recursive_term =
coerce_plan_expr_for_schema(&recursive_term, self.plan.schema())?;
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
Ok(Self::new(LogicalPlan::RecursiveQuery(RecursiveQuery {
name,
static_term: self.plan.clone(),
recursive_term: Arc::new(coerced_recursive_term),
Expand Down Expand Up @@ -238,7 +230,7 @@ impl LogicalPlanBuilder {
.collect::<Vec<_>>();
let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;
let schema = DFSchemaRef::new(dfschema);
Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
}

/// Convert a table provider into a builder with a TableScan
Expand Down Expand Up @@ -289,7 +281,7 @@ impl LogicalPlanBuilder {
options: HashMap<String, String>,
partition_by: Vec<String>,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
Ok(Self::new(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
partition_by,
Expand All @@ -313,7 +305,7 @@ impl LogicalPlanBuilder {
WriteOp::InsertInto
};

Ok(Self::from(LogicalPlan::Dml(DmlStatement::new(
Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
table_name.into(),
table_schema,
op,
Expand All @@ -330,7 +322,7 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
TableScan::try_new(table_name, table_source, projection, filters, None)
.map(LogicalPlan::TableScan)
.map(Self::from)
.map(Self::new)
}

/// Wrap a plan in a window
Expand Down Expand Up @@ -375,7 +367,7 @@ impl LogicalPlanBuilder {
self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
project(unwrap_arc(self.plan), expr).map(Self::from)
project(unwrap_arc(self.plan), expr).map(Self::new)
}

/// Select the given column indices
Expand All @@ -392,12 +384,12 @@ impl LogicalPlanBuilder {
let expr = normalize_col(expr.into(), &self.plan)?;
Filter::try_new(expr, self.plan)
.map(LogicalPlan::Filter)
.map(Self::from)
.map(Self::new)
}

/// Make a builder for a prepare logical plan from the builder's plan
pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
Ok(Self::from(LogicalPlan::Prepare(Prepare {
Ok(Self::new(LogicalPlan::Prepare(Prepare {
name,
data_types,
input: self.plan,
Expand All @@ -411,7 +403,7 @@ impl LogicalPlanBuilder {
/// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
/// if specified.
pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
Ok(Self::from(LogicalPlan::Limit(Limit {
Ok(Self::new(LogicalPlan::Limit(Limit {
skip,
fetch,
input: self.plan,
Expand All @@ -420,7 +412,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
subquery_alias((*self.plan).clone(), alias).map(Self::from)
subquery_alias(unwrap_arc(self.plan), alias).map(Self::new)
}

/// Add missing sort columns to all downstream projection
Expand Down Expand Up @@ -560,7 +552,7 @@ impl LogicalPlanBuilder {
})?;

if missing_cols.is_empty() {
return Ok(Self::from(LogicalPlan::Sort(Sort {
return Ok(Self::new(LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &self.plan)?,
input: self.plan,
fetch: None,
Expand All @@ -581,27 +573,27 @@ impl LogicalPlanBuilder {

Projection::try_new(new_expr, Arc::new(sort_plan))
.map(LogicalPlan::Projection)
.map(Self::from)
.map(Self::new)
}

/// Apply a union, preserving duplicate rows
pub fn union(self, plan: LogicalPlan) -> Result<Self> {
union(unwrap_arc(self.plan), plan).map(Self::from)
union(unwrap_arc(self.plan), plan).map(Self::new)
}

/// Apply a union, removing duplicate rows
pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
let left_plan: LogicalPlan = unwrap_arc(self.plan);
let right_plan: LogicalPlan = plan;

Ok(Self::from(LogicalPlan::Distinct(Distinct::All(Arc::new(
Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
union(left_plan, right_plan)?,
)))))
}

/// Apply deduplication: Only distinct (different) values are returned)
pub fn distinct(self) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct::All(self.plan))))
Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
}

/// Project first values of the specified expression list according to the provided
Expand All @@ -612,7 +604,7 @@ impl LogicalPlanBuilder {
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Distinct(Distinct::On(
Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
))))
}
Expand Down Expand Up @@ -828,7 +820,7 @@ impl LogicalPlanBuilder {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

Ok(Self::from(LogicalPlan::Join(Join {
Ok(Self::new(LogicalPlan::Join(Join {
left: self.plan,
right: Arc::new(right),
on,
Expand Down Expand Up @@ -892,7 +884,7 @@ impl LogicalPlanBuilder {
DataFusionError::Internal("filters should not be None here".to_string())
})?)
} else {
Ok(Self::from(LogicalPlan::Join(Join {
Ok(Self::new(LogicalPlan::Join(Join {
left: self.plan,
right: Arc::new(right),
on: join_on,
Expand All @@ -909,7 +901,7 @@ impl LogicalPlanBuilder {
pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
Ok(Self::new(LogicalPlan::CrossJoin(CrossJoin {
left: self.plan,
right: Arc::new(right),
schema: DFSchemaRef::new(join_schema),
Expand All @@ -918,7 +910,7 @@ impl LogicalPlanBuilder {

/// Repartition
pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
Ok(Self::from(LogicalPlan::Repartition(Repartition {
Ok(Self::new(LogicalPlan::Repartition(Repartition {
input: self.plan,
partitioning_scheme,
})))
Expand All @@ -931,7 +923,7 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
let window_expr = normalize_cols(window_expr, &self.plan)?;
validate_unique_names("Windows", &window_expr)?;
Ok(Self::from(LogicalPlan::Window(Window::try_new(
Ok(Self::new(LogicalPlan::Window(Window::try_new(
window_expr,
self.plan,
)?)))
Expand All @@ -952,7 +944,7 @@ impl LogicalPlanBuilder {
add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?;
Aggregate::try_new(self.plan, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate)
.map(Self::from)
.map(Self::new)
}

/// Create an expression to represent the explanation of the plan
Expand All @@ -966,7 +958,7 @@ impl LogicalPlanBuilder {
let schema = schema.to_dfschema_ref()?;

if analyze {
Ok(Self::from(LogicalPlan::Analyze(Analyze {
Ok(Self::new(LogicalPlan::Analyze(Analyze {
verbose,
input: self.plan,
schema,
Expand All @@ -975,7 +967,7 @@ impl LogicalPlanBuilder {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

Ok(Self::from(LogicalPlan::Explain(Explain {
Ok(Self::new(LogicalPlan::Explain(Explain {
verbose,
plan: self.plan,
stringified_plans,
Expand Down Expand Up @@ -1115,7 +1107,7 @@ impl LogicalPlanBuilder {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

Ok(Self::from(LogicalPlan::Join(Join {
Ok(Self::new(LogicalPlan::Join(Join {
left: self.plan,
right: Arc::new(right),
on: join_key_pairs,
Expand All @@ -1129,7 +1121,7 @@ impl LogicalPlanBuilder {

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
unnest(unwrap_arc(self.plan), vec![column.into()]).map(Self::from)
unnest(unwrap_arc(self.plan), vec![column.into()]).map(Self::new)
}

/// Unnest the given column given [`UnnestOptions`]
Expand All @@ -1139,7 +1131,7 @@ impl LogicalPlanBuilder {
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), vec![column.into()], options)
.map(Self::from)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
Expand All @@ -1148,7 +1140,13 @@ impl LogicalPlanBuilder {
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(unwrap_arc(self.plan), columns, options).map(Self::from)
unnest_with_options(unwrap_arc(self.plan), columns, options).map(Self::new)
}
}

impl From<LogicalPlan> for LogicalPlanBuilder {
fn from(plan: LogicalPlan) -> Self {
LogicalPlanBuilder::new(plan)
}
}

Expand Down

0 comments on commit d8d6e68

Please sign in to comment.