diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index 0ba35460835..aa83bad7630 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -1,4 +1,5 @@ # Copyright (c) 2018-2020, NVIDIA CORPORATION. +import math import warnings from distutils.version import LooseVersion @@ -280,6 +281,7 @@ def var( split_every=False, dtype=None, out=None, + naive=False, ): axis = self._validate_axis(axis) meta = self._meta_nonempty.var(axis=axis, skipna=skipna) @@ -294,19 +296,10 @@ def var( ddof=ddof, ) return handle_out(out, result) - + elif naive: + return _naive_var(self, meta, skipna, ddof, split_every, out) else: - num = self._get_numeric_data() - x = 1.0 * num.sum(skipna=skipna, split_every=split_every) - x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every) - n = num.count(split_every=split_every) - name = self._token_prefix + "var" - result = map_partitions( - var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof - ) - if isinstance(self, DataFrame): - result.divisions = (min(self.columns), max(self.columns)) - return handle_out(out, result) + return _parallel_var(self, meta, skipna, split_every, out) def repartition(self, *args, **kwargs): """ Wraps dask.dataframe DataFrame.repartition method. @@ -375,7 +368,7 @@ class Series(_Frame, dd.core.Series): def count(self, split_every=False): return reduction( - self, + [self], chunk=M.count, aggregate=np.sum, split_every=split_every, @@ -396,6 +389,7 @@ def var( split_every=False, dtype=None, out=None, + naive=False, ): axis = self._validate_axis(axis) meta = self._meta_nonempty.var(axis=axis, skipna=skipna) @@ -410,19 +404,10 @@ def var( ddof=ddof, ) return handle_out(out, result) - + elif naive: + return _naive_var(self, meta, skipna, ddof, split_every, out) else: - num = self._get_numeric_data() - x = 1.0 * num.sum(skipna=skipna, split_every=split_every) - x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every) - n = num.count(split_every=split_every) - name = self._token_prefix + "var" - result = map_partitions( - var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof - ) - if isinstance(self, DataFrame): - result.divisions = (min(self.columns), max(self.columns)) - return handle_out(out, result) + return _parallel_var(self, meta, skipna, split_every, out) def groupby(self, *args, **kwargs): from .groupby import CudfSeriesGroupBy @@ -434,6 +419,86 @@ class Index(Series, dd.core.Index): _partition_type = cudf.Index +def _naive_var(ddf, meta, skipna, ddof, split_every, out): + num = ddf._get_numeric_data() + x = 1.0 * num.sum(skipna=skipna, split_every=split_every) + x2 = 1.0 * (num ** 2).sum(skipna=skipna, split_every=split_every) + n = num.count(split_every=split_every) + name = ddf._token_prefix + "var" + result = map_partitions( + var_aggregate, x2, x, n, token=name, meta=meta, ddof=ddof + ) + if isinstance(ddf, DataFrame): + result.divisions = (min(ddf.columns), max(ddf.columns)) + return handle_out(out, result) + + +def _parallel_var(ddf, meta, skipna, split_every, out): + def _local_var(x, skipna): + if skipna: + n = x.count(skipna=skipna) + avg = x.mean(skipna=skipna) + else: + # Not skipping nulls, so might as well + # avoid the full `count` operation + n = len(x) + avg = x.sum(skipna=skipna) / n + m2 = ((x - avg) ** 2).sum(skipna=skipna) + return n, avg, m2 + + def _aggregate_var(parts): + n, avg, m2 = parts[0] + for i in range(1, len(parts)): + n_a, avg_a, m2_a = n, avg, m2 + n_b, avg_b, m2_b = parts[i] + n = n_a + n_b + avg = (n_a * avg_a + n_b * avg_b) / n + delta = avg_b - avg_a + m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n + return n, avg, m2 + + def _finalize_var(vals): + n, _, m2 = vals + return m2 / (n - 1) + + # Build graph + nparts = ddf.npartitions + if not split_every: + split_every = nparts + name = "var-" + tokenize(skipna, split_every, out) + local_name = "local-" + name + num = ddf._get_numeric_data() + dsk = { + (local_name, n, 0): (_local_var, (num._name, n), skipna) + for n in range(nparts) + } + + # Use reduction tree + widths = [nparts] + while nparts > 1: + nparts = math.ceil(nparts / split_every) + widths.append(nparts) + height = len(widths) + for depth in range(1, height): + for group in range(widths[depth]): + p_max = widths[depth - 1] + lstart = split_every * group + lstop = min(lstart + split_every, p_max) + node_list = [ + (local_name, p, depth - 1) for p in range(lstart, lstop) + ] + dsk[(local_name, group, depth)] = (_aggregate_var, node_list) + if height == 1: + group = depth = 0 + dsk[(name, 0)] = (_finalize_var, (local_name, group, depth)) + + graph = HighLevelGraph.from_collections(name, dsk, dependencies=[num, ddf]) + result = dd.core.new_dd_object(graph, name, meta, (None, None)) + if isinstance(ddf, DataFrame): + result.divisions = (min(ddf.columns), max(ddf.columns)) + return handle_out(out, result) + + def _extract_meta(x): """ Extract internal cache data (``_meta``) from dask_cudf objects @@ -609,11 +674,8 @@ def reduction( meta = _emulate(apply, aggregate, [[meta_chunk]], aggregate_kwargs) meta = dd.core.make_meta(meta) - for arg in args: - if isinstance(arg, _Frame): - dsk.update(arg.dask) - - return dd.core.new_dd_object(dsk, b, meta, (None, None)) + graph = HighLevelGraph.from_collections(b, dsk, dependencies=args) + return dd.core.new_dd_object(graph, b, meta, (None, None)) def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index e2b77ba192e..e19fe016cc9 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -711,6 +711,19 @@ def test_dataframe_set_index(): assert_eq(ddf.compute(), pddf.compute()) +def test_series_describe(): + random.seed(0) + sr = cudf.datasets.randomdata(20)["x"] + psr = sr.to_pandas() + + dsr = dgd.from_cudf(sr, npartitions=4) + pdsr = dd.from_pandas(psr, npartitions=4) + + dd.assert_eq( + dsr.describe(), pdsr.describe(), check_less_precise=3, + ) + + def test_dataframe_describe(): random.seed(0) df = cudf.datasets.randomdata(20) @@ -724,6 +737,36 @@ def test_dataframe_describe(): ) +def test_zero_std_describe(): + num = 84886781 + df = cudf.DataFrame( + { + "x": np.full((20,), num, dtype=np.float64), + "y": np.full((20,), num, dtype=np.float64), + } + ) + pdf = df.to_pandas() + ddf = dgd.from_cudf(df, npartitions=4) + pddf = dd.from_pandas(pdf, npartitions=4) + + dd.assert_eq(ddf.describe(), pddf.describe(), check_less_precise=3) + + +def test_large_numbers_var(): + num = 8488678001 + df = cudf.DataFrame( + { + "x": np.arange(num, num + 1000, dtype=np.float64), + "y": np.arange(num, num + 1000, dtype=np.float64), + } + ) + pdf = df.to_pandas() + ddf = dgd.from_cudf(df, npartitions=4) + pddf = dd.from_pandas(pdf, npartitions=4) + + dd.assert_eq(ddf.var(), pddf.var(), check_less_precise=3) + + def test_index_map_partitions(): # https://github.com/rapidsai/cudf/issues/6738