Skip to content

Commit

Permalink
Add map as expression (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
lkarthee authored Feb 14, 2024
1 parent 66c51d1 commit 2dc0062
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 62 deletions.
107 changes: 55 additions & 52 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2800,77 +2800,80 @@ defmodule Explorer.DataFrame do

result = fun.(ldf)

column_pairs =
to_column_pairs(df, result, fn value ->
case value do
%Series{data: %LazySeries{}} = lazy_series ->
lazy_series

%Series{data: _other} ->
raise ArgumentError,
"expecting a lazy series. Consider using `Explorer.DataFrame.put/3` " <>
"to add eager series to your dataframe."

list when is_list(list) ->
raise ArgumentError,
"expecting a lazy series or scalar value, but instead got a list. " <>
"consider using `Explorer.Series.from_list/2` to create a `Series`, " <>
"and then `Explorer.DataFrame.put/3` to add the series to your dataframe."
column_pairs = to_column_pairs(df, result, &value!/1)

nil ->
lazy_s = LazySeries.new(:lazy, [nil], :null)
Explorer.Backend.Series.new(lazy_s, :null)
new_dtypes =
for {column_name, series} <- column_pairs, into: %{} do
{column_name, series.dtype}
end

number when is_number(number) ->
dtype = if is_integer(number), do: {:s, 64}, else: {:f, 64}
lazy_s = LazySeries.new(:lazy, [number], dtype)
mut_names = Enum.map(column_pairs, &elem(&1, 0))
new_names = Enum.uniq(df.names ++ mut_names)

Explorer.Backend.Series.new(lazy_s, dtype)
df_out = %{df | names: new_names, dtypes: Map.merge(df.dtypes, new_dtypes)}

string when is_binary(string) ->
lazy_s = LazySeries.new(:lazy, [string], :string)
column_pairs = for {name, %Series{data: lazy_series}} <- column_pairs, do: {name, lazy_series}

Explorer.Backend.Series.new(lazy_s, :string)
Shared.apply_impl(df, :mutate_with, [df_out, column_pairs])
end

boolean when is_boolean(boolean) ->
lazy_s = LazySeries.new(:lazy, [boolean], :boolean)
defp value!(%Series{} = series), do: series

Explorer.Backend.Series.new(lazy_s, :boolean)
defp value!(list) when is_list(list) do
raise ArgumentError,
"expecting a lazy series or scalar value, but instead got a list. " <>
"consider using `Explorer.Series.from_list/2` to create a `Series`, " <>
"and then `Explorer.DataFrame.put/3` to add the series to your dataframe."
end

date = %Date{} ->
lazy_s = LazySeries.new(:lazy, [date], :date)
defp value!(scalar) do
lazy_s = lazy_series!(scalar)
Explorer.Backend.Series.new(lazy_s, lazy_s.dtype)
end

Explorer.Backend.Series.new(lazy_s, :date)
defp lazy_series!(scalar) do
case scalar do
%Series{data: %LazySeries{}} = series ->
series.data

datetime = %NaiveDateTime{} ->
lazy_s = LazySeries.new(:lazy, [datetime], {:datetime, :nanosecond})
nil ->
LazySeries.new(:lazy, [nil], :null)

Explorer.Backend.Series.new(lazy_s, {:datetime, :nanosecond})
number when is_number(number) ->
dtype = if is_integer(number), do: {:s, 64}, else: {:f, 64}
LazySeries.new(:lazy, [number], dtype)

duration = %Explorer.Duration{precision: precision} ->
lazy_s = LazySeries.new(:lazy, [duration], {:duration, precision})
string when is_binary(string) ->
LazySeries.new(:lazy, [string], :string)

Explorer.Backend.Series.new(lazy_s, {:duration, precision})
boolean when is_boolean(boolean) ->
LazySeries.new(:lazy, [boolean], :boolean)

other ->
raise ArgumentError,
"expecting a lazy series or scalar value, but instead got #{inspect(other)}"
end
end)
date = %Date{} ->
LazySeries.new(:lazy, [date], :date)

new_dtypes =
for {column_name, series} <- column_pairs, into: %{} do
{column_name, series.dtype}
end
datetime = %NaiveDateTime{} ->
LazySeries.new(:lazy, [datetime], {:datetime, :nanosecond})

mut_names = Enum.map(column_pairs, &elem(&1, 0))
new_names = Enum.uniq(df.names ++ mut_names)
duration = %Explorer.Duration{precision: precision} ->
LazySeries.new(:lazy, [duration], {:duration, precision})

df_out = %{df | names: new_names, dtypes: Map.merge(df.dtypes, new_dtypes)}
map = %{} when not is_struct(map) ->
{series_list, dtype_list} =
Enum.reduce(map, {[], []}, fn {name, series}, {sl, dl} ->
lazy_series = lazy_series!(series)
name = if is_atom(name), do: Atom.to_string(name), else: name
{[{name, lazy_series} | sl], [{name, lazy_series.dtype} | dl]}
end)

column_pairs = for {name, %Series{data: lazy_series}} <- column_pairs, do: {name, lazy_series}
map = Enum.into(series_list, %{})
dtype_list = Enum.sort(dtype_list)
LazySeries.new(:lazy, [map], {:struct, dtype_list})

Shared.apply_impl(df, :mutate_with, [df_out, column_pairs])
other ->
raise ArgumentError,
"expecting a lazy series or scalar value, but instead got #{inspect(other)}"
end
end

@doc """
Expand Down
9 changes: 9 additions & 0 deletions lib/explorer/polars_backend/expression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ defmodule Explorer.PolarsBackend.Expression do
def to_expr(%Explorer.Duration{} = duration), do: Native.expr_duration(duration)
def to_expr(%PolarsSeries{} = polars_series), do: Native.expr_series(polars_series)

def to_expr(map) when is_map(map) do
expr_list =
Enum.map(map, fn {name, series} ->
series |> to_expr() |> alias_expr(name)
end)

Native.expr_struct(expr_list)
end

# Used by Explorer.PolarsBackend.DataFrame
def alias_expr(%__MODULE__{} = expr, alias_name) when is_binary(alias_name) do
Native.expr_alias(expr, alias_name)
Expand Down
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ defmodule Explorer.PolarsBackend.Native do
def expr_integer(_number), do: err()
def expr_series(_series), do: err()
def expr_string(_string), do: err()
def expr_struct(_map), do: err()

# LazyFrame
def lf_collect(_df), do: err()
Expand Down
19 changes: 13 additions & 6 deletions native/explorer/src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
// or an expression and returns an expression that is
// wrapped in an Elixir struct.

use polars::prelude::{
col, concat_str, cov, pearson_corr, spearman_rank_corr, when, IntoLazy, LiteralValue,
SortOptions,
};
use polars::prelude::{DataType, EWMOptions, Expr, Literal, StrptimeOptions, TimeUnit};

use crate::datatypes::{
ExCorrelationMethod, ExDate, ExDateTime, ExDuration, ExRankMethod, ExSeriesDtype, ExValidValue,
};
use crate::series::{cast_str_to_f64, ewm_opts, rolling_opts};
use crate::{ExDataFrame, ExExpr, ExSeries};
use polars::lazy::dsl;
use polars::prelude::{
col, concat_str, cov, pearson_corr, spearman_rank_corr, when, IntoLazy, LiteralValue,
SortOptions,
};
use polars::prelude::{DataType, EWMOptions, Expr, Literal, StrptimeOptions, TimeUnit};

// Useful to get an ExExpr vec into a vec of expressions.
pub fn ex_expr_to_exprs(ex_exprs: Vec<ExExpr>) -> Vec<Expr> {
Expand Down Expand Up @@ -1079,3 +1079,10 @@ pub fn expr_json_decode(expr: ExExpr, ex_dtype: ExSeriesDtype) -> ExExpr {
let expr = expr.clone_inner().str().json_decode(Some(dtype), None);
ExExpr::new(expr)
}

#[rustler::nif]
pub fn expr_struct(ex_exprs: Vec<ExExpr>) -> ExExpr {
let exprs = ex_exprs.iter().map(|e| e.clone_inner()).collect();
let expr = dsl::as_struct(exprs);
ExExpr::new(expr)
}
1 change: 1 addition & 0 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ rustler::init!(
// struct expressions
expr_field,
expr_json_decode,
expr_struct,
// lazyframe
lf_collect,
lf_describe_plan,
Expand Down
58 changes: 54 additions & 4 deletions test/explorer/data_frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,56 @@ defmodule Explorer.DataFrameTest do
}
end

test "with map " do
df = DF.new(%{a: [1, nil, 3], b: ["a", "b", nil]})
df1 = DF.mutate(df, c: %{a: a, b: b, lit: 1, null: is_nil(a)})
assert df1.names == ["a", "b", "c"]

assert df1.dtypes == %{
"a" => {:s, 64},
"b" => :string,
"c" =>
{:struct,
[{"a", {:s, 64}}, {"b", :string}, {"lit", {:s, 64}}, {"null", :boolean}]}
}

assert DF.to_columns(df1, atom_keys: true) == %{
a: [1, nil, 3],
b: ["a", "b", nil],
c: [
%{"a" => 1, "b" => "a", "lit" => 1, "null" => false},
%{"a" => nil, "b" => "b", "lit" => 1, "null" => true},
%{"a" => 3, "b" => nil, "lit" => 1, "null" => false}
]
}
end

test "with map nested" do
df = DF.new(%{a: [1, nil, 3], b: ["a", "b", nil]})
df1 = DF.mutate(df, c: %{s: %{a: a, b: b}})
assert df1.names == ["a", "b", "c"]

assert df1.dtypes == %{
"a" => {:s, 64},
"b" => :string,
"c" => {:struct, [{"s", {:struct, [{"a", {:s, 64}}, {"b", :string}]}}]}
}
end

test "add eager series to dataframe" do
df = DF.new([%{a: 1}, %{a: 2}, %{a: 3}])
s = Series.from_list(["x", "y", "z"])

df1 = DF.mutate(df, s: ^s, a1: a + 1)
assert df1.names == ["a", "s", "a1"]

assert df1.dtypes == %{
"a" => {:s, 64},
"s" => :string,
"a1" => {:s, 64}
}
end

test "adds new columns" do
df = DF.new(a: [1, 2, 3], b: ["a", "b", "c"])

Expand Down Expand Up @@ -1542,12 +1592,12 @@ defmodule Explorer.DataFrameTest do
}
end

test "raises when adding eager series" do
test "raises when adding eager series whose length does not match dataframe's length" do
df = DF.new(a: [1, 2, 3])
series = Series.from_list([4, 5, 6])
series = Series.from_list([4, 5, 6, 7])

assert_raise ArgumentError,
"expecting a lazy series. Consider using `Explorer.DataFrame.put/3` to add eager series to your dataframe.",
assert_raise RuntimeError,
"Polars Error: lengths don't match: unable to add a column of length 4 to a DataFrame of height 3",
fn ->
DF.mutate(df, b: ^series)
end
Expand Down

0 comments on commit 2dc0062

Please sign in to comment.