Skip to content

Commit

Permalink
Add DataFrame.explode/2 (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
costaraphael authored Dec 1, 2023
1 parent 71597ad commit 0955a1d
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/explorer/backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ defmodule Explorer.Backend.DataFrame do
@callback put(df, out_df :: df(), column_name(), series()) :: df
@callback describe(df, percentiles :: option(list(float()))) :: df()
@callback nil_count(df) :: df()
# Explodes the given list `columns` of `df` into multiple rows.
# `out_df` is built by the caller (`Explorer.DataFrame.explode/2`) as `df` with
# each exploded column's dtype unwrapped from `{:list, inner}` to `inner`.
@callback explode(df, out_df :: df(), columns :: [column_name()]) :: df()

# Two or more table verbs

Expand Down
81 changes: 79 additions & 2 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5283,8 +5283,8 @@ defmodule Explorer.DataFrame do
%Series{data: %LazySeries{aggregation: true}} ->
value

%Series{data: %LazySeries{op: :column}} ->
value
%Series{data: %LazySeries{op: :column}} = value ->
%{value | dtype: {:list, value.dtype}}

%Series{data: %LazySeries{}} = series ->
raise "expecting summarise with an aggregation operation or plain column, " <>
Expand Down Expand Up @@ -5315,6 +5315,83 @@ defmodule Explorer.DataFrame do
groups ++ names_with_dtypes
end

@doc """
Explodes one or more list columns into multiple rows.

Each element of an exploded list becomes its own row, and the values of the
remaining columns are repeated accordingly. When exploding multiple columns,
the number of list elements in each row for the exploded columns must be the
same.

Raises `ArgumentError` if any of the given columns is not a list column.

## Examples

    iex> df = Explorer.DataFrame.new(a: [[1, 2], [3, 4]], b: [[5, 6], [7, 8]], c: ["a", "b"])
    iex> Explorer.DataFrame.explode(df, :a)
    #Explorer.DataFrame<
      Polars[4 x 3]
      a integer [1, 2, 3, 4]
      b list[integer] [[5, 6], [5, 6], [7, 8], [7, ...]]
      c string ["a", "a", "b", "b"]
    >
    iex> Explorer.DataFrame.explode(df, [:a, :b])
    #Explorer.DataFrame<
      Polars[4 x 3]
      a integer [1, 2, 3, 4]
      b integer [5, 6, 7, 8]
      c string ["a", "a", "b", "b"]
    >

You can think of exploding multiple list columns as being the inverse of
aggregating the elements of the exploded columns into lists:

    iex> df = Explorer.DataFrame.new(a: [1, 2, 3, 4], b: [5, 6, 7, 8], c: ["a", "a", "b", "b"])
    iex> df = df |> Explorer.DataFrame.group_by(:c) |> Explorer.DataFrame.summarise(a: a, b: b)
    #Explorer.DataFrame<
      Polars[2 x 3]
      c string ["a", "b"]
      a list[integer] [[1, 2], [3, 4]]
      b list[integer] [[5, 6], [7, 8]]
    >
    iex> Explorer.DataFrame.explode(df, [:a, :b]) # we are back where we started
    #Explorer.DataFrame<
      Polars[4 x 3]
      c string ["a", "a", "b", "b"]
      a integer [1, 2, 3, 4]
      b integer [5, 6, 7, 8]
    >

If you want to perform the cartesian product of two list columns, you must
call `explode/2` once for each column:

    iex> df = Explorer.DataFrame.new(a: [[1, 2], [3, 4]], b: [[5, 6], [7, 8]], c: ["a", "b"])
    iex> df |> Explorer.DataFrame.explode(:a) |> Explorer.DataFrame.explode(:b)
    #Explorer.DataFrame<
      Polars[8 x 3]
      a integer [1, 1, 2, 2, 3, ...]
      b integer [5, 6, 5, 6, 7, ...]
      c string ["a", "a", "a", "a", "b", ...]
    >
"""
@doc type: :single
@spec explode(df :: DataFrame.t(), column :: column_name() | [column_name()]) :: DataFrame.t()
def explode(%DataFrame{} = df, column_or_columns) do
  columns = to_existing_columns(df, List.wrap(column_or_columns))

  # Validate up front so the error lists the dtype of every requested column.
  dtypes = for column <- columns, do: Map.fetch!(df.dtypes, column)

  if not Enum.all?(dtypes, &match?({:list, _}, &1)) do
    raise ArgumentError,
          "explode/2 expects list columns, but the given columns have the types: #{inspect(dtypes)}"
  end

  # Exploding removes one level of list nesting from each selected column.
  out_dtypes =
    for column <- columns, reduce: df.dtypes do
      acc -> Map.update!(acc, column, fn {:list, inner_dtype} -> inner_dtype end)
    end

  Shared.apply_impl(df, :explode, [%{df | dtypes: out_dtypes}, columns])
end

@doc """
Prints the DataFrame in a tabular fashion.
Expand Down
5 changes: 5 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,11 @@ defmodule Explorer.PolarsBackend.DataFrame do
|> Shared.create_dataframe()
end

# Eager explode: forwards to the `:df_explode` native function, handing
# `out_df` to `Shared.apply_dataframe/4` alongside the native arguments.
@impl true
def explode(df, out_df, columns),
  do: Shared.apply_dataframe(df, out_df, :df_explode, [columns])

# Two or more table verbs

@impl true
Expand Down
4 changes: 4 additions & 0 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ defmodule Explorer.PolarsBackend.LazyFrame do
values_to
])

# Lazy explode: forwards to the `:lf_explode` native function, handing
# `out_df` to `Shared.apply_dataframe/4` alongside the native arguments.
@impl true
def explode(%DF{} = df, %DF{} = out_df, columns) do
  Shared.apply_dataframe(df, out_df, :lf_explode, [columns])
end

# Groups

@impl true
Expand Down
2 changes: 2 additions & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ defmodule Explorer.PolarsBackend.Native do
def df_width(_df), do: err()
def df_describe(_df, _percentiles), do: err()
def df_nil_count(_df), do: err()
# Stub for the Rust NIF `df_explode` (native/explorer/src/dataframe.rs).
# NOTE(review): `err()` is presumably the "NIF not loaded" fallback — confirm.
def df_explode(_df, _columns), do: err()

# Expressions (for lazy queries)
@multi_arity_expressions [slice: 2, slice: 3, log: 1, log: 2]
Expand Down Expand Up @@ -207,6 +208,7 @@ defmodule Explorer.PolarsBackend.Native do
def lf_select(_df, _columns), do: err()
def lf_tail(_df, _n_rows), do: err()
def lf_slice(_df, _offset, _length), do: err()
# Stub for the Rust NIF `lf_explode` (native/explorer/src/lazyframe.rs).
# NOTE(review): `err()` is presumably the "NIF not loaded" fallback — confirm.
def lf_explode(_df, _columns), do: err()
def lf_from_ipc(_filename), do: err()
def lf_from_ndjson(_filename, _infer_schema_length, _batch_size), do: err()
def lf_from_parquet(_filename, _stop_after_n_rows, _maybe_columns), do: err()
Expand Down
6 changes: 6 additions & 0 deletions native/explorer/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,12 @@ pub fn df_pivot_wider(
Ok(ExDataFrame::new(new_df))
}

/// Explodes the list columns named in `columns` into multiple rows and wraps
/// the result in a new `ExDataFrame`.
///
/// Scheduled as a `DirtyCpu` NIF. A failure reported by `explode` is
/// propagated to the caller through `?`.
#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_explode(df: ExDataFrame, columns: Vec<&str>) -> Result<ExDataFrame, ExplorerError> {
    Ok(ExDataFrame::new(df.explode(columns)?))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn df_lazy(df: ExDataFrame) -> Result<ExLazyFrame, ExplorerError> {
let new_lf = df.clone_inner().lazy();
Expand Down
6 changes: 6 additions & 0 deletions native/explorer/src/lazyframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ pub fn lf_slice(data: ExLazyFrame, offset: i64, length: u32) -> Result<ExLazyFra
Ok(ExLazyFrame::new(lf.slice(offset, length)))
}

/// Returns a new `ExLazyFrame` that explodes the list columns named in
/// `columns`, built from `clone_inner()` of the input lazy frame.
#[rustler::nif]
pub fn lf_explode(data: ExLazyFrame, columns: Vec<&str>) -> Result<ExLazyFrame, ExplorerError> {
    Ok(ExLazyFrame::new(data.clone_inner().explode(columns)))
}

#[rustler::nif]
pub fn lf_filter_with(data: ExLazyFrame, ex_expr: ExExpr) -> Result<ExLazyFrame, ExplorerError> {
let ldf = data.clone_inner();
Expand Down
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ rustler::init!(
df_dump_parquet,
df_dump_ipc,
df_dump_ipc_stream,
df_explode,
df_filter_with,
df_from_csv,
df_from_ipc,
Expand Down Expand Up @@ -282,6 +283,7 @@ rustler::init!(
lf_select,
lf_tail,
lf_slice,
lf_explode,
lf_from_csv,
lf_from_ipc,
lf_from_parquet,
Expand Down
14 changes: 14 additions & 0 deletions test/explorer/data_frame/lazy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,4 +1403,18 @@ defmodule Explorer.DataFrame.LazyTest do
assert Exception.message(error) =~ "syntax error"
end
end

describe "explode/2" do
  test "explodes a list column" do
    # Build the lazy frame, explode it, and only then materialize the result.
    collected =
      [letters: [~w(a e), ~w(b c d)], is_vowel: [true, false]]
      |> DF.new(lazy: true)
      |> DF.explode(:letters)
      |> DF.collect()

    expected = %{
      letters: ["a", "e", "b", "c", "d"],
      is_vowel: [true, true, false, false, false]
    }

    assert DF.to_columns(collected, atom_keys: true) == expected
  end
end
end
41 changes: 41 additions & 0 deletions test/explorer/data_frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3608,6 +3608,8 @@ defmodule Explorer.DataFrameTest do
is_vowel: [true, false],
letters: [["a", "e", "i"], ["b", "c", "d", "f", "g", "h", "j"]]
}

assert DF.dtypes(df) == %{"is_vowel" => :boolean, "letters" => {:list, :string}}
end

test "mode/1" do
Expand Down Expand Up @@ -3702,4 +3704,43 @@ defmodule Explorer.DataFrameTest do
}
end
end

describe "explode/2" do
  test "explodes a list column" do
    exploded =
      [letters: [~w(a e), ~w(b c d)], is_vowel: [true, false]]
      |> DF.new()
      |> DF.explode(:letters)

    expected_columns = %{
      letters: ["a", "e", "b", "c", "d"],
      is_vowel: [true, true, false, false, false]
    }

    assert DF.to_columns(exploded, atom_keys: true) == expected_columns

    # The exploded column loses its list wrapper; the other column is untouched.
    assert DF.dtypes(exploded) == %{"is_vowel" => :boolean, "letters" => :string}
  end

  test "works with multiple columns" do
    exploded =
      [a: [[1, 2], [3, 4]], b: [[5, 6], [7, 8]], c: ["a", "b"]]
      |> DF.new()
      |> DF.explode([:a, :b])

    expected_columns = %{
      a: [1, 2, 3, 4],
      b: [5, 6, 7, 8],
      c: ["a", "a", "b", "b"]
    }

    assert DF.to_columns(exploded, atom_keys: true) == expected_columns
  end

  test "raises if the columns are not of the list type" do
    df = DF.new(a: [1, 2, 3], b: [[1, 2], [3, 4], [5, 6]])

    single_message =
      "explode/2 expects list columns, but the given columns have the types: [:integer]"

    # A single non-list column among the requested ones is enough to raise.
    mixed_message =
      "explode/2 expects list columns, but the given columns have the types: [:integer, {:list, :integer}]"

    assert_raise ArgumentError, single_message, fn -> DF.explode(df, :a) end
    assert_raise ArgumentError, mixed_message, fn -> DF.explode(df, [:a, :b]) end
  end
end
end

0 comments on commit 0955a1d

Please sign in to comment.