
Improve stability of dask_cudf.DataFrame.var and dask_cudf.DataFrame.std #7453

Merged: 12 commits, Feb 26, 2021
116 changes: 86 additions & 30 deletions python/dask_cudf/dask_cudf/core.py
@@ -1,4 +1,5 @@
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
import math
import warnings
from distutils.version import LooseVersion

@@ -280,6 +281,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
Member Author:

We can remove the naive option once we can confirm that the new approach is more stable and avoids any performance degradation (so far, it seems performance is better anyway).
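
For anyone testing this locally, a rough usage sketch (assuming the API in this diff; the synthetic data is just for illustration):

```python
import cudf
import dask_cudf

ddf = dask_cudf.from_cudf(cudf.datasets.randomdata(nrows=1000), npartitions=8)

ddf.var().compute()            # new parallel (per-partition mean/M2) path
ddf.var(naive=True).compute()  # original sum / sum-of-squares path, kept for comparison
```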

):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
@@ -294,19 +296,10 @@
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def repartition(self, *args, **kwargs):
""" Wraps dask.dataframe DataFrame.repartition method.
@@ -375,7 +368,7 @@ class Series(_Frame, dd.core.Series):

def count(self, split_every=False):
return reduction(
self,
[self],
chunk=M.count,
aggregate=np.sum,
split_every=split_every,
@@ -396,6 +389,7 @@ def var(
split_every=False,
dtype=None,
out=None,
naive=False,
):
axis = self._validate_axis(axis)
meta = self._meta_nonempty.var(axis=axis, skipna=skipna)
@@ -410,19 +404,10 @@
ddof=ddof,
)
return handle_out(out, result)

elif naive:
return _naive_var(self, meta, skipna, ddof, split_every, out)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(self, DataFrame):
result.divisions = (min(self.columns), max(self.columns))
return handle_out(out, result)
return _parallel_var(self, meta, skipna, split_every, out)

def groupby(self, *args, **kwargs):
from .groupby import CudfSeriesGroupBy
@@ -434,6 +419,80 @@ class Index(Series, dd.core.Index):
_partition_type = cudf.Index


def _naive_var(ddf, meta, skipna, ddof, split_every, out):
num = ddf._get_numeric_data()
x = 1.0 * num.sum(skipna=skipna, split_every=split_every)
Comment:

Naive question: What does the 1.0 * do for us here?

Member:

My guess (though Rick, please feel free to correct me) is that this is needed to cast to a float.

Member Author:

Good question :)

This `_naive_var` function is a copy-paste of the original code, so I didn't actually write it. However, my assumption is (as John suggested) that the 1.0 is meant to ensure that all results are cast to float.

Comment:

Ah gotcha! I figured that might be it. Given that it's a copy-paste, I'm good with it, but in general I would call for an explicit float call if we're not already performing a necessary arithmetic operation. Thanks!

Member:

Is this actually a single value? I suspect (though don't know) this is probably a Series or DataFrame, in which case calling float(...) on it won't work.

Comment:

Ah, sorry; then in that case I'd advocate for a more explicit dtype conversion. Regardless, I'm not hung up on this for this particular PR. It's the sort of thing I'd insist on for new code, but I'm not too fussed about a copy-paste; it can always be cleaned up in a specific code clean-up PR.

Member:

Yeah, it's unclear whether a scalar is also a possibility, in which case this is a little handy for succinctly covering all those cases, but I'm not attached to the logic here either (and can see the argument for being more explicit).

Anyway, it sounds like we can table this for now. Would you like to open an issue so we can revisit later?
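
For reference, a small sketch (not from this PR) of what the `1.0 *` buys us on integer columns: the column sums are promoted to float before they are fed to the aggregation step.

```python
import cudf
import dask_cudf

ddf = dask_cudf.from_cudf(
    cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}), npartitions=1
)

col_sums = ddf.sum()                      # per-column sums, still integer-typed
print(col_sums.compute().dtype)           # int64
print((1.0 * col_sums).compute().dtype)   # float64 after multiplying by 1.0
```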

x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every)
n = num.count(split_every=split_every)
name = ddf._token_prefix + "var"
result = map_partitions(
var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof
)
if isinstance(ddf, DataFrame):
result.divisions = (min(ddf.columns), max(ddf.columns))
return handle_out(out, result)
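
As an aside on why this PR moves away from the path above: `_naive_var` builds the variance from the aggregated sum and sum of squares, which is a single pass but loses precision when the mean is large relative to the spread. A minimal NumPy sketch of the effect (my own illustration, not from the PR; the constant mirrors the new tests below):

```python
import numpy as np

x = np.full(20, 8488678001.0)   # 20 identical values; the exact variance is 0

n = len(x)
s, s2 = x.sum(), (x ** 2).sum()

naive = (s2 - s ** 2 / n) / (n - 1)               # moments-based, like _naive_var
centered = ((x - x.mean()) ** 2).sum() / (n - 1)  # centered, like _parallel_var

print(naive)     # may be far from 0, and can even go negative, due to cancellation
print(centered)  # 0.0
```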


def _parallel_var(ddf, meta, skipna, split_every, out):
def _local_var(x, skipna):
n = len(x)
# TODO: x.sum()/n seems to be faster than x.mean()
# on Quadro RTX 8000 - Need to compare on V/A100
avg = x.mean(skipna=skipna)
Member Author:

Interesting result: I found that the new approach is slightly slower on my local RTX 8000-based machine unless I use x.sum()/n in place of x.mean(). I plan to get an environment set up on a V100-based machine soon. I will see if I get different behaviour on datacenter-grade hardware and update here.

Collaborator:

Should we hold off on merging until you report back on this?

Comment:

Interesting! And very surprising. If I can help dig into that one at all, please feel free to ping me.

Member Author:

> Should we hold off on merging until you report back on this?

Not sure - I should have an update in the next few hours (I just set up a new env and am building cudf now). Should we remove the "naive" path entirely if the new approach has good performance? Either way, if mean is still slower than sum/n, I will certainly welcome help from @wphicks :)

Member:

We may want to be careful here. x.mean() by default is not the same as x.sum()/len(x) if x has nulls and skipna is True (default)

Member:

Also Nick's point above may explain some of the performance difference (handling nulls vs. not)

Member Author:

> We may want to be careful here. x.mean() by default is not the same as x.sum()/len(x) if x has nulls and skipna is True (default)

Ah - The case I was testing did not have nulls, but this is a good point.

Member Author:

I can confirm that the "sum/n is faster than mean" statement was only true because n was a scalar value. If skipna=True, we actually need the full count for n, because there may be null values. In the skipna=True case, mean is faster (as expected).

Currently, the new var approach seems to be a bit slower than the original, so it may make sense to keep the naive=True option for now.
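
A tiny illustration of the null-handling point (my own example, not from the PR):

```python
import cudf

s = cudf.Series([1.0, 2.0, None, 4.0])

print(s.mean(skipna=True))  # 2.333... -> 7.0 divided by the 3 valid values
print(s.sum() / len(s))     # 1.75     -> 7.0 divided by 4, counting the null slot
```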

m2 = ((x - avg) ** 2).sum(skipna=skipna)
return n, avg, m2

def _aggregate_var(parts):
n, avg, m2 = parts[0]
for i in range(1, len(parts)):
n_a, avg_a, m2_a = n, avg, m2
n_b, avg_b, m2_b = parts[i]
n = n_a + n_b
avg = (n_a * avg_a + n_b * avg_b) / n
delta = avg_b - avg_a
m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
return n, avg, m2

def _finalize_var(vals):
n, _, m2 = vals
return m2 / (n - 1)

# Build graph
nparts = ddf.npartitions
if not split_every:
split_every = nparts
name = "var-" + tokenize(skipna, split_every, out)
local_name = "local-" + name
num = ddf._get_numeric_data()
dsk = {
(local_name, n, 0): (_local_var, (num._name, n), skipna)
for n in range(nparts)
}

# Use reduction tree
widths = [nparts]
while nparts > 1:
nparts = math.ceil(nparts / split_every)
widths.append(nparts)
height = len(widths)
for depth in range(1, height):
for group in range(widths[depth]):
p_max = widths[depth - 1]
lstart = split_every * group
lstop = min(lstart + split_every, p_max)
node_list = [
(local_name, p, depth - 1) for p in range(lstart, lstop)
]
dsk[(local_name, group, depth)] = (_aggregate_var, node_list)
dsk[(name, 0)] = (_finalize_var, (local_name, group, depth))

graph = HighLevelGraph.from_collections(name, dsk, dependencies=[num, ddf])
result = dd.core.new_dd_object(graph, name, meta, (None, None))
if isinstance(ddf, DataFrame):
result.divisions = (min(ddf.columns), max(ddf.columns))
return handle_out(out, result)
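
For reference, the update in `_aggregate_var` is the standard pairwise combination of per-chunk (count, mean, M2) statistics, as in Chan et al.'s parallel variance algorithm. A minimal NumPy sketch of the same merge, checked against a direct computation (my own example, not part of the diff):

```python
import numpy as np

def combine(a_stats, b_stats):
    # Merge (count, mean, M2) from two chunks; mirrors _aggregate_var above.
    n_a, avg_a, m2_a = a_stats
    n_b, avg_b, m2_b = b_stats
    n = n_a + n_b
    avg = (n_a * avg_a + n_b * avg_b) / n
    delta = avg_b - avg_a
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    return n, avg, m2

def local_stats(x):
    return len(x), x.mean(), ((x - x.mean()) ** 2).sum()

rng = np.random.default_rng(0)
a, b = rng.normal(1e9, 1.0, 1000), rng.normal(1e9, 1.0, 500)

n, _, m2 = combine(local_stats(a), local_stats(b))
print(m2 / (n - 1))                        # combined ddof=1 variance
print(np.concatenate([a, b]).var(ddof=1))  # reference; should agree closely
```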


def _extract_meta(x):
"""
Extract internal cache data (``_meta``) from dask_cudf objects
@@ -609,11 +668,8 @@ def reduction(
meta = _emulate(apply, aggregate, [[meta_chunk]], aggregate_kwargs)
meta = dd.core.make_meta(meta)

for arg in args:
if isinstance(arg, _Frame):
dsk.update(arg.dask)

return dd.core.new_dd_object(dsk, b, meta, (None, None))
graph = HighLevelGraph.from_collections(b, dsk, dependencies=args)
return dd.core.new_dd_object(graph, b, meta, (None, None))


def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None):
43 changes: 43 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
@@ -709,6 +709,19 @@ def test_dataframe_set_index():
assert_eq(ddf.compute(), pddf.compute())


def test_series_describe():
random.seed(0)
sr = cudf.datasets.randomdata(20)["x"]
psr = sr.to_pandas()

dsr = dgd.from_cudf(sr, npartitions=4)
pdsr = dd.from_pandas(psr, npartitions=4)

dd.assert_eq(
dsr.describe(), pdsr.describe(), check_less_precise=3,
)


def test_dataframe_describe():
random.seed(0)
df = cudf.datasets.randomdata(20)
@@ -720,6 +733,36 @@ def test_dataframe_describe():
dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3)


def test_zero_std_describe():
num = 84886781
df = cudf.DataFrame(
{
"x": np.full((20,), num, dtype=np.float),
"y": np.full((20,), num, dtype=np.float),
}
)
pdf = df.to_pandas()
ddf = dgd.from_cudf(df, npartitions=4)
pddf = dd.from_pandas(pdf, npartitions=4)

dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3)


def test_large_numbers_describe():
num = 8488678001
df = cudf.DataFrame(
{
"x": np.arange(num, num + 1000, dtype=np.float),
"y": np.arange(num, num + 1000, dtype=np.float),
}
)
pdf = df.to_pandas()
ddf = dgd.from_cudf(df, npartitions=4)
pddf = dd.from_pandas(pdf, npartitions=4)

dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3)


def test_index_map_partitions():
# https://github.com/rapidsai/cudf/issues/6738
