Skip to content

Commit

Permalink
Upgrading to rs-0.45
Browse files Browse the repository at this point in the history
  • Loading branch information
Bidek56 committed Dec 9, 2024
1 parent fe18e7a commit 47b0525
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 77 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ napi = { version = "2.16.13", default-features = false, features = [
"serde-json",
] }
napi-derive = { version = "2.16.13", default-features = false }
polars-core = { git = "https://github.com/pola-rs/polars.git", rev = "2dce3d3b5c80ae7522a3435f844fac8fed9dc9e8", default-features = false }
polars-io = { git = "https://github.com/pola-rs/polars.git", rev = "2dce3d3b5c80ae7522a3435f844fac8fed9dc9e8", default-features = false }
polars-lazy = { git = "https://github.com/pola-rs/polars.git", rev = "2dce3d3b5c80ae7522a3435f844fac8fed9dc9e8", default-features = false }
polars-core = { git = "https://github.com/pola-rs/polars.git", rev = "58a38af21dccaf3326514494a1db118601c8c2ca", default-features = false }
polars-io = { git = "https://github.com/pola-rs/polars.git", rev = "58a38af21dccaf3326514494a1db118601c8c2ca", default-features = false }
polars-lazy = { git = "https://github.com/pola-rs/polars.git", rev = "58a38af21dccaf3326514494a1db118601c8c2ca", default-features = false }
thiserror = "1"
smartstring = { version = "1" }
serde_json = { version = "1" }
Expand Down Expand Up @@ -162,7 +162,7 @@ features = [
"azure"
]
git = "https://github.com/pola-rs/polars.git"
rev = "2dce3d3b5c80ae7522a3435f844fac8fed9dc9e8"
rev = "58a38af21dccaf3326514494a1db118601c8c2ca"

[build-dependencies]
napi-build = "2.1.3"
Expand Down
4 changes: 2 additions & 2 deletions polars/lazy/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ export function spearmanRankCorr(a: ExprOrString, b: ExprOrString): Expr {
a = exprToLitOrExpr(a, false);
b = exprToLitOrExpr(b, false);

return _Expr(pli.spearmanRankCorr(a, b, null, false));
return _Expr(pli.spearmanRankCorr(a, b, false));
}

/** Get the last n rows of an Expression. */
Expand Down Expand Up @@ -950,7 +950,7 @@ export function sumHorizontal(exprs: ExprOrString | ExprOrString[]): Expr {

exprs = selectionToExprList(exprs);

return _Expr(pli.sumHorizontal(exprs));
return _Expr(pli.sumHorizontal(exprs, true));
}

// // export function collect_all() {}
Expand Down
59 changes: 58 additions & 1 deletion src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::lazy::dsl::JsExpr;
use crate::prelude::*;
use cloud::CloudOptions;
use napi::bindgen_prelude::*;
use napi::{JsBigInt, JsBoolean, JsDate, JsNumber, JsObject, JsString, JsUnknown};
use polars::frame::NullStrategy;
use polars::prelude::NullStrategy;
use polars::prelude::*;
use polars_core::series::ops::NullBehavior;
use polars_io::RowIndex;
Expand Down Expand Up @@ -574,6 +575,8 @@ pub struct SinkCsvOptions {
pub float_precision: Option<i64>,
pub null_value: Option<String>,
pub maintain_order: bool,
pub cloud_options: Option<HashMap<String, String>>,
pub retries: Option<i64>,
}

#[napi(object)]
Expand All @@ -590,6 +593,29 @@ pub struct SinkParquetOptions {
pub simplify_expression: Option<bool>,
pub slice_pushdown: Option<bool>,
pub no_optimization: Option<bool>,
pub cloud_options: Option<HashMap<String, String>>,
pub retries: Option<i64>,
}

