Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve stability of dask_cudf.DataFrame.var and dask_cudf.DataFrame.std #7453

Merged
merged 12 commits into from
Feb 26, 2021
122 changes: 92 additions & 30 deletions python/dask_cudf/dask_cudf/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
import math
import warnings
from distutils.version import LooseVersion

Expand Down Expand Up @@ -280,6 +281,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the naive option once we can confirm that the new approach is more stable and avoids any performance degradation (so far, it seems performance is better anyway).

):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
Expand All @@ -294,19 +296,10 @@ def var(
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def repartition(self, *args, **kwargs):
""" Wraps dask.dataframe DataFrame.repartition method.
Expand Down Expand Up @@ -375,7 +368,7 @@ class Series(_Frame, dd.core.Series):

def count(self, split_every=False):
return reduction(
self,
[self],
chunk=M.count,
aggregate=np.sum,
split_every=split_every,
Expand All @@ -396,6 +389,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
Expand All @@ -410,19 +404,10 @@ def var(
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def groupby(self, *args, **kwargs):
from .groupby import CudfSeriesGroupBy
Expand All @@ -434,6 +419,86 @@ class Index(Series, dd.core.Index):
_partition_type = cudf.Index


def _naive_var(ddf, meta, skipna, ddof, split_every, out):
    """Variance via the textbook ``E[x^2] - E[x]^2`` decomposition.

    This is the original (pre-tree-reduction) implementation, kept as a
    fallback path; the two large sums can suffer catastrophic
    cancellation when the mean is large relative to the spread.
    """
    numeric = ddf._get_numeric_data()
    # Multiplying by 1.0 coerces integer sums to float before the
    # aggregation step (see review discussion — a copy-paste of the
    # original idiom).
    total = 1.0 * numeric.sum(skipna=skipna, split_every=split_every)
    sq_total = 1.0 * (numeric ** 2).sum(skipna=skipna, split_every=split_every)
    count = numeric.count(split_every=split_every)
    token = ddf._token_prefix + "var"
    result = map_partitions(
        var_aggregate, sq_total, total, count, token=token, meta=meta, ddof=ddof
    )
    if isinstance(ddf, DataFrame):
        result.divisions = (min(ddf.columns), max(ddf.columns))
    return handle_out(out, result)


def _parallel_var(ddf, meta, skipna, split_every, out, ddof=1):
    """Numerically stable variance via a parallel tree reduction.

    Each partition computes ``(count, mean, M2)`` locally; partial
    results are then merged pairwise with the Chan et al. update, which
    avoids the catastrophic cancellation of the naive
    ``E[x^2] - E[x]^2`` formulation used by ``_naive_var``.

    Parameters
    ----------
    ddf : dask_cudf collection (DataFrame or Series) to reduce.
    meta : metadata object for the resulting dask collection.
    skipna : bool
        Whether nulls are excluded from the per-partition statistics.
    split_every : int or falsy
        Tree-reduction fan-in; falsy collapses to a single aggregation
        node over all partitions.
    out : optional pre-allocated output, forwarded to ``handle_out``.
    ddof : int, default 1
        Delta degrees of freedom for the final ``m2 / (n - ddof)``.
        Previously hard-coded to 1 even though the public ``var``
        methods accept ``ddof``; callers should forward it here.
    """

    def _local_var(x, skipna):
        # Per-partition statistics: element count, mean, and sum of
        # squared deviations from the mean (M2).
        if skipna:
            n = x.count(skipna=skipna)
            avg = x.mean(skipna=skipna)
        else:
            # Not skipping nulls, so might as well
            # avoid the full `count` operation
            n = len(x)
            avg = x.sum(skipna=skipna) / n
        m2 = ((x - avg) ** 2).sum(skipna=skipna)
        return n, avg, m2

    def _aggregate_var(parts):
        # Pairwise merge of (n, avg, m2) triples (Chan et al. update).
        n, avg, m2 = parts[0]
        for i in range(1, len(parts)):
            n_a, avg_a, m2_a = n, avg, m2
            n_b, avg_b, m2_b = parts[i]
            n = n_a + n_b
            avg = (n_a * avg_a + n_b * avg_b) / n
            delta = avg_b - avg_a
            m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
        return n, avg, m2

    def _finalize_var(vals):
        n, _, m2 = vals
        # `ddof` generalizes the previously hard-coded (n - 1).
        return m2 / (n - ddof)

    # Build graph
    nparts = ddf.npartitions
    if not split_every:
        split_every = nparts
    # Include ddof in the token so graphs with different ddof get
    # distinct keys.
    name = "var-" + tokenize(skipna, split_every, out, ddof)
    local_name = "local-" + name
    num = ddf._get_numeric_data()
    dsk = {
        (local_name, n, 0): (_local_var, (num._name, n), skipna)
        for n in range(nparts)
    }

    # Use reduction tree
    widths = [nparts]
    while nparts > 1:
        nparts = math.ceil(nparts / split_every)
        widths.append(nparts)
    height = len(widths)
    for depth in range(1, height):
        for group in range(widths[depth]):
            p_max = widths[depth - 1]
            lstart = split_every * group
            lstop = min(lstart + split_every, p_max)
            node_list = [
                (local_name, p, depth - 1) for p in range(lstart, lstop)
            ]
            dsk[(local_name, group, depth)] = (_aggregate_var, node_list)
    if height == 1:
        # Single partition: the tree loop never ran, so finalize
        # directly from the lone local result at level 0.
        group = depth = 0
    dsk[(name, 0)] = (_finalize_var, (local_name, group, depth))

    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[num, ddf])
    result = dd.core.new_dd_object(graph, name, meta, (None, None))
    if isinstance(ddf, DataFrame):
        result.divisions = (min(ddf.columns), max(ddf.columns))
    return handle_out(out, result)


def _extract_meta(x):
"""
Extract internal cache data (``_meta``) from dask_cudf objects
Expand Down Expand Up @@ -609,11 +674,8 @@ def reduction(
meta = _emulate(apply, aggregate, [[meta_chunk]], aggregate_kwargs)
meta = dd.core.make_meta(meta)

for arg in args:
if isinstance(arg, _Frame):
dsk.update(arg.dask)

return dd.core.new_dd_object(dsk, b, meta, (None, None))
graph = HighLevelGraph.from_collections(b, dsk, dependencies=args)
return dd.core.new_dd_object(graph, b, meta, (None, None))


def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
Expand Down
43 changes: 43 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,19 @@ def test_dataframe_set_index():
assert_eq(ddf.compute(), pddf.compute())


def test_series_describe():
    # describe() on a dask_cudf Series should match dask.dataframe on
    # the equivalent pandas data (to ~3 decimal places).
    random.seed(0)
    gser = cudf.datasets.randomdata(20)["x"]
    pser = gser.to_pandas()

    gdser = dgd.from_cudf(gser, npartitions=4)
    pdser = dd.from_pandas(pser, npartitions=4)

    dd.assert_eq(
        gdser.describe(), pdser.describe(), check_less_precise=3,
    )


def test_dataframe_describe():
random.seed(0)
df = cudf.datasets.randomdata(20)
Expand All @@ -720,6 +733,36 @@ def test_dataframe_describe():
dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3)


def test_zero_std_describe():
    # Constant columns have exactly zero variance; the naive
    # sum-of-squares formula can produce NaN/negative values here, so
    # describe() must still agree with pandas.
    value = 84886781
    gdf = cudf.DataFrame(
        {
            "x": np.full(20, value, dtype=np.float64),
            "y": np.full(20, value, dtype=np.float64),
        }
    )
    pdf = gdf.to_pandas()
    gddf = dgd.from_cudf(gdf, npartitions=4)
    pddf = dd.from_pandas(pdf, npartitions=4)

    dd.assert_eq(gddf.describe(), pddf.describe(), check_less_precise=3)


def test_large_numbers_describe():
    # Values with a large mean relative to their spread stress the
    # numerical stability of the variance computation.
    base = 8488678001
    gdf = cudf.DataFrame(
        {
            "x": np.arange(base, base + 1000, dtype=np.float64),
            "y": np.arange(base, base + 1000, dtype=np.float64),
        }
    )
    pdf = gdf.to_pandas()
    gddf = dgd.from_cudf(gdf, npartitions=4)
    pddf = dd.from_pandas(pdf, npartitions=4)

    dd.assert_eq(gddf.describe(), pddf.describe(), check_less_precise=3)


def test_index_map_partitions():
# https://github.com/rapidsai/cudf/issues/6738

Expand Down