feat(rust): Expose some more information in translated expression IR to python #17209

Merged: 4 commits, Jun 28, 2024
61 changes: 39 additions & 22 deletions py-polars/src/lazyframe/visitor/expr_nodes.rs
@@ -1,6 +1,6 @@
use polars::datatypes::TimeUnit;
use polars_core::prelude::{NonExistent, QuantileInterpolOptions};
use polars_core::series::IsSorted;
use polars_core::utils::arrow::legacy::kernels::NonExistent;
use polars_ops::prelude::ClosedInterval;
use polars_plan::dsl::function_expr::rolling::RollingFunction;
use polars_plan::dsl::function_expr::rolling_by::RollingFunctionBy;
@@ -321,7 +321,7 @@ pub struct Agg {
#[pyo3(get)]
name: PyObject,
#[pyo3(get)]
arguments: usize,
arguments: Vec<usize>,
#[pyo3(get)]
// Arbitrary control options
options: PyObject,
@@ -635,67 +635,86 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult<PyObject> {
propagate_nans,
} => Agg {
name: "min".to_object(py),
arguments: input.0,
arguments: vec![input.0],
options: propagate_nans.to_object(py),
},
IRAggExpr::Max {
input,
propagate_nans,
} => Agg {
name: "max".to_object(py),
arguments: input.0,
arguments: vec![input.0],
options: propagate_nans.to_object(py),
},
IRAggExpr::Median(n) => Agg {
name: "median".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::NUnique(n) => Agg {
name: "n_unique".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::First(n) => Agg {
name: "first".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::Last(n) => Agg {
name: "last".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::Mean(n) => Agg {
name: "mean".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::Implode(_) => return Err(PyNotImplementedError::new_err("implode")),
IRAggExpr::Quantile { .. } => return Err(PyNotImplementedError::new_err("quantile")),
IRAggExpr::Implode(n) => Agg {
name: "implode".to_object(py),
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::Quantile {
expr,
quantile,
interpol,
} => Agg {
name: "quantile".to_object(py),
arguments: vec![expr.0, quantile.0],
options: match interpol {
QuantileInterpolOptions::Nearest => "nearest",
QuantileInterpolOptions::Lower => "lower",
QuantileInterpolOptions::Higher => "higher",
QuantileInterpolOptions::Midpoint => "midpoint",
QuantileInterpolOptions::Linear => "linear",
}
.to_object(py),
},
IRAggExpr::Sum(n) => Agg {
name: "sum".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
IRAggExpr::Count(n, include_null) => Agg {
name: "count".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: include_null.to_object(py),
},
IRAggExpr::Std(n, ddof) => Agg {
name: "std".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: ddof.to_object(py),
},
IRAggExpr::Var(n, ddof) => Agg {
name: "var".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: ddof.to_object(py),
},
IRAggExpr::AggGroups(n) => Agg {
name: "agg_groups".to_object(py),
arguments: n.0,
arguments: vec![n.0],
options: py.None(),
},
}
@@ -1040,7 +1059,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult<PyObject> {
.to_object(py),
FunctionExpr::Atan2 => ("atan2",).to_object(py),
FunctionExpr::Sign => ("sign",).to_object(py),
FunctionExpr::FillNull => return Err(PyNotImplementedError::new_err("fill null")),
FunctionExpr::FillNull => ("fill_null",).to_object(py),
FunctionExpr::RollingExpr(rolling) => match rolling {
RollingFunction::Min(_) => {
return Err(PyNotImplementedError::new_err("rolling min"))
@@ -1104,7 +1123,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult<PyObject> {
FunctionExpr::Reshape(_, _) => {
return Err(PyNotImplementedError::new_err("reshape"))
},
FunctionExpr::RepeatBy => return Err(PyNotImplementedError::new_err("repeat by")),
FunctionExpr::RepeatBy => ("repeat_by",).to_object(py),
FunctionExpr::ArgUnique => ("argunique",).to_object(py),
FunctionExpr::Rank {
options: _,
@@ -1115,7 +1134,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult<PyObject> {
has_max: _,
} => return Err(PyNotImplementedError::new_err("clip")),
FunctionExpr::AsStruct => return Err(PyNotImplementedError::new_err("as struct")),
FunctionExpr::TopK { .. } => return Err(PyNotImplementedError::new_err("top k")),
FunctionExpr::TopK { descending } => ("top_k", descending).to_object(py),
FunctionExpr::CumCount { reverse } => ("cumcount", reverse).to_object(py),
FunctionExpr::CumSum { reverse } => ("cumsum", reverse).to_object(py),
FunctionExpr::CumProd { reverse } => ("cumprod", reverse).to_object(py),
@@ -1238,9 +1257,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult<PyObject> {
FunctionExpr::Business(_) => {
return Err(PyNotImplementedError::new_err("business"))
},
FunctionExpr::TopKBy { .. } => {
return Err(PyNotImplementedError::new_err("top_k_by"))
},
FunctionExpr::TopKBy { descending } => ("top_k_by", descending).to_object(py),
FunctionExpr::EwmMeanBy { half_life: _ } => {
return Err(PyNotImplementedError::new_err("ewm_mean_by"))
},
42 changes: 37 additions & 5 deletions py-polars/src/lazyframe/visitor/nodes.rs
@@ -2,7 +2,7 @@ use polars_core::prelude::{IdxSize, UniqueKeepStrategy};
use polars_ops::prelude::JoinType;
use polars_plan::plans::IR;
use polars_plan::prelude::{FileCount, FileScan, FileScanOptions, FunctionNode};
use pyo3::exceptions::PyNotImplementedError;
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
use pyo3::prelude::*;

use super::super::visit::PyExprIR;
@@ -291,6 +291,14 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
predicate: predicate.into(),
}
.into_py(py),
IR::Scan {
hive_parts: Some(_),
..
} => {
return Err(PyNotImplementedError::new_err(
"scan with hive partitioning",
))
Comment on lines +298 to +300 (PR author): Previously we were silently discarding this information.

},
IR::Scan {
paths,
file_info: _,
Expand All @@ -308,11 +316,35 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
inner: file_options.clone(),
},
scan_type: match scan_type {
// TODO: Actually send options through since those are important for correct reads
FileScan::Csv { .. } => "csv".into_py(py),
FileScan::Parquet { .. } => "parquet".into_py(py),
FileScan::Csv {
options,
cloud_options,
} => {
// Since these options structs are serializable,
// we just use the serde json representation
let options = serde_json::to_string(options)
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
Comment on lines +325 to +326 (PR author): This is only enabled when the json feature is active (which it is in release builds, I believe). I can gate it and return Err in the case that it is not active (only the case for dev builds trying to speed up compile cycles?).

Reply (member): Yes, it is always enabled for release builds. I think this is fine. 👍

let cloud_options = serde_json::to_string(cloud_options)
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
("csv", options, cloud_options).into_py(py)
},
FileScan::Parquet {
options,
cloud_options,
..
} => {
let options = serde_json::to_string(options)
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
let cloud_options = serde_json::to_string(cloud_options)
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
("parquet", options, cloud_options).into_py(py)
},
FileScan::Ipc { .. } => return Err(PyNotImplementedError::new_err("ipc scan")),
FileScan::NDJson { .. } => return Err(PyNotImplementedError::new_err("ipc scan")),
FileScan::NDJson { options } => {
let options = serde_json::to_string(options)
.map_err(|err| PyValueError::new_err(format!("{err:?}")))?;
("ndjson", options).into_py(py)
},
FileScan::Anonymous { .. } => {
return Err(PyNotImplementedError::new_err("anonymous scan"))
},
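
Regarding the feature-gating question in the comment thread above, here is a minimal sketch of what gating the serde_json serialization behind a cargo feature could look like. The helper name `serialize_options` and the feature name `json` are assumptions for illustration; this is not part of the PR.

```rust
// Hypothetical sketch, not part of this PR: wrap the serde_json-based
// serialization in a helper gated on a cargo feature, so builds without
// that feature still compile and report a clear error instead.
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
use pyo3::PyResult;

#[cfg(feature = "json")]
fn serialize_options<T: serde::Serialize>(options: &T) -> PyResult<String> {
    serde_json::to_string(options).map_err(|err| PyValueError::new_err(format!("{err:?}")))
}

#[cfg(not(feature = "json"))]
fn serialize_options<T>(_options: &T) -> PyResult<String> {
    Err(PyNotImplementedError::new_err(
        "serializing scan options requires the 'json' feature",
    ))
}
```

Under that sketch, the scan arms above would call `serialize_options(options)?` instead of invoking `serde_json::to_string` directly, and the Python side could still parse the returned strings with the standard `json` module.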