diff --git a/py-polars/src/lazyframe/visitor/expr_nodes.rs b/py-polars/src/lazyframe/visitor/expr_nodes.rs index 691be602ee72..a9fef57f5ca5 100644 --- a/py-polars/src/lazyframe/visitor/expr_nodes.rs +++ b/py-polars/src/lazyframe/visitor/expr_nodes.rs @@ -1,6 +1,6 @@ use polars::datatypes::TimeUnit; +use polars_core::prelude::{NonExistent, QuantileInterpolOptions}; use polars_core::series::IsSorted; -use polars_core::utils::arrow::legacy::kernels::NonExistent; use polars_ops::prelude::ClosedInterval; use polars_plan::dsl::function_expr::rolling::RollingFunction; use polars_plan::dsl::function_expr::rolling_by::RollingFunctionBy; @@ -321,7 +321,7 @@ pub struct Agg { #[pyo3(get)] name: PyObject, #[pyo3(get)] - arguments: usize, + arguments: Vec, #[pyo3(get)] // Arbitrary control options options: PyObject, @@ -635,7 +635,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { propagate_nans, } => Agg { name: "min".to_object(py), - arguments: input.0, + arguments: vec![input.0], options: propagate_nans.to_object(py), }, IRAggExpr::Max { @@ -643,59 +643,78 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { propagate_nans, } => Agg { name: "max".to_object(py), - arguments: input.0, + arguments: vec![input.0], options: propagate_nans.to_object(py), }, IRAggExpr::Median(n) => Agg { name: "median".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, IRAggExpr::NUnique(n) => Agg { name: "n_unique".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, IRAggExpr::First(n) => Agg { name: "first".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, IRAggExpr::Last(n) => Agg { name: "last".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, IRAggExpr::Mean(n) => Agg { name: "mean".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, - IRAggExpr::Implode(_) => return Err(PyNotImplementedError::new_err("implode")), - IRAggExpr::Quantile { .. } => return Err(PyNotImplementedError::new_err("quantile")), + IRAggExpr::Implode(n) => Agg { + name: "implode".to_object(py), + arguments: vec![n.0], + options: py.None(), + }, + IRAggExpr::Quantile { + expr, + quantile, + interpol, + } => Agg { + name: "quantile".to_object(py), + arguments: vec![expr.0, quantile.0], + options: match interpol { + QuantileInterpolOptions::Nearest => "nearest", + QuantileInterpolOptions::Lower => "lower", + QuantileInterpolOptions::Higher => "higher", + QuantileInterpolOptions::Midpoint => "midpoint", + QuantileInterpolOptions::Linear => "linear", + } + .to_object(py), + }, IRAggExpr::Sum(n) => Agg { name: "sum".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, IRAggExpr::Count(n, include_null) => Agg { name: "count".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: include_null.to_object(py), }, IRAggExpr::Std(n, ddof) => Agg { name: "std".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: ddof.to_object(py), }, IRAggExpr::Var(n, ddof) => Agg { name: "var".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: ddof.to_object(py), }, IRAggExpr::AggGroups(n) => Agg { name: "agg_groups".to_object(py), - arguments: n.0, + arguments: vec![n.0], options: py.None(), }, } @@ -1040,7 +1059,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { .to_object(py), FunctionExpr::Atan2 => ("atan2",).to_object(py), FunctionExpr::Sign => ("sign",).to_object(py), - FunctionExpr::FillNull => return Err(PyNotImplementedError::new_err("fill null")), + FunctionExpr::FillNull => ("fill_null",).to_object(py), FunctionExpr::RollingExpr(rolling) => match rolling { RollingFunction::Min(_) => { return Err(PyNotImplementedError::new_err("rolling min")) @@ -1104,7 +1123,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { FunctionExpr::Reshape(_, _) => { return Err(PyNotImplementedError::new_err("reshape")) }, - FunctionExpr::RepeatBy => return Err(PyNotImplementedError::new_err("repeat by")), + FunctionExpr::RepeatBy => ("repeat_by",).to_object(py), FunctionExpr::ArgUnique => ("argunique",).to_object(py), FunctionExpr::Rank { options: _, @@ -1115,7 +1134,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { has_max: _, } => return Err(PyNotImplementedError::new_err("clip")), FunctionExpr::AsStruct => return Err(PyNotImplementedError::new_err("as struct")), - FunctionExpr::TopK { .. } => return Err(PyNotImplementedError::new_err("top k")), + FunctionExpr::TopK { descending } => ("top_k", descending).to_object(py), FunctionExpr::CumCount { reverse } => ("cumcount", reverse).to_object(py), FunctionExpr::CumSum { reverse } => ("cumsum", reverse).to_object(py), FunctionExpr::CumProd { reverse } => ("cumprod", reverse).to_object(py), @@ -1238,9 +1257,7 @@ pub(crate) fn into_py(py: Python<'_>, expr: &AExpr) -> PyResult { FunctionExpr::Business(_) => { return Err(PyNotImplementedError::new_err("business")) }, - FunctionExpr::TopKBy { .. } => { - return Err(PyNotImplementedError::new_err("top_k_by")) - }, + FunctionExpr::TopKBy { descending } => ("top_k_by", descending).to_object(py), FunctionExpr::EwmMeanBy { half_life: _ } => { return Err(PyNotImplementedError::new_err("ewm_mean_by")) }, diff --git a/py-polars/src/lazyframe/visitor/nodes.rs b/py-polars/src/lazyframe/visitor/nodes.rs index c206e59254d3..a706b3e253d2 100644 --- a/py-polars/src/lazyframe/visitor/nodes.rs +++ b/py-polars/src/lazyframe/visitor/nodes.rs @@ -2,7 +2,7 @@ use polars_core::prelude::{IdxSize, UniqueKeepStrategy}; use polars_ops::prelude::JoinType; use polars_plan::plans::IR; use polars_plan::prelude::{FileCount, FileScan, FileScanOptions, FunctionNode}; -use pyo3::exceptions::PyNotImplementedError; +use pyo3::exceptions::{PyNotImplementedError, PyValueError}; use pyo3::prelude::*; use super::super::visit::PyExprIR; @@ -291,6 +291,14 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { predicate: predicate.into(), } .into_py(py), + IR::Scan { + hive_parts: Some(_), + .. + } => { + return Err(PyNotImplementedError::new_err( + "scan with hive partitioning", + )) + }, IR::Scan { paths, file_info: _, @@ -308,11 +316,35 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { inner: file_options.clone(), }, scan_type: match scan_type { - // TODO: Actually send options through since those are important for correct reads - FileScan::Csv { .. } => "csv".into_py(py), - FileScan::Parquet { .. } => "parquet".into_py(py), + FileScan::Csv { + options, + cloud_options, + } => { + // Since these options structs are serializable, + // we just use the serde json representation + let options = serde_json::to_string(options) + .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; + let cloud_options = serde_json::to_string(cloud_options) + .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; + ("csv", options, cloud_options).into_py(py) + }, + FileScan::Parquet { + options, + cloud_options, + .. + } => { + let options = serde_json::to_string(options) + .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; + let cloud_options = serde_json::to_string(cloud_options) + .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; + ("parquet", options, cloud_options).into_py(py) + }, FileScan::Ipc { .. } => return Err(PyNotImplementedError::new_err("ipc scan")), - FileScan::NDJson { .. } => return Err(PyNotImplementedError::new_err("ipc scan")), + FileScan::NDJson { options } => { + let options = serde_json::to_string(options) + .map_err(|err| PyValueError::new_err(format!("{err:?}")))?; + ("ndjson", options).into_py(py) + }, FileScan::Anonymous { .. } => { return Err(PyNotImplementedError::new_err("anonymous scan")) },