From 7d52970a37e71ad70429997d99dd2ee274a428ff Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 27 Jan 2021 18:19:49 -0800 Subject: [PATCH 1/5] Support `numeric_only` field for `rank()` (#7213) Closes #7174 This PR adds support for `numeric_only` field for `Dataframe.rank()` and `Series.rank()`. When user specifies `numeric_only=True`, only the numerical data type columns are selected to construct a cudf object and passed to lower level for processing. Two minor refactors are also included in this PR: - This PR refactors internal API of `Frame._get_columns_by_label`, which now supports dispatching to this method from both `Dataframe` and `Series`. - This PR refactors `test_rank.py`, moving test functions inside class `TestRank` out as top level functions. All test variables shared among test cases are moved to a `pytests.fixture` method. A `Dataframe.rank` test case that expects to raise due to a [pandas bug](https://github.com/pandas-dev/pandas/issues/32593) is now captured under `pytest.raises`. Authors: - Michael Wang (@isVoid) Approvers: - Ashwin Srinath (@shwina) - @brandon-b-miller URL: https://github.com/rapidsai/cudf/pull/7213 --- python/cudf/cudf/core/dataframe.py | 20 ++ python/cudf/cudf/core/frame.py | 29 ++- python/cudf/cudf/core/series.py | 14 ++ python/cudf/cudf/tests/test_rank.py | 281 ++++++++++++++-------------- 4 files changed, 183 insertions(+), 161 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index e5626190098..d4b7a042757 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -1318,6 +1318,26 @@ def _repr_html_(self): def _repr_latex_(self): return self._get_renderable_dataframe().to_pandas()._repr_latex_() + def _get_columns_by_label(self, labels, downcast=False): + """ + Return columns of dataframe by `labels` + + If downcast is True, try and downcast from a DataFrame to a Series + """ + new_data = super()._get_columns_by_label(labels, downcast) + if downcast: + if is_scalar(labels): + nlevels = 1 + elif isinstance(labels, tuple): + nlevels = len(labels) + if self._data.multiindex is False or nlevels == self._data.nlevels: + return self._constructor_sliced( + new_data, name=labels, index=self.index + ) + return self._constructor( + new_data, columns=new_data.to_pandas_index(), index=self.index + ) + # unary, binary, rbinary, orderedcompare, unorderedcompare def _apply_op(self, fn, other=None, fill_value=None): diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 3d12ac2e6cc..5bc5675e1e6 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -483,25 +483,12 @@ def equals(self, other, **kwargs): else: return self._index.equals(other._index) - def _get_columns_by_label(self, labels, downcast=False): + def _get_columns_by_label(self, labels, downcast): """ Returns columns of the Frame specified by `labels` - If downcast is True, try and downcast from a DataFrame to a Series - """ - new_data = self._data.select_by_label(labels) - if downcast: - if is_scalar(labels): - nlevels = 1 - elif isinstance(labels, tuple): - nlevels = len(labels) - if self._data.multiindex is False or nlevels == self._data.nlevels: - return self._constructor_sliced( - new_data, name=labels, index=self.index - ) - return self._constructor( - new_data, columns=new_data.to_pandas_index(), index=self.index - ) + """ + return self._data.select_by_label(labels) def _get_columns_by_index(self, indices): """ @@ -1643,10 +1630,16 @@ def rank( "na_option must be one of 'keep', 'top', or 'bottom'" ) - # TODO code for selecting numeric columns source = self if numeric_only: - warnings.warn("numeric_only=True is not implemented yet") + numeric_cols = ( + name + for name in self._data.names + if is_numerical_dtype(self._data[name]) + ) + source = self._get_columns_by_label(numeric_cols) + if source.empty: + return source.astype("float64") out_rank_table = libcudf.sort.rank_columns( source, method_enum, na_option, ascending, pct diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index dfc687eb76d..2e96c582b0a 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -405,6 +405,20 @@ def _copy_construct(self, **kwargs): params.update(kwargs) return cls(**params) + def _get_columns_by_label(self, labels, downcast=False): + """Return the column specified by `labels` + + For cudf.Series, either the column, or an empty series is returned. + Parameter `downcast` does not have effects. + """ + new_data = super()._get_columns_by_label(labels, downcast) + + return ( + self._constructor(data=new_data, index=self.index) + if len(new_data) > 0 + else self._constructor(dtype=self.dtype, name=self.name) + ) + @classmethod def from_arrow(cls, array): """ diff --git a/python/cudf/cudf/tests/test_rank.py b/python/cudf/cudf/tests/test_rank.py index eb1e63f73ed..c86b2c61aa5 100644 --- a/python/cudf/cudf/tests/test_rank.py +++ b/python/cudf/cudf/tests/test_rank.py @@ -10,152 +10,147 @@ from cudf.tests.utils import assert_eq, assert_exceptions_equal -class TestRank: - index = np.array([5, 4, 3, 2, 1, 6, 7, 8, 9, 10]) - col1 = np.array([5, 4, 3, 5, 8, 5, 2, 1, 6, 6]) - col2 = np.array([5, 4, np.nan, 5, 8, 5, np.inf, np.nan, 6, -np.inf]) - - @pytest.mark.parametrize("dtype", ["O", "f8", "i4"]) - @pytest.mark.parametrize("ascending", [True, False]) - @pytest.mark.parametrize( - "method", ["average", "min", "max", "first", "dense"] - ) - @pytest.mark.parametrize("na_option", ["keep", "top", "bottom"]) - @pytest.mark.parametrize("pct", [True, False]) - def test_rank_all_arguments( - self, dtype, ascending, method, na_option, pct - ): - if method == "first" and dtype == "O": - # not supported by pandas - return - pdf = pd.DataFrame(index=self.index) - pdf["col1"] = self.col1.astype(dtype) - pdf["col2"] = self.col2.astype(dtype) - gdf = DataFrame.from_pandas(pdf) - - def _check(gs, ps, method, na_option, ascending, pct): - ranked_gs = gs.rank( - method=method, - na_option=na_option, - ascending=ascending, - pct=pct, - ) - ranked_ps = ps.rank( - method=method, - na_option=na_option, - ascending=ascending, - pct=pct, - ) - assert_eq(ranked_ps, ranked_gs.to_pandas()) - - # # Series - _check( - gdf["col1"], - pdf["col1"], - method=method, - na_option=na_option, - ascending=ascending, - pct=pct, - ) - _check( - gdf["col2"], - pdf["col2"], - method=method, - na_option=na_option, - ascending=ascending, - pct=pct, - ) - # TODO: https://github.com/pandas-dev/pandas/issues/32593 - # Dataframe (bug in pandas) - # _check( - # gdf, - # pdf, - # method=method, - # na_option=na_option, - # ascending=ascending, - # pct=pct, - # ) - - def test_rank_error_arguments(self): - pdf = pd.DataFrame(index=self.index) - pdf["col1"] = self.col1 - pdf["col2"] = self.col2 - gdf = DataFrame.from_pandas(pdf) - - assert_exceptions_equal( - lfunc=pdf["col1"].rank, - rfunc=gdf["col1"].rank, - lfunc_args_and_kwargs=( - [], - { - "method": "randomname", - "na_option": "keep", - "ascending": True, - "pct": True, - }, +@pytest.fixture +def pdf(): + return pd.DataFrame( + { + "col1": np.array([5, 4, 3, 5, 8, 5, 2, 1, 6, 6]), + "col2": np.array( + [5, 4, np.nan, 5, 8, 5, np.inf, np.nan, 6, -np.inf] ), - rfunc_args_and_kwargs=( - [], - { - "method": "randomname", - "na_option": "keep", - "ascending": True, - "pct": True, - }, - ), - ) + }, + index=np.array([5, 4, 3, 2, 1, 6, 7, 8, 9, 10]), + ) - assert_exceptions_equal( - lfunc=pdf["col1"].rank, - rfunc=gdf["col1"].rank, - lfunc_args_and_kwargs=( - [], - { - "method": "first", - "na_option": "randomname", - "ascending": True, - "pct": True, - }, - ), - rfunc_args_and_kwargs=( - [], - { - "method": "first", - "na_option": "randomname", - "ascending": True, - "pct": True, - }, - ), + +@pytest.mark.parametrize("dtype", ["O", "f8", "i4"]) +@pytest.mark.parametrize("ascending", [True, False]) +@pytest.mark.parametrize("method", ["average", "min", "max", "first", "dense"]) +@pytest.mark.parametrize("na_option", ["keep", "top", "bottom"]) +@pytest.mark.parametrize("pct", [True, False]) +@pytest.mark.parametrize("numeric_only", [True, False]) +def test_rank_all_arguments( + pdf, dtype, ascending, method, na_option, pct, numeric_only +): + if method == "first" and dtype == "O": + # not supported by pandas + return + + pdf = pdf.copy(deep=True) # for parallel pytest + if numeric_only: + pdf["str"] = np.array( + ["a", "b", "c", "d", "e", "1", "2", "3", "4", "5"] ) + gdf = DataFrame.from_pandas(pdf) + + kwargs = { + "method": method, + "na_option": na_option, + "ascending": ascending, + "pct": pct, + "numeric_only": numeric_only, + } + + # Series + assert_eq(gdf["col1"].rank(**kwargs), pdf["col1"].rank(**kwargs)) + assert_eq(gdf["col2"].rank(**kwargs), pdf["col2"].rank(**kwargs)) + if numeric_only: + expect = pdf["str"].rank(**kwargs) + got = gdf["str"].rank(**kwargs) + assert expect.empty == got.empty + + # TODO: https://github.com/pandas-dev/pandas/issues/32593 + # Dataframe (bug in pandas) + if ( + na_option == "top" + and method == "first" + and not dtype == "O" + and ascending + ): + assert_eq(gdf.rank(**kwargs), pdf.rank(**kwargs)) + else: + with pytest.raises(AssertionError, match="values are different"): + assert_eq(gdf.rank(**kwargs), pdf.rank(**kwargs)) + + +def test_rank_error_arguments(pdf): + gdf = DataFrame.from_pandas(pdf) + + assert_exceptions_equal( + lfunc=pdf["col1"].rank, + rfunc=gdf["col1"].rank, + lfunc_args_and_kwargs=( + [], + { + "method": "randomname", + "na_option": "keep", + "ascending": True, + "pct": True, + }, + ), + rfunc_args_and_kwargs=( + [], + { + "method": "randomname", + "na_option": "keep", + "ascending": True, + "pct": True, + }, + ), + ) - sort_group_args = [ - np.full((3,), np.nan), - 100 * np.random.random(10), - np.full((3,), np.inf), - np.full((3,), -np.inf), - ] - sort_dtype_args = [np.int32, np.float32, np.float64] - # TODO: np.int64, disabled because of bug - # https://github.com/pandas-dev/pandas/issues/32859 - - @pytest.mark.parametrize( - "elem,dtype", - list( - product( - combinations_with_replacement(sort_group_args, 4), - sort_dtype_args, - ) + assert_exceptions_equal( + lfunc=pdf["col1"].rank, + rfunc=gdf["col1"].rank, + lfunc_args_and_kwargs=( + [], + { + "method": "first", + "na_option": "randomname", + "ascending": True, + "pct": True, + }, ), + rfunc_args_and_kwargs=( + [], + { + "method": "first", + "na_option": "randomname", + "ascending": True, + "pct": True, + }, + ), + ) + + +sort_group_args = [ + np.full((3,), np.nan), + 100 * np.random.random(10), + np.full((3,), np.inf), + np.full((3,), -np.inf), +] +sort_dtype_args = [np.int32, np.float32, np.float64] +# TODO: np.int64, disabled because of bug +# https://github.com/pandas-dev/pandas/issues/32859 + + +@pytest.mark.parametrize( + "elem,dtype", + list( + product( + combinations_with_replacement(sort_group_args, 4), sort_dtype_args, + ) + ), +) +def test_series_rank_combinations(elem, dtype): + np.random.seed(0) + gdf = DataFrame() + gdf["a"] = aa = np.fromiter(chain.from_iterable(elem), np.float64).astype( + dtype ) - def test_series_rank_combinations(self, elem, dtype): - np.random.seed(0) - gdf = DataFrame() - gdf["a"] = aa = np.fromiter( - chain.from_iterable(elem), np.float64 - ).astype(dtype) - ranked_gs = gdf["a"].rank(method="first") - df = pd.DataFrame() - df["a"] = aa - ranked_ps = df["a"].rank(method="first") - # Check - assert_eq(ranked_ps, ranked_gs.to_pandas()) + ranked_gs = gdf["a"].rank(method="first") + df = pd.DataFrame() + df["a"] = aa + ranked_ps = df["a"].rank(method="first") + # Check + assert_eq(ranked_ps, ranked_gs.to_pandas()) From ab345805623f2f204bfa565a3f7238fda10798c2 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Thu, 28 Jan 2021 09:37:02 -0600 Subject: [PATCH 2/5] Fix test column vector leak (#7238) https://github.com/rapidsai/cudf/pull/7125 added a test column vector leak. This PR fixes this minor leak. Authors: - Kuhu Shukla (@kuhushukla) Approvers: - Jason Lowe (@jlowe) - Thomas Graves (@tgravescs) URL: https://github.com/rapidsai/cudf/pull/7238 --- .../test/java/ai/rapids/cudf/ColumnVectorTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java index 582b67b8287..7806bd1797b 100644 --- a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java +++ b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java @@ -2909,7 +2909,8 @@ void testListContainsString() { try (ColumnVector v = ColumnVector.fromLists(new HostColumnVector.ListType(true, new HostColumnVector.BasicType(true, DType.STRING)), list1, list2, list3, list4, list5); ColumnVector expected = ColumnVector.fromBoxedBooleans(true, false, true, null, null); - ColumnVector result = v.listContains(Scalar.fromString("thésé"))) { + Scalar strScalar = Scalar.fromString("thésé"); + ColumnVector result = v.listContains(strScalar)) { assertColumnsAreEqual(expected, result); } } @@ -2923,7 +2924,8 @@ void testListContainsInt() { try (ColumnVector v = ColumnVector.fromLists(new HostColumnVector.ListType(true, new HostColumnVector.BasicType(true, DType.INT32)), list1, list2, list3, list4); ColumnVector expected = ColumnVector.fromBoxedBooleans(false, false, true, null); - ColumnVector result = v.listContains(Scalar.fromInt(7))) { + Scalar intScalar = Scalar.fromInt(7); + ColumnVector result = v.listContains(intScalar)) { assertColumnsAreEqual(expected, result); } } @@ -2939,8 +2941,8 @@ void testListContainsStringCol() { try (ColumnVector v = ColumnVector.fromLists(new HostColumnVector.ListType(true, new HostColumnVector.BasicType(true, DType.STRING)), list1, list2, list3, list4, list5, list6); ColumnVector expected = ColumnVector.fromBoxedBooleans(true, true, true, true, null, null); - ColumnVector result = v.listContainsColumn( - ColumnVector.fromStrings("thésé", "", "test", "test", "iotA", null))) { + ColumnVector strCol = ColumnVector.fromStrings("thésé", "", "test", "test", "iotA", null); + ColumnVector result = v.listContainsColumn(strCol)) { assertColumnsAreEqual(expected, result); } } @@ -2955,7 +2957,8 @@ void testListContainsIntCol() { try (ColumnVector v = ColumnVector.fromLists(new HostColumnVector.ListType(true, new HostColumnVector.BasicType(true, DType.INT32)), list1, list2, list3, list4, list5); ColumnVector expected = ColumnVector.fromBoxedBooleans(true, false, true, null, null); - ColumnVector result = v.listContainsColumn(ColumnVector.fromBoxedInts(3, 3, 8, 3, null))) { + ColumnVector intCol = ColumnVector.fromBoxedInts(3, 3, 8, 3, null); + ColumnVector result = v.listContainsColumn(intCol)) { assertColumnsAreEqual(expected, result); } } From 02166dab827779b31f7b71aeec1b20bf319ae004 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 28 Jan 2021 09:41:54 -0600 Subject: [PATCH 3/5] Fix some bugs in java scalar support for decimal (#7237) This fixes some bugs in the java support for decimal scalar values. They are fairly minor but prevented me from doing some debugging earlier, and could impact tests in the future. Authors: - Robert (Bobby) Evans (@revans2) Approvers: - Jason Lowe (@jlowe) URL: https://github.com/rapidsai/cudf/pull/7237 --- java/src/main/java/ai/rapids/cudf/Scalar.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Scalar.java b/java/src/main/java/ai/rapids/cudf/Scalar.java index 20222b5ada8..50aa6dc1cc8 100644 --- a/java/src/main/java/ai/rapids/cudf/Scalar.java +++ b/java/src/main/java/ai/rapids/cudf/Scalar.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -525,6 +525,7 @@ public boolean equals(Object o) { case INT32: case UINT32: case TIMESTAMP_DAYS: + case DECIMAL32: return getInt() == other.getInt(); case FLOAT32: return getFloat() == other.getFloat(); @@ -536,7 +537,8 @@ public boolean equals(Object o) { case TIMESTAMP_MILLISECONDS: case TIMESTAMP_MICROSECONDS: case TIMESTAMP_NANOSECONDS: - return getLong() == getLong(); + case DECIMAL64: + return getLong() == other.getLong(); case STRING: return Arrays.equals(getUTF8(), other.getUTF8()); default: @@ -566,6 +568,7 @@ public int hashCode() { case INT32: case UINT32: case TIMESTAMP_DAYS: + case DECIMAL32: valueHash = getInt(); break; case INT64: @@ -574,6 +577,7 @@ public int hashCode() { case TIMESTAMP_MILLISECONDS: case TIMESTAMP_MICROSECONDS: case TIMESTAMP_NANOSECONDS: + case DECIMAL64: valueHash = Long.hashCode(getLong()); break; case FLOAT32: @@ -642,6 +646,11 @@ public String toString() { sb.append(getJavaString()); sb.append('"'); break; + case DECIMAL32: + // FALL THROUGH + case DECIMAL64: + sb.append(getBigDecimal()); + break; default: throw new IllegalArgumentException("Unknown scalar type: " + type); } From 9672e3dbe38e680754db16a15cd01b5707e93227 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 28 Jan 2021 11:14:19 -0600 Subject: [PATCH 4/5] Fix Arrow column test leaks (#7241) Found leaks in the ArrowColumnVectorTest so fix them. Signed-off-by: Thomas Graves Authors: - Thomas Graves (@tgravescs) Approvers: - Robert (Bobby) Evans (@revans2) - Jason Lowe (@jlowe) URL: https://github.com/rapidsai/cudf/pull/7241 --- .../ai/rapids/cudf/ArrowColumnVectorTest.java | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/java/src/test/java/ai/rapids/cudf/ArrowColumnVectorTest.java b/java/src/test/java/ai/rapids/cudf/ArrowColumnVectorTest.java index d8ba4548b6d..d5d4059d18d 100644 --- a/java/src/test/java/ai/rapids/cudf/ArrowColumnVectorTest.java +++ b/java/src/test/java/ai/rapids/cudf/ArrowColumnVectorTest.java @@ -71,10 +71,11 @@ void testArrowIntMultiBatches() { ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, null); } - ColumnVector cv = builder.buildAndPutOnDevice(); - ColumnVector expected = ColumnVector.fromBoxedInts(expectedArr.toArray(new Integer[0])); - assertEquals(cv.getType(), DType.INT32); - assertColumnsAreEqual(expected, cv, "ints"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromBoxedInts(expectedArr.toArray(new Integer[0]))) { + assertEquals(cv.getType(), DType.INT32); + assertColumnsAreEqual(expected, cv, "ints"); + } } finally { for (int i = 0; i < numVecs; i++) { vectors[i].close(); @@ -97,10 +98,11 @@ void testArrowLong() { ByteBuffer data = vector.getDataBuffer().nioBuffer(); ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, null); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.INT64); - ColumnVector expected = ColumnVector.fromBoxedLongs(expectedArr.toArray(new Long[0])); - assertColumnsAreEqual(expected, cv, "Longs"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromBoxedLongs(expectedArr.toArray(new Long[0]))) { + assertEquals(cv.getType(), DType.INT64); + assertColumnsAreEqual(expected, cv, "Longs"); + } } } @@ -126,10 +128,11 @@ void testArrowLongOnHeap() { validOnHeap.put(data); validOnHeap.flip(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), dataOnHeap, validOnHeap, null); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.INT64); - ColumnVector expected = ColumnVector.fromBoxedLongs(expectedArr.toArray(new Long[0])); - assertColumnsAreEqual(expected, cv, "Longs"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromBoxedLongs(expectedArr.toArray(new Long[0]))) { + assertEquals(cv.getType(), DType.INT64); + assertColumnsAreEqual(expected, cv, "Longs"); + } } } @@ -148,11 +151,12 @@ void testArrowDouble() { ByteBuffer data = vector.getDataBuffer().nioBuffer(); ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, null); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.FLOAT64); double[] array = expectedArr.stream().mapToDouble(i->i).toArray(); - ColumnVector expected = ColumnVector.fromDoubles(array); - assertColumnsAreEqual(expected, cv, "doubles"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromDoubles(array)) { + assertEquals(cv.getType(), DType.FLOAT64); + assertColumnsAreEqual(expected, cv, "doubles"); + } } } @@ -171,15 +175,16 @@ void testArrowFloat() { ByteBuffer data = vector.getDataBuffer().nioBuffer(); ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, null); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.FLOAT32); float[] floatArray = new float[expectedArr.size()]; int i = 0; for (Float f : expectedArr) { floatArray[i++] = (f != null ? f : Float.NaN); // Or whatever default you want. } - ColumnVector expected = ColumnVector.fromFloats(floatArray); - assertColumnsAreEqual(expected, cv, "floats"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromFloats(floatArray)) { + assertEquals(cv.getType(), DType.FLOAT32); + assertColumnsAreEqual(expected, cv, "floats"); + } } } @@ -200,10 +205,11 @@ void testArrowString() { ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); ByteBuffer offsets = vector.getOffsetBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, offsets); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.STRING); - ColumnVector expected = ColumnVector.fromStrings(expectedArr.toArray(new String[0])); - assertColumnsAreEqual(expected, cv, "Strings"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromStrings(expectedArr.toArray(new String[0]))) { + assertEquals(cv.getType(), DType.STRING); + assertColumnsAreEqual(expected, cv, "Strings"); + } } } @@ -233,10 +239,11 @@ void testArrowStringOnHeap() { offsetsOnHeap.put(offsets); offsetsOnHeap.flip(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), dataOnHeap, validOnHeap, offsetsOnHeap); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.STRING); - ColumnVector expected = ColumnVector.fromStrings(expectedArr.toArray(new String[0])); - assertColumnsAreEqual(expected, cv, "Strings"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.fromStrings(expectedArr.toArray(new String[0]));) { + assertEquals(cv.getType(), DType.STRING); + assertColumnsAreEqual(expected, cv, "Strings"); + } } } @@ -255,11 +262,12 @@ void testArrowDays() { ByteBuffer data = vector.getDataBuffer().nioBuffer(); ByteBuffer valid = vector.getValidityBuffer().nioBuffer(); builder.addBatch(vector.getValueCount(), vector.getNullCount(), data, valid, null); - ColumnVector cv = builder.buildAndPutOnDevice(); - assertEquals(cv.getType(), DType.TIMESTAMP_DAYS); int[] array = expectedArr.stream().mapToInt(i->i).toArray(); - ColumnVector expected = ColumnVector.daysFromInts(array); - assertColumnsAreEqual(expected, cv, "timestamp days"); + try (ColumnVector cv = builder.buildAndPutOnDevice(); + ColumnVector expected = ColumnVector.daysFromInts(array);) { + assertEquals(cv.getType(), DType.TIMESTAMP_DAYS); + assertColumnsAreEqual(expected, cv, "timestamp days"); + } } } From b608832f4f60ee91c033baa34f93338e46ea02e9 Mon Sep 17 00:00:00 2001 From: David <45795991+davidwendt@users.noreply.github.com> Date: Thu, 28 Jan 2021 15:49:55 -0500 Subject: [PATCH 5/5] Add dictionary column support to rolling_window (#7186) Reference #5963 Add support for dictionary column to `cudf::rolling_window` (non-udf) Rolling aggregations - [x] min/max - [x] lead/lag - [x] counting, row-number These only require aggregating the dictionary indices and do not need to access the keys. Authors: - David (@davidwendt) Approvers: - Mark Harris (@harrism) - Ram (Ramakrishna Prabhu) (@rgsl888prabhu) URL: https://github.com/rapidsai/cudf/pull/7186 --- cpp/src/rolling/rolling.cu | 10 +++-- cpp/src/rolling/rolling_detail.cuh | 64 +++++++++++++++++++-------- cpp/tests/rolling/rolling_test.cpp | 70 +++++++++++++++++++++++++++++- 3 files changed, 123 insertions(+), 21 deletions(-) diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index 13359f23d82..c187b8720b1 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,8 +26,10 @@ std::unique_ptr rolling_window(column_view const& input, std::unique_ptr const& agg, rmm::mr::device_memory_resource* mr) { + auto defaults = + cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input; return rolling_window( - input, empty_like(input)->view(), preceding_window, following_window, min_periods, agg, mr); + input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr); } namespace detail { @@ -107,8 +109,10 @@ std::unique_ptr rolling_window(column_view const& input, stream, mr); } else { + auto defaults_col = + cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input; return cudf::detail::rolling_window(input, - empty_like(input)->view(), + empty_like(defaults_col)->view(), preceding_window.begin(), following_window.begin(), min_periods, diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index e0ae16bed7f..5abef95310b 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,8 @@ #include #include #include +#include +#include #include #include #include @@ -623,8 +625,6 @@ struct rolling_window_launcher { rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - if (input.is_empty()) return empty_like(input); - auto output = make_fixed_width_column( target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); @@ -663,8 +663,6 @@ struct rolling_window_launcher { rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - if (input.is_empty()) return empty_like(input); - auto output = make_numeric_column(cudf::data_type{cudf::type_to_id()}, input.size(), cudf::mask_state::UNINITIALIZED, @@ -755,8 +753,6 @@ struct rolling_window_launcher { rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - if (input.is_empty()) return empty_like(input); - CUDF_EXPECTS(default_outputs.type().id() == input.type().id(), "Defaults column type must match input column."); // Because LEAD/LAG. @@ -1036,18 +1032,52 @@ std::unique_ptr rolling_window(column_view const& input, static_assert(warp_size == cudf::detail::size_in_bits(), "bitmask_type size does not match CUDA warp size"); + if (input.is_empty()) return empty_like(input); + + if (cudf::is_dictionary(input.type())) + CUDF_EXPECTS(agg->kind == aggregation::COUNT_ALL || agg->kind == aggregation::COUNT_VALID || + agg->kind == aggregation::ROW_NUMBER || agg->kind == aggregation::MIN || + agg->kind == aggregation::MAX || agg->kind == aggregation::LEAD || + agg->kind == aggregation::LAG, + "Invalid aggregation for dictionary column"); + min_periods = std::max(min_periods, 0); - return cudf::type_dispatcher(input.type(), - dispatch_rolling{}, - input, - default_outputs, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream, - mr); + auto input_col = cudf::is_dictionary(input.type()) + ? dictionary_column_view(input).get_indices_annotated() + : input; + auto output = cudf::type_dispatcher(input_col.type(), + dispatch_rolling{}, + input_col, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + stream, + mr); + if (!cudf::is_dictionary(input.type())) return output; + + // dictionary column post processing + if (agg->kind == aggregation::COUNT_ALL || agg->kind == aggregation::COUNT_VALID || + agg->kind == aggregation::ROW_NUMBER) + return output; + + // output is new dictionary indices (including nulls) + auto keys = std::make_unique(dictionary_column_view(input).keys(), stream, mr); + auto const indices_type = output->type(); // capture these + auto const output_size = output->size(); // before calling + auto const null_count = output->null_count(); // release() + auto contents = output->release(); + // create indices column from output column data + auto indices = std::make_unique(indices_type, + output_size, + std::move(*(contents.data.release())), + rmm::device_buffer{0, stream, mr}, + 0); + // create dictionary from keys and indices + return make_dictionary_column( + std::move(keys), std::move(indices), std::move(*(contents.null_mask.release())), null_count); } } // namespace detail diff --git a/cpp/tests/rolling/rolling_test.cpp b/cpp/tests/rolling/rolling_test.cpp index a9d52d8405c..26e4d485b30 100644 --- a/cpp/tests/rolling/rolling_test.cpp +++ b/cpp/tests/rolling/rolling_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -1048,4 +1049,71 @@ TYPED_TEST(FixedPointTests, MinMaxCountLagLeadNulls) cudf::logic_error); } +class RollingDictionaryTest : public cudf::test::BaseFixture { +}; + +TEST_F(RollingDictionaryTest, Count) +{ + cudf::test::dictionary_column_wrapper input( + {"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"}, + {1, 0, 0, 1, 0, 1, 1, 1, 0}); + fixed_width_column_wrapper expected_count_val({1, 2, 1, 2, 3, 3, 3, 2, 1}, + {1, 1, 1, 1, 1, 1, 1, 1, 1}); + fixed_width_column_wrapper expected_count_all({3, 4, 4, 4, 4, 4, 4, 3, 2}, + {1, 1, 1, 1, 1, 1, 1, 1, 1}); + fixed_width_column_wrapper expected_row_number({1, 2, 2, 2, 2, 2, 2, 2, 2}, + {1, 1, 1, 1, 1, 1, 1, 1, 1}); + + auto got_count_valid = cudf::rolling_window(input, 2, 2, 1, cudf::make_count_aggregation()); + auto got_count_all = + cudf::rolling_window(input, 2, 2, 1, cudf::make_count_aggregation(cudf::null_policy::INCLUDE)); + auto got_row_number = cudf::rolling_window(input, 2, 2, 1, cudf::make_row_number_aggregation()); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, got_count_valid->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, got_count_all->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_row_number, got_row_number->view()); +} + +TEST_F(RollingDictionaryTest, MinMax) +{ + cudf::test::dictionary_column_wrapper input( + {"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"}, + {1, 0, 0, 1, 0, 1, 1, 1, 0}); + cudf::test::strings_column_wrapper expected_min( + {"This", "This", "test", "operated", "on", "on", "on", "on", "string"}, + {1, 1, 1, 1, 1, 1, 1, 1, 1}); + cudf::test::strings_column_wrapper expected_max( + {"This", "test", "test", "test", "test", "string", "string", "string", "string"}, + {1, 1, 1, 1, 1, 1, 1, 1, 1}); + + auto got_min_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_min_aggregation()); + auto got_min = cudf::dictionary::decode(cudf::dictionary_column_view(got_min_dict->view())); + + auto got_max_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_max_aggregation()); + auto got_max = cudf::dictionary::decode(cudf::dictionary_column_view(got_max_dict->view())); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, got_min->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, got_max->view()); +} + +TEST_F(RollingDictionaryTest, LeadLag) +{ + cudf::test::dictionary_column_wrapper input( + {"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"}, + {1, 0, 0, 1, 0, 1, 1, 1, 0}); + cudf::test::strings_column_wrapper expected_lead( + {"", "", "test", "", "operated", "on", "string", "", ""}, {0, 0, 1, 0, 1, 1, 1, 0, 0}); + cudf::test::strings_column_wrapper expected_lag( + {"", "This", "", "", "test", "", "operated", "on", "string"}, {0, 1, 0, 0, 1, 0, 1, 1, 1}); + + auto got_lead_dict = cudf::rolling_window(input, 2, 1, 1, cudf::make_lead_aggregation(1)); + auto got_lead = cudf::dictionary::decode(cudf::dictionary_column_view(got_lead_dict->view())); + + auto got_lag_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_lag_aggregation(1)); + auto got_lag = cudf::dictionary::decode(cudf::dictionary_column_view(got_lag_dict->view())); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, got_lead->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, got_lag->view()); +} + CUDF_TEST_PROGRAM_MAIN()