Skip to content

Commit

Permalink
Add clone_inner() to Ex structs (#473)
Browse files Browse the repository at this point in the history
The idea is to simplify clonning the inner datatypes, since this is a
lot frequent in our codebase.

Internally Polars uses Arc to represent the Series, so cloning is
normally a cheap operation.
  • Loading branch information
philss authored Jan 14, 2023
1 parent 696ed7f commit c8666b0
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 347 deletions.
1 change: 0 additions & 1 deletion lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ defmodule Explorer.PolarsBackend.Native do
def s_add(_s, _other), do: err()
def s_and(_s, _s2), do: err()
def s_argsort(_s, _descending?, _nils_last?), do: err()
def s_as_str(_s), do: err()
def s_cast(_s, _dtype), do: err()
def s_categories(_s), do: err()
def s_categorise(_s, _s_categories), do: err()
Expand Down
125 changes: 57 additions & 68 deletions native/explorer/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ pub fn normalize_numeric_dtypes(df: &mut DataFrame) -> Result<DataFrame, crate::
Ok(df.clone())
}

fn ex_expr_to_exprs(ex_exprs: Vec<ExExpr>) -> Vec<Expr> {
ex_exprs
.iter()
.map(|ex_expr| ex_expr.clone_inner())
.collect()
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_join(
data: ExDataFrame,
Expand All @@ -49,21 +56,23 @@ pub fn df_join(
}
};

let df = &data.resource.0;
let df1 = &other.resource.0;
let new_df = df.join(df1, left_on, right_on, how, suffix)?;
let df = data.clone_inner();
let df1 = other.clone_inner();

let new_df = df.join(&df1, left_on, right_on, how, suffix)?;
Ok(ExDataFrame::new(new_df))
}

#[rustler::nif]
pub fn df_names(data: ExDataFrame) -> Result<Vec<String>, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();

Ok(df.get_column_names_owned())
}

