Skip to content

Commit

Permalink
fix[rust]: dynamically determine union parallelism (#4890)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Sep 18, 2022
1 parent b93083b commit 4beb08e
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 21 deletions.
12 changes: 10 additions & 2 deletions polars/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,11 @@ pub fn duration(args: DurationArgs) -> Expr {
}

/// Concat multiple
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, rechunk: bool) -> PolarsResult<LazyFrame> {
pub fn concat<L: AsRef<[LazyFrame]>>(
inputs: L,
rechunk: bool,
parallel: bool,
) -> PolarsResult<LazyFrame> {
let mut inputs = inputs.as_ref().to_vec();
let lf = std::mem::take(
inputs
Expand All @@ -578,10 +582,14 @@ pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, rechunk: bool) -> PolarsResult<L
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp)
}
let options = UnionOptions {
parallel,
..Default::default()
};

let lp = LogicalPlan::Union {
inputs: lps,
options: Default::default(),
options,
};
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ impl<'a> LazyCsvReader<'a> {
builder.finish_impl()
})
.collect::<PolarsResult<Vec<_>>>()?;
concat(&lfs, self.rechunk)
// set to false, as the csv parser has full thread utilization
concat(&lfs, self.rechunk, false)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|lf| {
if self.skip_rows != 0 || self.n_rows.is_some() {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl LazyFrame {
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(&lfs, args.rechunk)
concat(&lfs, args.rechunk, true)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|mut lf| {
if let Some(n_rows) = args.n_rows {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl LazyFrame {
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, &mut scratch);
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, &mut scratch, cse_changed);
}

if predicate_pushdown {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl LazyFrame {
}

fn concat_impl(lfs: Vec<LazyFrame>, args: ScanArgsParquet) -> PolarsResult<LazyFrame> {
concat(&lfs, args.rechunk).map(|mut lf| {
concat(&lfs, args.rechunk, true).map(|mut lf| {
if let Some(n_rows) = args.n_rows {
lf = lf.slice(0, n_rows as IdxSize)
};
Expand Down
13 changes: 11 additions & 2 deletions polars/polars-lazy/src/logical_plan/optimizer/cache_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub(crate) fn set_cache_states(
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
has_caches: bool,
) {
scratch.clear();

Expand All @@ -52,12 +53,20 @@ pub(crate) fn set_cache_states(

use ALogicalPlan::*;
match lp {
// don't allow parallelism if underneath a cache
Join { .. } if cache_id.is_some() => {
// don't allow parallelism as caches need eachothers work
// also self-referencing plans can deadlock on the files they lock
Join { options, .. } if has_caches && options.allow_parallel => {
if let Join { options, .. } = lp_arena.get_mut(node) {
options.allow_parallel = false;
}
}
// don't allow parallelism as caches need eachothers work
// also self-referencing plans can deadlock on the files they lock
Union { options, .. } if has_caches && options.parallel => {
if let Union { options, .. } = lp_arena.get_mut(node) {
options.parallel = false;
}
}
Cache { input, id, .. } => {
if let Some(cache_id) = cache_id {
previous_cache = Some(cache_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,10 @@ impl SlicePushDown {
Ok(lp)
}

(Union {inputs, .. }, Some(state)) => {
let options = UnionOptions {
slice: true,
slice_offset: state.offset,
slice_len: state.len,
};
(Union {inputs, mut options }, Some(state)) => {
options.slice = true;
options.slice_offset = state.offset;
options.slice_len = state.len;
Ok(Union {inputs, options})
},
(Join {
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct UnionOptions {
pub(crate) slice: bool,
pub(crate) slice_offset: i64,
pub(crate) slice_len: IdxSize,
pub(crate) parallel: bool,
}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
14 changes: 12 additions & 2 deletions polars/polars-lazy/src/physical_plan/executors/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ impl Executor for UnionExec {
}
let mut inputs = std::mem::take(&mut self.inputs);

if self.options.slice && self.options.slice_offset >= 0 {
let sliced_path = self.options.slice && self.options.slice_offset >= 0;

if self.options.parallel || sliced_path {
if state.verbose() {
println!("SLICE AT UNION: union is run sequentially")
if self.options.parallel {
println!("UNION: `parallel=false` union is run sequentially")
} else {
println!("UNION: `slice is set` union is run sequentially")
}
}

let mut offset = self.options.slice_offset as usize;
Expand All @@ -36,6 +42,10 @@ impl Executor for UnionExec {
state.branch_idx += idx;
let df = input.execute(&mut state)?;

if !sliced_path {
return Ok(Some(df));
}

Ok(if offset > df.height() {
offset -= df.height();
None
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn test_cse_unions() -> PolarsResult<()> {

let lf1 = lf.clone().with_column(col("category").str().to_uppercase());

let lf = concat(&[lf1.clone(), lf, lf1], false)?
let lf = concat(&[lf1.clone(), lf, lf1], false, false)?
.select([col("category"), col("fats_g")])
.with_common_subplan_elimination(true);

Expand Down
2 changes: 1 addition & 1 deletion polars/tests/it/lazy/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ fn test_count_blocked_at_union_3963() -> PolarsResult<()> {
]?;

for rechunk in [true, false] {
let out = concat([lf1.clone(), lf2.clone()], rechunk)?
let out = concat([lf1.clone(), lf2.clone()], rechunk, true)?
.filter(count().over([col("k")]).gt(lit(1)))
.collect()?;

Expand Down
10 changes: 9 additions & 1 deletion py-polars/polars/internals/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def concat(
items: Sequence[pli.DataFrame],
rechunk: bool = True,
how: ConcatMethod = "vertical",
parallel: bool = True,
) -> pli.DataFrame:
...

Expand All @@ -55,6 +56,7 @@ def concat(
items: Sequence[pli.Series],
rechunk: bool = True,
how: ConcatMethod = "vertical",
parallel: bool = True,
) -> pli.Series:
...

Expand All @@ -64,6 +66,7 @@ def concat(
items: Sequence[pli.LazyFrame],
rechunk: bool = True,
how: ConcatMethod = "vertical",
parallel: bool = True,
) -> pli.LazyFrame:
...

Expand All @@ -73,6 +76,7 @@ def concat(
items: Sequence[pli.Expr],
rechunk: bool = True,
how: ConcatMethod = "vertical",
parallel: bool = True,
) -> pli.Expr:
...

Expand All @@ -86,6 +90,7 @@ def concat(
),
rechunk: bool = True,
how: ConcatMethod = "vertical",
parallel: bool = True,
) -> pli.DataFrame | pli.Series | pli.LazyFrame | pli.Expr:
"""
Aggregate multiple Dataframes/Series to a single DataFrame/Series.
Expand All @@ -104,6 +109,9 @@ def concat(
values with null.
- Horizontal: stacks Series horizontally and fills with nulls if the lengths
don't match.
parallel
Only relevant for LazyFrames. This determines if the concattenated
lazy computations may be executed in parallel.
Examples
--------
Expand Down Expand Up @@ -139,7 +147,7 @@ def concat(
f"how must be one of {{'vertical', 'diagonal'}}, got {how}"
)
elif isinstance(first, pli.LazyFrame):
return pli.wrap_ldf(_concat_lf(items, rechunk))
return pli.wrap_ldf(_concat_lf(items, rechunk, parallel))
elif isinstance(first, pli.Series):
out = pli.wrap_s(_concat_series(items))
elif isinstance(first, pli.Expr):
Expand Down
4 changes: 2 additions & 2 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ fn concat_df(dfs: &PyAny, py: Python) -> PyResult<PyDataFrame> {
}

#[pyfunction]
fn concat_lf(lfs: &PyAny, rechunk: bool) -> PyResult<PyLazyFrame> {
fn concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<PyLazyFrame> {
let (seq, len) = get_pyseq(lfs)?;
let mut lfs = Vec::with_capacity(len);

Expand All @@ -308,7 +308,7 @@ fn concat_lf(lfs: &PyAny, rechunk: bool) -> PyResult<PyLazyFrame> {
lfs.push(lf);
}

let lf = polars::lazy::dsl::concat(lfs, rechunk).map_err(PyPolarsErr::from)?;
let lf = polars::lazy::dsl::concat(lfs, rechunk, parallel).map_err(PyPolarsErr::from)?;
Ok(lf.into())
}

Expand Down

0 comments on commit 4beb08e

Please sign in to comment.