
Commit f79a841

Improve stability of dask_cudf.DataFrame.var and dask_cudf.DataFrame.std (#7453)

Closes #7402 

This PR improves the numerical stability of the `var` (and, indirectly, `std`) methods of `DataFrame` and `Series`. As discussed in #7402, the existing (naive) sum-of-squares approach loses precision when the values are large relative to their variance/standard deviation.

Note that follow-up work may be needed to improve the algorithm(s) in groupby.
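For reference, the failure mode and the fix can be reproduced outside dask_cudf with a short NumPy-only sketch. This is not part of the commit: the array values mirror `test_large_numbers_var` below, the four-way split is arbitrary, and the merge step follows the same pairwise `(count, mean, M2)` update used by the new `_parallel_var` helper.

```python
# Minimal sketch (not part of this commit) of the precision problem and the fix.
import numpy as np

x = np.arange(8488678001.0, 8488678001.0 + 1000.0)  # large values, small spread
n = x.size

# Naive estimate from sum and sum-of-squares (the old code path, still
# reachable via `naive=True`): two huge, nearly equal terms are subtracted.
naive_var = (np.sum(x ** 2) - np.sum(x) ** 2 / n) / (n - 1)

# Stable estimate: per-chunk (count, mean, M2), merged pairwise with the
# Chan et al. update -- the same recurrence used in `_aggregate_var` below.
def local_stats(chunk):
    avg = chunk.mean()
    return chunk.size, avg, ((chunk - avg) ** 2).sum()

def merge(a, b):
    n_a, avg_a, m2_a = a
    n_b, avg_b, m2_b = b
    n_ab = n_a + n_b
    avg_ab = (n_a * avg_a + n_b * avg_b) / n_ab
    delta = avg_b - avg_a
    return n_ab, avg_ab, m2_a + m2_b + delta ** 2 * n_a * n_b / n_ab

chunks = np.array_split(x, 4)          # stand-in for four dask partitions
total = local_stats(chunks[0])
for chunk in chunks[1:]:
    total = merge(total, local_stats(chunk))
stable_var = total[2] / (total[0] - 1)

print(naive_var)          # loses most of its significant digits to cancellation
print(stable_var)         # close to the reference value
print(np.var(x, ddof=1))  # ~83416.67
```

With these inputs the naive estimate loses most of its significant digits, while the merged estimate matches NumPy's two-pass result.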

Authors:
  - Richard (Rick) Zamora (@rjzamora)

Approvers:
  - William Hicks (@wphicks)
  - Peter Andreas Entschev (@pentschev)
  - Keith Kraus (@kkraus14)
  - @jakirkham

URL: #7453
rjzamora authored Feb 26, 2021
1 parent d0a5ec8 commit f79a841
Showing 2 changed files with 135 additions and 30 deletions.
122 changes: 92 additions & 30 deletions python/dask_cudf/dask_cudf/core.py
@@ -1,4 +1,5 @@
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
import math
import warnings
from distutils.version import LooseVersion

@@ -280,6 +281,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
@@ -294,19 +296,10 @@ def var(
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def repartition(self, *args, **kwargs):
""" Wraps dask.dataframe DataFrame.repartition method.
@@ -375,7 +368,7 @@ class Series(_Frame, dd.core.Series):

def count(self, split_every=False):
return reduction(
self,
[self],
chunk=M.count,
aggregate=np.sum,
split_every=split_every,
@@ -396,6 +389,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
@@ -410,19 +404,10 @@ def var(
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def groupby(self, *args, **kwargs):
from .groupby import CudfSeriesGroupBy
@@ -434,6 +419,86 @@ class Index(Series, dd.core.Index):
_partition_type = cudf.Index


def _naive_var(ddf, meta, skipna, ddof, split_every, out):
num = ddf._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = ddf._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(ddf, DataFrame):
result.divisions = (min(ddf.columns), max(ddf.columns))
return handle_out(out, result)


def _parallel_var(ddf, meta, skipna, split_every, out):
def _local_var(x, skipna):
if skipna:
n = x.count(skipna=skipna)
avg = x.mean(skipna=skipna)
else:
# Not skipping nulls, so might as well
# avoid the full `count` operation
n = len(x)
avg = x.sum(skipna=skipna) / n
m2 = ((x - avg) ** 2).sum(skipna=skipna)
return n, avg, m2

def _aggregate_var(parts):
n, avg, m2 = parts[0]
for i in range(1, len(parts)):
n_a, avg_a, m2_a = n, avg, m2
n_b, avg_b, m2_b = parts[i]
n = n_a + n_b
avg = (n_a * avg_a + n_b * avg_b) / n
delta = avg_b - avg_a
m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
return n, avg, m2

def _finalize_var(vals):
n, _, m2 = vals
return m2 / (n - 1)

# Build graph
nparts = ddf.npartitions
if not split_every:
split_every = nparts
name = "var-" + tokenize(skipna, split_every, out)
local_name = "local-" + name
num = ddf._get_numeric_data()
dsk = {
(local_name, n, 0): (_local_var, (num._name, n), skipna)
for n in range(nparts)
}

# Use reduction tree
widths = [nparts]
while nparts > 1:
nparts = math.ceil(nparts / split_every)
widths.append(nparts)
height = len(widths)
for depth in range(1, height):
for group in range(widths[depth]):
p_max = widths[depth - 1]
lstart = split_every * group
lstop = min(lstart + split_every, p_max)
node_list = [
(local_name, p, depth - 1) for p in range(lstart, lstop)
]
dsk[(local_name, group, depth)] = (_aggregate_var, node_list)
if height == 1:
group = depth = 0
dsk[(name, 0)] = (_finalize_var, (local_name, group, depth))

graph = HighLevelGraph.from_collections(name, dsk, dependencies=[num, ddf])
result = dd.core.new_dd_object(graph, name, meta, (None, None))
if isinstance(ddf, DataFrame):
result.divisions = (min(ddf.columns), max(ddf.columns))
return handle_out(out, result)


def _extract_meta(x):
"""
Extract internal cache data (``_meta``) from dask_cudf objects
@@ -609,11 +674,8 @@ def reduction(
meta = _emulate(apply, aggregate, [[meta_chunk]], aggregate_kwargs)
meta = dd.core.make_meta(meta)

for arg in args:
if isinstance(arg, _Frame):
dsk.update(arg.dask)

return dd.core.new_dd_object(dsk, b, meta, (None, None))
graph = HighLevelGraph.from_collections(b, dsk, dependencies=args)
return dd.core.new_dd_object(graph, b, meta, (None, None))


def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
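The shape of the reduction tree built inside `_parallel_var` can be pictured with a small standalone sketch (not part of the commit; the partition counts are arbitrary): each level merges up to `split_every` of the previous level's `(count, mean, M2)` triples until one remains.

```python
# Standalone sketch (not from the commit) of how the per-partition results
# shrink level by level in the reduction tree built by `_parallel_var`.
import math

def tree_widths(npartitions, split_every):
    """Number of (count, mean, M2) nodes at each level of the tree."""
    widths = [npartitions]
    while npartitions > 1:
        npartitions = math.ceil(npartitions / split_every)
        widths.append(npartitions)
    return widths

print(tree_widths(16, 4))  # [16, 4, 1]: 16 local results -> 4 merges -> final
print(tree_widths(5, 2))   # [5, 3, 2, 1]
```

With the default `split_every=False`, `split_every` is set to the partition count, so all local results are combined in a single aggregation step.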
43 changes: 43 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
@@ -711,6 +711,19 @@ def test_dataframe_set_index():
assert_eq(ddf.compute(), pddf.compute())


def test_series_describe():
random.seed(0)
sr = cudf.datasets.randomdata(20)["x"]
psr = sr.to_pandas()

dsr = dgd.from_cudf(sr, npartitions=4)
pdsr = dd.from_pandas(psr, npartitions=4)

dd.assert_eq(
dsr.describe(), pdsr.describe(), check_less_precise=3,
)


def test_dataframe_describe():
random.seed(0)
df = cudf.datasets.randomdata(20)
@@ -724,6 +737,36 @@ def test_dataframe_describe():
)


def test_zero_std_describe():
num = 84886781
df = cudf.DataFrame(
{
"x": np.full((20,), num, dtype=np.float64),
"y": np.full((20,), num, dtype=np.float64),
}
)
pdf = df.to_pandas()
ddf = dgd.from_cudf(df, npartitions=4)
pddf = dd.from_pandas(pdf, npartitions=4)

dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3)


def test_large_numbers_var():
num = 8488678001
df = cudf.DataFrame(
{
"x": np.arange(num, num + 1000, dtype=np.float64),
"y": np.arange(num, num + 1000, dtype=np.float64),
}
)
pdf = df.to_pandas()
ddf = dgd.from_cudf(df, npartitions=4)
pddf = dd.from_pandas(pdf, npartitions=4)

dd.assert_eq(ddf.var(), pddf.var(), check_less_precise=3)


def test_index_map_partitions():
# https://github.com/rapidsai/cudf/issues/6738