#[rustler::nif]
pub fn df_dtypes(data: ExDataFrame) -> Result<Vec<String>, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let result = df.dtypes().iter().map(|dtype| dtype.to_string()).collect();
Ok(result)
}
Expand All @@ -88,11 +97,11 @@ pub fn df_concat_rows(
data: ExDataFrame,
others: Vec<ExDataFrame>,
) -> Result<ExDataFrame, ExplorerError> {
let mut out_df = data.resource.0.clone();
let mut out_df = data.clone_inner();
let names = out_df.get_column_names();
let dfs = others
.into_iter()
.map(|ex_df| ex_df.resource.0.select(&names))
.map(|ex_df| ex_df.clone_inner().select(&names))
.collect::<Result<Vec<_>, _>>()?;

for df in dfs {
Expand All @@ -109,25 +118,14 @@ pub fn df_concat_columns(
others: Vec<ExDataFrame>,
) -> Result<ExDataFrame, ExplorerError> {
let id_column = "__row_count_id__";
let first = data
.resource
.0
.clone()
.lazy()
.with_row_count(id_column, None);
let first = data.clone_inner().lazy().with_row_count(id_column, None);

// We need to be able to handle arbitrary column name overlap.
// This builds up a join and suffixes conflicting names with _N where
// N is the index of the df in the join array.
let (out_df, _) = others
.iter()
.map(|data| {
data.resource
.0
.clone()
.lazy()
.with_row_count(id_column, None)
})
.map(|data| data.clone_inner().lazy().with_row_count(id_column, None))
.fold((first, 1), |(acc_df, count), df| {
let suffix = format!("_{count}");
let new_df = acc_df
Expand All @@ -151,43 +149,43 @@ pub fn df_drop_nulls(
data: ExDataFrame,
subset: Option<Vec<String>>,
) -> Result<ExDataFrame, ExplorerError> {
let df: &DataFrame = &data.resource.0;
let df = data.clone_inner();
let new_df = df.drop_nulls(subset.as_ref().map(|s| s.as_ref()))?;
Ok(ExDataFrame::new(new_df))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_drop(data: ExDataFrame, name: &str) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = df.drop(name)?;
Ok(ExDataFrame::new(new_df))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_select_at_idx(data: ExDataFrame, idx: usize) -> Result<Option<ExSeries>, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let result = df.select_at_idx(idx).map(|s| ExSeries::new(s.clone()));
Ok(result)
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_pull(data: ExDataFrame, name: &str) -> Result<ExSeries, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let series = df.column(name).map(|s| ExSeries::new(s.clone()))?;
Ok(series)
}

#[rustler::nif]
pub fn df_select(data: ExDataFrame, selection: Vec<&str>) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = df.select(selection)?;
Ok(ExDataFrame::new(new_df))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_mask(data: ExDataFrame, mask: ExSeries) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let filter_series = &mask.resource.0;
let df = data.clone_inner();
let filter_series = mask.clone_inner();
if let Ok(ca) = filter_series.bool() {
let new_df = df.filter(ca)?;
Ok(ExDataFrame::new(new_df))
Expand All @@ -202,8 +200,8 @@ pub fn df_filter_with(
ex_expr: ExExpr,
groups: Vec<String>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let exp: Expr = ex_expr.resource.0.clone();
let df = data.clone_inner();
let exp = ex_expr.clone_inner();

let new_df = if groups.is_empty() {
df.lazy().filter(exp).collect()?
Expand All @@ -221,7 +219,7 @@ pub fn df_slice_by_indices(
indices: Vec<u32>,
groups: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let idx = UInt32Chunked::from_vec("idx", indices);
let new_df = if groups.is_empty() {
df.take(&idx)?
Expand All @@ -236,8 +234,8 @@ pub fn df_slice_by_series(
data: ExDataFrame,
series: ExSeries,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let cast = series.resource.0.cast(&DataType::UInt32)?;
let df = data.clone_inner();
let cast = series.clone_inner().cast(&DataType::UInt32)?;
Ok(ExDataFrame::new(df.take(cast.u32()?)?))
}

Expand All @@ -249,7 +247,7 @@ pub fn df_sample_n(
seed: Option<u64>,
groups: Vec<String>,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = if groups.is_empty() {
df.sample_n(n, with_replacement, false, seed)?
} else {
Expand All @@ -268,7 +266,7 @@ pub fn df_sample_frac(
seed: Option<u64>,
groups: Vec<String>,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = if groups.is_empty() {
df.sample_frac(frac, with_replacement, false, seed)?
} else {
Expand All @@ -286,7 +284,7 @@ pub fn df_arrange(
reverse: Vec<bool>,
groups: Vec<String>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();

let new_df = if groups.is_empty() {
let new_df = &df.sort(by_columns, reverse)?;
Expand All @@ -308,7 +306,7 @@ pub fn df_arrange_with(
directions: Vec<bool>,
groups: Vec<String>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();
let exprs = ex_expr_to_exprs(expressions);

let new_df = if groups.is_empty() {
Expand All @@ -333,7 +331,7 @@ pub fn df_slice(
length: usize,
groups: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = if groups.is_empty() {
df.slice(offset, length)
} else {
Expand All @@ -349,7 +347,7 @@ pub fn df_head(
length: Option<usize>,
groups: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();

let new_df = if groups.is_empty() {
df.head(length)
Expand All @@ -365,7 +363,7 @@ pub fn df_tail(
length: Option<usize>,
groups: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();

let new_df = if groups.is_empty() {
df.tail(length)
Expand All @@ -383,7 +381,7 @@ pub fn df_pivot_longer(
names_to: String,
values_to: String,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let melt_opts = MeltArgs {
id_vars,
value_vars,
Expand All @@ -401,7 +399,7 @@ pub fn df_distinct(
subset: Vec<String>,
columns_to_keep: Option<Vec<&str>>,
) -> Result<ExDataFrame, ExplorerError> {
let df: &DataFrame = &data.resource.0;
let df = data.clone_inner();
let new_df = match maintain_order {
false => df.unique(Some(&subset), UniqueKeepStrategy::First)?,
true => df.unique_stable(Some(&subset), UniqueKeepStrategy::First)?,
Expand All @@ -418,7 +416,7 @@ pub fn df_to_dummies(
data: ExDataFrame,
selection: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let dummies = df.select(selection).and_then(|df| df.to_dummies())?;
let series = dummies
.iter()
Expand All @@ -429,19 +427,19 @@ pub fn df_to_dummies(

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_put_column(data: ExDataFrame, series: ExSeries) -> Result<ExDataFrame, ExplorerError> {
let mut df: DataFrame = data.resource.0.clone();
let s: Series = series.resource.0.clone();
let new_df = df.with_column(s)?;
let mut df = data.clone_inner();
let s = series.clone_inner();
let new_df = df.with_column(s)?.clone();

Ok(ExDataFrame::new(new_df.clone()))
Ok(ExDataFrame::new(new_df))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_describe(
data: ExDataFrame,
percentiles: Option<Vec<f64>>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();

let new_df = match percentiles {
Some(percentiles) => df.describe(Some(percentiles.as_slice())),
Expand All @@ -457,7 +455,7 @@ pub fn df_mutate_with_exprs(
columns: Vec<ExExpr>,
groups: Vec<&str>,
) -> Result<ExDataFrame, ExplorerError> {
let df: DataFrame = data.resource.0.clone();
let df = data.clone_inner();
let mutations = ex_expr_to_exprs(columns);
let new_df = if groups.is_empty() {
df.lazy().with_columns(mutations).collect()?
Expand All @@ -471,11 +469,10 @@ pub fn df_mutate_with_exprs(

#[rustler::nif]
pub fn df_from_series(columns: Vec<ExSeries>) -> Result<ExDataFrame, ExplorerError> {
let columns = columns
.into_iter()
.map(|c| c.resource.0.clone())
.collect::<Vec<Series>>();
let columns = columns.into_iter().map(|c| c.clone_inner()).collect();

let df = DataFrame::new(columns)?;

Ok(ExDataFrame::new(df))
}

Expand All @@ -484,7 +481,7 @@ pub fn df_rename_columns(
data: ExDataFrame,
renames: Vec<(&str, &str)>,
) -> Result<ExDataFrame, ExplorerError> {
let mut df: DataFrame = data.resource.0.clone();
let mut df = data.clone_inner();
for (original, new_name) in renames {
df.rename(original, new_name).expect("should rename");
}
Expand All @@ -494,8 +491,9 @@ pub fn df_rename_columns(

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_groups(data: ExDataFrame, groups: Vec<&str>) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let groups = df.groupby(groups)?.groups()?;

Ok(ExDataFrame::new(groups))
}

Expand All @@ -504,7 +502,7 @@ pub fn df_group_indices(
data: ExDataFrame,
groups: Vec<&str>,
) -> Result<Vec<ExSeries>, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let series = df
.groupby_stable(groups)?
.groups()?
Expand All @@ -522,33 +520,24 @@ pub fn df_summarise_with_exprs(
groups: Vec<ExExpr>,
aggs: Vec<ExExpr>,
) -> Result<ExDataFrame, ExplorerError> {
let df = data.resource.0.clone();
let df = data.clone_inner();
let groups = ex_expr_to_exprs(groups);
let aggs = ex_expr_to_exprs(aggs);

let new_df = df.lazy().groupby_stable(groups).agg(aggs).collect()?;
Ok(ExDataFrame::new(new_df))
}

fn ex_expr_to_exprs(ex_exprs: Vec<ExExpr>) -> Vec<Expr> {
let exprs: Vec<Expr> = ex_exprs
.iter()
.map(|ex_expr| ex_expr.resource.0.clone())
.collect();

exprs
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_pivot_wider(
data: ExDataFrame,
id_columns: Vec<&str>,
pivot_column: &str,
values_column: &str,
) -> Result<ExDataFrame, ExplorerError> {
let df = &data.resource.0;
let df = data.clone_inner();
let new_df = pivot_stable(
df,
&df,
[values_column],
id_columns,
[pivot_column],
Expand All @@ -560,6 +549,6 @@ pub fn df_pivot_wider(

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_to_lazy(data: ExDataFrame) -> Result<ExLazyFrame, ExplorerError> {
let new_lf = data.resource.0.clone().lazy();
let new_lf = data.clone_inner().lazy();
Ok(ExLazyFrame::new(new_lf))
}
Loading

0 comments on commit c8666b0

Please sign in to comment.