#[napi(object)]
// Options accepted by `scan_parquet`, marshalled from the JS side via napi.
// Field names map 1:1 onto the keyword options of polars' Parquet scan;
// `Option` fields fall back to polars defaults when the JS caller omits them.
pub struct ScanParquetOptions {
// Maximum number of rows to read (None = read all).
pub n_rows: Option<i64>,
// Name of an added row-index column; no column is added when None.
pub row_index_name: Option<String>,
// Starting value for the row index (only meaningful with row_index_name).
pub row_index_offset: Option<u32>,
pub cache: Option<bool>,
// Parallelism strategy for reading row groups/columns (polars ParallelStrategy).
pub parallel: Wrap<ParallelStrategy>,
// Expand glob patterns in the path when true.
pub glob: Option<bool>,
pub hive_partitioning: Option<bool>,
// Explicit schema for hive-partition columns; inferred when None.
pub hive_schema: Option<Wrap<Schema>>,
pub try_parse_hive_dates: Option<bool>,
pub rechunk: Option<bool>,
// Explicit file schema; inferred from the file when None.
pub schema: Option<Wrap<Schema>>,
pub low_memory: Option<bool>,
// Use Parquet statistics to skip row groups during predicate pushdown.
pub use_statistics: Option<bool>,
// Untyped key/value cloud config (e.g. credentials); see parse_cloud_options.
pub cloud_options: Option<HashMap<String, String>>,
// Max retry count for cloud requests; defaults to 2 downstream when None.
pub retries: Option<i64>,
// When set, adds a column with this name containing each row's source path.
pub include_file_paths: Option<String>,
pub allow_missing_columns: Option<bool>,
}

#[napi(object)]
Expand Down Expand Up @@ -803,11 +829,13 @@ impl FromNapiValue for Wrap<SortOptions> {
.map_or(obj.get::<_, bool>("nullsLast")?.unwrap_or(false), |n| n);
let multithreaded = obj.get::<_, bool>("multithreaded")?.unwrap();
let maintain_order: bool = obj.get::<_, bool>("maintain_order")?.unwrap();
let limit = obj.get::<_, _>("limit")?.unwrap();
let options = SortOptions {
descending,
nulls_last,
multithreaded,
maintain_order,
limit,
};
Ok(Wrap(options))
}
Expand Down Expand Up @@ -1302,3 +1330,32 @@ pub(crate) fn parse_parquet_compression(
};
Ok(parsed)
}

/// Build [`CloudOptions`] for `uri` from an untyped key/value config map.
///
/// * `kv` — provider-specific settings (credentials, region, …) as passed
///   from the JS side; `None` means "no explicit config".
/// * `retries` — max retry count for cloud requests; defaults to 2 when
///   `None`. Any positive count forces a (possibly default) `CloudOptions`
///   into existence so `max_retries` can be set on it.
///
/// Returns `None` only when no config was given *and* retries resolve to 0.
///
/// NOTE(review): a malformed config currently panics (the signature returns
/// `Option`, so the parse error cannot be propagated without changing every
/// call site); the `expect` below makes that failure mode explicit.
pub(crate) fn parse_cloud_options(
    uri: &str,
    kv: Option<HashMap<String, String>>,
    retries: Option<i64>,
) -> Option<CloudOptions> {
    // `from_untyped_config` takes any IntoIterator of pairs, so the HashMap
    // can be consumed directly — no intermediate Vec needed.
    let mut cloud_options: Option<CloudOptions> = kv.map(|config| {
        CloudOptions::from_untyped_config(uri, config)
            .map_err(JsPolarsErr::from)
            .expect("invalid cloud options")
    });

    let retries = retries.unwrap_or(2) as usize;
    if retries > 0 {
        cloud_options = cloud_options
            .or_else(|| Some(CloudOptions::default()))
            .map(|mut options| {
                options.max_retries = retries;
                options
            });
    }
    cloud_options
}
7 changes: 3 additions & 4 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::prelude::*;
use crate::series::JsSeries;
use napi::JsUnknown;
use polars::frame::row::{infer_schema, Row};
use polars::frame::NullStrategy;
use polars_io::mmap::MmapBytesReader;
use polars_io::RowIndex;

Expand Down Expand Up @@ -689,7 +688,7 @@ impl JsDataFrame {
}
#[napi(catch_unwind)]
pub fn n_chunks(&self) -> napi::Result<u32> {
let n = self.df.n_chunks();
let n = self.df.first_col_n_chunks();
Ok(n as u32)
}

Expand Down Expand Up @@ -1038,7 +1037,7 @@ impl JsDataFrame {
.df
.mean_horizontal(null_strategy.0)
.map_err(JsPolarsErr::from)?;
Ok(s.map(|s| s.into()))
Ok(s.map(|s| s.take_materialized_series().into()))
}
#[napi(catch_unwind)]
pub fn hmax(&self) -> napi::Result<Option<JsSeries>> {
Expand All @@ -1058,7 +1057,7 @@ impl JsDataFrame {
.df
.sum_horizontal(null_strategy.0)
.map_err(JsPolarsErr::from)?;
Ok(s.map(|s| s.into()))
Ok(s.map(|s| s.take_materialized_series().into()))
}
#[napi(catch_unwind)]
pub fn to_dummies(
Expand Down
66 changes: 25 additions & 41 deletions src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::dsl::*;
use crate::dataframe::JsDataFrame;
use crate::prelude::*;
use polars::prelude::{col, lit, ClosedWindow, JoinType};
use polars_io::cloud::CloudOptions;
use polars_io::{HiveOptions, RowIndex};
use std::collections::HashMap;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -583,6 +582,7 @@ impl JsLazyFrame {
let include_bom = options.include_bom.unwrap_or(false);
let include_header = options.include_header.unwrap_or(true);
let maintain_order = options.maintain_order;
let cloud_options = parse_cloud_options(&path, options.cloud_options, options.retries);

let options = CsvWriterOptions {
include_bom,
Expand All @@ -594,7 +594,9 @@ impl JsLazyFrame {

let path_buf: PathBuf = PathBuf::from(path);
let ldf = self.ldf.clone().with_comm_subplan_elim(false);
let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from);
let _ = ldf
.sink_csv(path_buf, options, cloud_options)
.map_err(JsPolarsErr::from);
Ok(())
}

Expand All @@ -610,6 +612,7 @@ impl JsLazyFrame {
let row_group_size = options.row_group_size.map(|i| i as usize);
let data_page_size = options.data_pagesize_limit.map(|i| i as usize);
let maintain_order = options.maintain_order.unwrap_or(true);
let cloud_options = parse_cloud_options(&path, options.cloud_options, options.retries);

let options = ParquetWriteOptions {
compression,
Expand All @@ -622,7 +625,7 @@ impl JsLazyFrame {
let path_buf: PathBuf = PathBuf::from(path);
let ldf = self.ldf.clone().with_comm_subplan_elim(false);
let _ = ldf
.sink_parquet(path_buf, options)
.sink_parquet(&path_buf, options, cloud_options)
.map_err(JsPolarsErr::from);
Ok(())
}
Expand Down Expand Up @@ -716,27 +719,6 @@ pub fn scan_csv(path: String, options: ScanCsvOptions) -> napi::Result<JsLazyFra
Ok(r.into())
}

#[napi(object)]
pub struct ScanParquetOptions {
pub n_rows: Option<i64>,
pub row_index_name: Option<String>,
pub row_index_offset: Option<u32>,
pub cache: Option<bool>,
pub parallel: Wrap<ParallelStrategy>,
pub glob: Option<bool>,
pub hive_partitioning: Option<bool>,
pub hive_schema: Option<Wrap<Schema>>,
pub try_parse_hive_dates: Option<bool>,
pub rechunk: Option<bool>,
pub schema: Option<Wrap<Schema>>,
pub low_memory: Option<bool>,
pub use_statistics: Option<bool>,
pub cloud_options: Option<HashMap<String, String>>,
pub retries: Option<i64>,
pub include_file_paths: Option<String>,
pub allow_missing_columns: Option<bool>,
}

#[napi(catch_unwind)]
pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<JsLazyFrame> {
let n_rows = options.n_rows.map(|i| i as usize);
Expand All @@ -757,23 +739,25 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<J
let low_memory = options.low_memory.unwrap_or(false);
let use_statistics = options.use_statistics.unwrap_or(false);

let mut cloud_options: Option<CloudOptions> = if let Some(o) = options.cloud_options {
let co: Vec<(String, String)> = o.into_iter().map(|kv: (String, String)| kv).collect();
Some(CloudOptions::from_untyped_config(&path, co).map_err(JsPolarsErr::from)?)
} else {
None
};

let retries = options.retries.unwrap_or_else(|| 2) as usize;
if retries > 0 {
cloud_options =
cloud_options
.or_else(|| Some(CloudOptions::default()))
.map(|mut options| {
options.max_retries = retries;
options
});
}
let cloud_options = parse_cloud_options(&path, options.cloud_options, options.retries);

// let mut cloud_options: Option<CloudOptions> = if let Some(o) = options.cloud_options {
// let co: Vec<(String, String)> = o.into_iter().map(|kv: (String, String)| kv).collect();
// Some(CloudOptions::from_untyped_config(&path, co).map_err(JsPolarsErr::from)?)
// } else {
// None
// };

// let retries = options.retries.unwrap_or_else(|| 2) as usize;
// if retries > 0 {
// cloud_options =
// cloud_options
// .or_else(|| Some(CloudOptions::default()))
// .map(|mut options| {
// options.max_retries = retries;
// options
// });
// }

let hive_schema = options.hive_schema.map(|s| Arc::new(s.0));
let schema = options.schema.map(|s| Arc::new(s.0));
Expand Down
19 changes: 6 additions & 13 deletions src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1705,20 +1705,13 @@ pub fn int_ranges(
}

#[napi(catch_unwind)]
pub fn pearson_corr(a: Wrap<Expr>, b: Wrap<Expr>, ddof: Option<u8>) -> JsExpr {
let ddof = ddof.unwrap_or(1);
polars::lazy::dsl::pearson_corr(a.0, b.0, ddof).into()
pub fn pearson_corr(a: Wrap<Expr>, b: Wrap<Expr>) -> JsExpr {
polars::lazy::dsl::pearson_corr(a.0, b.0).into()
}

#[napi(catch_unwind)]
pub fn spearman_rank_corr(
a: Wrap<Expr>,
b: Wrap<Expr>,
ddof: Option<u8>,
propagate_nans: bool,
) -> JsExpr {
let ddof = ddof.unwrap_or(1);
polars::lazy::dsl::spearman_rank_corr(a.0, b.0, ddof, propagate_nans).into()
pub fn spearman_rank_corr(a: Wrap<Expr>, b: Wrap<Expr>, propagate_nans: bool) -> JsExpr {
polars::lazy::dsl::spearman_rank_corr(a.0, b.0, propagate_nans).into()
}

#[napi(catch_unwind)]
Expand Down Expand Up @@ -1810,9 +1803,9 @@ pub fn max_horizontal(exprs: Vec<&JsExpr>) -> JsExpr {
.into()
}
#[napi(catch_unwind)]
pub fn sum_horizontal(exprs: Vec<&JsExpr>) -> JsExpr {
pub fn sum_horizontal(exprs: Vec<&JsExpr>, ignore_nulls: bool) -> JsExpr {
let exprs = exprs.to_exprs();
dsl::sum_horizontal(exprs)
dsl::sum_horizontal(exprs, ignore_nulls)
.map_err(JsPolarsErr::from)
.unwrap()
.into()
Expand Down
16 changes: 4 additions & 12 deletions src/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,26 +339,17 @@ impl JsSeries {
}
#[napi(catch_unwind)]
pub fn bitand(&self, other: &JsSeries) -> napi::Result<JsSeries> {
let out = self
.series
.bitand(&other.series)
.map_err(JsPolarsErr::from)?;
let out = (&self.series & &other.series).map_err(JsPolarsErr::from)?;
Ok(out.into())
}
#[napi(catch_unwind)]
pub fn bitor(&self, other: &JsSeries) -> napi::Result<JsSeries> {
let out = self
.series
.bitor(&other.series)
.map_err(JsPolarsErr::from)?;
let out = (&self.series | &other.series).map_err(JsPolarsErr::from)?;
Ok(out.into())
}
#[napi(catch_unwind)]
pub fn bitxor(&self, other: &JsSeries) -> napi::Result<JsSeries> {
let out = self
.series
.bitxor(&other.series)
.map_err(JsPolarsErr::from)?;
let out = (&self.series ^ &other.series).map_err(JsPolarsErr::from)?;
Ok(out.into())
}
#[napi(catch_unwind)]
Expand Down Expand Up @@ -542,6 +533,7 @@ impl JsSeries {
nulls_last,
multithreaded,
maintain_order,
limit: None,
})
.into_series()
.into()
Expand Down

0 comments on commit 47b0525

Please sign in to comment.