From c47180107e2bf266825e3b136f36cbc3923d1955 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Thu, 3 Dec 2020 13:22:20 -0500 Subject: [PATCH 01/10] Fix null mask writing --- cpp/src/io/orc/dict_enc.cu | 9 ++++--- cpp/src/io/orc/orc_gpu.h | 3 +++ cpp/src/io/orc/stripe_enc.cu | 24 +++++++++++++++--- cpp/src/io/orc/writer_impl.cu | 24 ++++++++++++------ cpp/src/io/parquet/page_enc.cu | 30 ++++++++++++++++------- cpp/src/io/parquet/writer_impl.cu | 4 +++ cpp/src/io/statistics/column_stats.cu | 15 +++++++++--- cpp/src/io/statistics/column_stats.h | 2 ++ cpp/tests/io/orc_test.cpp | 35 +++++++++++++++++++++------ cpp/tests/io/parquet_test.cpp | 21 ++++++++++++++++ 10 files changed, 134 insertions(+), 33 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 714ac195dfc..63b932fee2e 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -75,10 +75,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t) s->scratch_red[t] = 0xffffffffu; } else { uint32_t row = s->chunk.start_row + i + t * 32; - uint32_t v = (row < s->chunk.start_row + s->chunk.num_rows) ? valid_map[row >> 5] : 0; + uint32_t v = (row < s->chunk.start_row + s->chunk.num_rows) + ? valid_map[(row + s->chunk.column_offset) >> 5] + : 0; if (row & 0x1f) { - uint32_t v1 = - (row + 32 < s->chunk.start_row + s->chunk.num_rows) ? valid_map[(row >> 5) + 1] : 0; + uint32_t v1 = (row + 32 < s->chunk.start_row + s->chunk.num_rows) + ? valid_map[((row + s->chunk.column_offset) >> 5) + 1] + : 0; v = __funnelshift_r(v, v1, row & 0x1f); } s->scratch_red[t] = v; diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index a905b76f75d..f6ceff9e6bd 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -127,6 +128,7 @@ struct EncChunk { int32_t strm_id[CI_NUM_STREAMS]; // stream id or -1 if not present uint32_t strm_len[CI_NUM_STREAMS]; // in: max length, out: actual length const uint32_t *valid_map_base; // base ptr of input valid bit map + size_type column_offset; // index of the first element relative to the base memory const void *column_data_base; // base ptr of input column data uint32_t start_row; // start row of this chunk uint32_t num_rows; // number of rows in this chunk @@ -156,6 +158,7 @@ struct StripeStream { **/ struct DictionaryChunk { const uint32_t *valid_map_base; // base ptr of input valid bit map + size_type column_offset; // index of the first element relative to the base memory const void *column_data_base; // base ptr of column data (ptr,len pair) uint32_t *dict_data; // dictionary data (index of non-null rows) uint32_t *dict_index; // row indices of corresponding string (row from dictionary index) diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index b086b6945c7..a9350acef38 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -712,10 +712,26 @@ __global__ void __launch_bounds__(block_size) uint8_t valid = 0; if (row < s->chunk.valid_rows) { if (s->chunk.valid_map_base) { - uint8_t valid_map[4]; - auto const valid_map_byte_idx = row >> 3; - memcpy(valid_map, &s->chunk.valid_map_base[valid_map_byte_idx / 4], 4); - valid = valid_map[valid_map_byte_idx % 4]; + // uint8_t valid_map[4]; + // auto const valid_map_byte_idx = row >> 3; + // memcpy(valid_map, &s->chunk.valid_map_base[valid_map_byte_idx / 4], 4); + // valid = valid_map[valid_map_byte_idx % 4]; + //////////////////////// + + size_type current_valid_offset = row + s->chunk.column_offset; + size_type next_valid_offset = current_valid_offset + min(32, s->chunk.valid_rows); + + size_type current_byte_index = current_valid_offset / 32; + size_type next_byte_index = next_valid_offset / 32; + + bitmask_type current_mask_word = s->chunk.valid_map_base[current_byte_index]; + bitmask_type next_mask_word = 0; + if (next_byte_index != current_byte_index) { + next_mask_word = s->chunk.valid_map_base[next_byte_index]; + } + bitmask_type mask = + __funnelshift_r(current_mask_word, next_mask_word, current_valid_offset); + valid = 0xff & mask; } else { valid = 0xff; } diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ba3696fbefb..a0f42b51b5e 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -108,11 +108,13 @@ __global__ void stringdata_to_nvstrdesc(gpu::nvstrdesc_s *dst, const size_type *offsets, const char *strdata, const uint32_t *nulls, + const size_type column_offset, size_type column_size) { size_type row = blockIdx.x * blockDim.x + threadIdx.x; if (row < column_size) { - uint32_t is_valid = (nulls) ? (nulls[row >> 5] >> (row & 0x1f)) & 1 : 1; + uint32_t is_valid = + (nulls) ? (nulls[(row + column_offset) >> 5] >> ((row + column_offset) & 0x1f)) & 1 : 1; size_t count; const char *ptr; if (is_valid) { @@ -151,6 +153,7 @@ class orc_column_view { _null_count(col.null_count()), _data(col.head() + col.offset() * _type_width), _nulls(col.nullable() ? col.null_mask() : nullptr), + _column_offset(col.offset()), _clockscale(to_clockscale(col.type().id())), _type_kind(to_orc_type(col.type().id())) { @@ -163,6 +166,7 @@ class orc_column_view { view.offsets().data() + view.offset(), view.chars().data(), _nulls, + _column_offset, _data_count); _data = _indexes.data(); @@ -217,6 +221,7 @@ class orc_column_view { bool nullable() const noexcept { return (_nulls != nullptr); } void const *data() const noexcept { return _data; } uint32_t const *nulls() const noexcept { return _nulls; } + size_type column_offset() const noexcept { return _column_offset; } uint8_t clockscale() const noexcept { return _clockscale; } void set_orc_encoding(ColumnEncodingKind e) { _encoding_kind = e; } @@ -230,12 +235,13 @@ class orc_column_view { size_t _str_id = 0; bool _string_type = false; - size_t _type_width = 0; - size_t _data_count = 0; - size_t _null_count = 0; - void const *_data = nullptr; - uint32_t const *_nulls = nullptr; - uint8_t _clockscale = 0; + size_t _type_width = 0; + size_t _data_count = 0; + size_t _null_count = 0; + void const *_data = nullptr; + uint32_t const *_nulls = nullptr; + size_type _column_offset = 0; + uint8_t _clockscale = 0; // ORC-related members std::string _name{}; @@ -270,6 +276,7 @@ void writer::impl::init_dictionaries(orc_column_view *columns, for (size_t g = 0; g < num_rowgroups; g++) { auto *ck = &dict[g * str_col_ids.size() + i]; ck->valid_map_base = str_column.nulls(); + ck->column_offset = str_column.column_offset(); ck->column_data_base = str_column.data(); ck->dict_data = dict_data + i * num_rows + g * row_index_stride_; ck->dict_index = dict_index + i * num_rows; // Indexed by abs row @@ -572,12 +579,14 @@ rmm::device_buffer writer::impl::encode_columns(orc_column_view *columns, ck->type_kind = columns[i].orc_kind(); if (ck->type_kind == TypeKind::STRING) { ck->valid_map_base = columns[i].nulls(); + ck->column_offset = columns[i].column_offset(); ck->column_data_base = (ck->encoding_kind == DICTIONARY_V2) ? columns[i].host_stripe_dict(stripe_id)->dict_index : columns[i].data(); ck->dtype_len = 1; } else { ck->valid_map_base = columns[i].nulls(); + ck->column_offset = columns[i].column_offset(); ck->column_data_base = columns[i].data(); ck->dtype_len = columns[i].type_width(); } @@ -753,6 +762,7 @@ std::vector> writer::impl::gather_statistic_blobs( desc->num_rows = columns[i].data_count(); desc->num_values = columns[i].data_count(); desc->valid_map_base = columns[i].nulls(); + desc->column_offset = columns[i].column_offset(); desc->column_data_base = columns[i].data(); if (desc->stats_dtype == dtype_timestamp64) { // Timestamp statistics are in milliseconds diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index b0b4ee251cb..78e3a5298e7 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -190,12 +190,16 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment size_type nvals = s->frag.num_leaf_values; size_type start_value_idx = s->start_value_idx; + size_type validity_offset = (s->col.nesting_offsets == nullptr) ? s->col.column_offset : 0; for (uint32_t i = 0; i < nvals; i += block_size) { const uint32_t *valid = s->col.valid_map_base; uint32_t val_idx = start_value_idx + i + t; - uint32_t is_valid = (i + t < nvals && val_idx < s->col.num_values) - ? (valid) ? (valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1 - : 0; + uint32_t is_valid = + (i + t < nvals && val_idx < s->col.num_values) + ? (valid) + ? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) & 1 + : 1 + : 0; uint32_t valid_warp = ballot(is_valid); uint32_t len, nz_pos, hash; if (is_valid) { @@ -963,6 +967,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, } __syncthreads(); + size_type validity_offset = (s->col.nesting_offsets == nullptr) ? s->col.column_offset : 0; // Encode Repetition and Definition levels if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.level_bits != 0 && s->col.nesting_levels == 0) { @@ -983,9 +988,12 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, uint32_t row = s->page.start_row + rle_numvals + t; // Definition level encodes validity. Checks the valid map and if it is valid, then sets the // def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode - uint32_t def_lvl = (rle_numvals + t < s->page.num_rows && row < s->col.num_rows) - ? (valid) ? (valid[row >> 5] >> (row & 0x1f)) & 1 : 1 - : 0; + uint32_t def_lvl = + (rle_numvals + t < s->page.num_rows && row < s->col.num_rows) + ? (valid) + ? (valid[(row + validity_offset) >> 5] >> ((row + validity_offset) & 0x1f)) & 1 + : 1 + : 0; s->vals[(rle_numvals + t) & (rle_buffer_size - 1)] = def_lvl; __syncthreads(); rle_numvals += nrows; @@ -1082,9 +1090,13 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, val_idx = (is_valid) ? s->col.dict_data[val_idx] : val_idx; } else { const uint32_t *valid = s->col.valid_map_base; - is_valid = (val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values) - ? (valid) ? (valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1 - : 0; + is_valid = + (val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values) + ? (valid) + ? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) & + 1 + : 1 + : 0; } warp_valids = ballot(is_valid); cur_val_idx += nvals; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 1f76f6455c1..d91d3c1635c 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -136,6 +136,7 @@ class parquet_column_view { _null_count(_leaf_col.null_count()), _data(col.head() + col.offset() * _type_width), _nulls(_leaf_col.nullable() ? _leaf_col.null_mask() : nullptr), + _offset(col.offset()), _converted_type(ConvertedType::UNKNOWN), _ts_scale(0), _dremel_offsets(0, stream), @@ -323,6 +324,7 @@ class parquet_column_view { bool nullable() const noexcept { return (_nulls != nullptr); } void const *data() const noexcept { return _data; } uint32_t const *nulls() const noexcept { return _nulls; } + size_type offset() const noexcept { return _offset; } // List related data column_view cudf_col() const noexcept { return _col; } @@ -396,6 +398,7 @@ class parquet_column_view { size_t _null_count = 0; void const *_data = nullptr; uint32_t const *_nulls = nullptr; + size_type _offset = 0; // parquet-related members std::string _name{}; @@ -797,6 +800,7 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) *desc = gpu::EncColumnDesc{}; // Zero out all fields desc->column_data_base = col.data(); desc->valid_map_base = col.nulls(); + desc->column_offset = col.offset(); desc->stats_dtype = col.stats_type(); desc->ts_scale = col.ts_scale(); // TODO (dm): Enable dictionary for list after refactor diff --git a/cpp/src/io/statistics/column_stats.cu b/cpp/src/io/statistics/column_stats.cu index 3f8921f6011..5c1d897eaa7 100644 --- a/cpp/src/io/statistics/column_stats.cu +++ b/cpp/src/io/statistics/column_stats.cu @@ -166,7 +166,10 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) - ? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1 + ? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >> + ((row + s->col.column_offset) & 0x1f)) & + 1 + : 1 : 0; if (is_valid) { switch (dtype) { @@ -251,7 +254,10 @@ gatherFloatColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Sto uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) - ? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1 + ? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >> + ((row + s->col.column_offset) & 0x1f)) & + 1 + : 1 : 0; if (is_valid) { if (dtype == dtype_float64) { @@ -331,7 +337,10 @@ void __device__ gatherStringColumnStats(stats_state_s *s, uint32_t t, Storage &s uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) - ? (valid_map) ? (valid_map[row >> 5] >> (row & 0x1f)) & 1 : 1 + ? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >> + ((row + s->col.column_offset) & 0x1f)) & + 1 + : 1 : 0; if (is_valid) { const nvstrdesc_s *str_col = static_cast(s->col.column_data_base); diff --git a/cpp/src/io/statistics/column_stats.h b/cpp/src/io/statistics/column_stats.h index bbecc85b8d8..6812678f01d 100644 --- a/cpp/src/io/statistics/column_stats.h +++ b/cpp/src/io/statistics/column_stats.h @@ -16,6 +16,7 @@ #pragma once #include +#include #include namespace cudf { @@ -43,6 +44,7 @@ struct stats_column_desc { uint32_t num_values; //!< Number of data values in column. Different from num_rows in case of //!< nested columns const uint32_t *valid_map_base; //!< base of valid bit map for this column (null if not present) + size_type column_offset; //! < index of the first element relative to the base memory const void *column_data_base; //!< base ptr to column data int32_t ts_scale; //!< timestamp scale (>0: multiply by scale, <0: divide by -scale) }; diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 0e03b8800b5..193bda308e6 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -142,13 +143,13 @@ inline auto random_values(size_t size) } // Helper function to compare two tables -void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs) -{ - EXPECT_EQ(lhs.num_columns(), rhs.num_columns()); - auto expected = lhs.begin(); - auto result = rhs.begin(); - while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); } -} +// void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs) +//{ +// EXPECT_EQ(lhs.num_columns(), rhs.num_columns()); +// auto expected = lhs.begin(); +// auto result = rhs.begin(); +// while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); } +//} struct SkipRowTest { int test_calls; @@ -617,6 +618,26 @@ TEST_F(OrcWriterTest, negTimestampsNano) CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result.tbl->view()); } +TEST_F(OrcWriterTest, Slice) +{ + auto col = + cudf::test::fixed_width_column_wrapper{{1, 2, 3, 4, 5}, {true, true, true, false, true}}; + std::vector indices{2, 5}; + std::vector result = cudf::slice(col, indices); + cudf::table_view tbl{{result[0]}}; + + auto filepath = temp_env->get_temp_filepath("Slice.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}); + auto read_table = cudf_io::read_orc(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(read_table.tbl->view(), tbl); +} + TEST_F(OrcChunkedWriterTest, SingleTable) { srand(31337); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index c3ecafe990a..9fd04e4425e 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -476,6 +476,7 @@ TEST_F(ParquetWriterTest, SlicedTable) expected_metadata.column_names.emplace_back("col_list"); expected_metadata.column_names.emplace_back("col_multi_level_list"); + // auto expected = table_view({col0, col1, col2, col3}); auto expected = table_view({col0, col1, col2, col3, col4}); auto expected_slice = cudf::slice(expected, {2, static_cast(num_rows) - 1}); @@ -816,6 +817,26 @@ TEST_F(ParquetWriterTest, MultipleMismatchedSources) } } +TEST_F(ParquetWriterTest, Slice) +{ + auto col = + cudf::test::fixed_width_column_wrapper{{1, 2, 3, 4, 5}, {true, true, true, false, true}}; + std::vector indices{2, 5}; + std::vector result = cudf::slice(col, indices); + cudf::table_view tbl{{result[0]}}; + + auto filepath = temp_env->get_temp_filepath("Slice.parquet"); + cudf_io::parquet_writer_options out_opts = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_opts); + + cudf_io::parquet_reader_options in_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + auto read_table = cudf_io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(read_table.tbl->view(), tbl); +} + TEST_F(ParquetChunkedWriterTest, SingleTable) { srand(31337); From 30bbb3998779a50167d4234a151a48fc5d77c80c Mon Sep 17 00:00:00 2001 From: skirui-source <71867292+skirui-source@users.noreply.github.com> Date: Fri, 4 Dec 2020 12:22:28 -0800 Subject: [PATCH 02/10] Create agg() function for dataframes(#6483) Closes #5247 Adds `agg` function for DataFrame Authors: - Sheilah Kirui - Sheilah Kirui - Michael Wang - skirui-source <71867292+skirui-source@users.noreply.github.com> - galipremsagar - GALI PREM SAGAR - Keith Kraus - Ashwin Srinath Approvers: - Michael Wang - Michael Wang - Keith Kraus URL: https://github.com/rapidsai/cudf/pull/6483 --- CHANGELOG.md | 1 + python/cudf/cudf/core/dataframe.py | 135 ++++++++++++++++++++++- python/cudf/cudf/tests/test_dataframe.py | 102 +++++++++++++++++ 3 files changed, 236 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd3a045c9b9..071d3e15522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ - PR #6765 Cupy fallback for __array_function__ and __array_ufunc__ for cudf.Series - PR #6817 Add support for scatter() on lists-of-struct columns - PR #6805 Implement `cudf::detail::copy_if` for `decimal32` and `decimal64` +- PR #6483 Add `agg` function to aggregate dataframe using one or more operations - PR #6726 Support selecting different hash functions in hash_partition - PR #6619 Improve Dockerfile - PR #6831 Added parquet chunked writing ability for list columns diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 77e992e0fd5..07ff37fc5cc 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -1,5 +1,5 @@ # Copyright (c) 2018-2020, NVIDIA CORPORATION. -from __future__ import division, print_function +from __future__ import division import inspect import itertools @@ -8,7 +8,7 @@ import sys import warnings from collections import OrderedDict, defaultdict -from collections.abc import Mapping, Sequence +from collections.abc import Iterable, Mapping, Sequence import cupy import numpy as np @@ -3728,6 +3728,137 @@ def sort_values( keep_index=not ignore_index, ) + def agg(self, aggs, axis=None): + """ + Aggregate using one or more operations over the specified axis. + + Parameters + ---------- + aggs : Iterable (set, list, string, tuple or dict) + Function to use for aggregating data. Accepted types are: + * string name, e.g. ``"sum"`` + * list of functions, e.g. ``["sum", "min", "max"]`` + * dict of axis labels specified operations per column, + e.g. ``{"a": "sum"}`` + + axis : not yet supported + + Returns + ------- + Aggregation Result : ``Series`` or ``DataFrame`` + When ``DataFrame.agg`` is called with single agg, + ``Series`` is returned. + When ``DataFrame.agg`` is called with several aggs, + ``DataFrame`` is returned. + + Notes + ----- + Difference from pandas: + * Not supporting: ``axis``, ``*args``, ``**kwargs`` + + """ + # TODO: Remove the typecasting below once issue #6846 is fixed + # link + dtypes = [self[col].dtype for col in self._column_names] + common_dtype = cudf.utils.dtypes.find_common_type(dtypes) + df_normalized = self.astype(common_dtype) + + if any(is_string_dtype(dt) for dt in dtypes): + raise NotImplementedError( + "DataFrame.agg() is not supported for " + "frames containing string columns" + ) + + if axis == 0 or axis is not None: + raise NotImplementedError("axis not implemented yet") + + if isinstance(aggs, Iterable) and not isinstance(aggs, (str, dict)): + result = cudf.DataFrame() + # TODO : Allow simultaneous pass for multi-aggregation as + # a future optimization + for agg in aggs: + result[agg] = getattr(df_normalized, agg)() + return result.T.sort_index(axis=1, ascending=True) + + elif isinstance(aggs, str): + if not hasattr(df_normalized, aggs): + raise AttributeError( + f"{aggs} is not a valid function for " + f"'DataFrame' object" + ) + result = cudf.DataFrame() + result[aggs] = getattr(df_normalized, aggs)() + result = result.iloc[:, 0] + result.name = None + return result + + elif isinstance(aggs, dict): + cols = aggs.keys() + if any([callable(val) for val in aggs.values()]): + raise NotImplementedError( + "callable parameter is not implemented yet" + ) + elif all([isinstance(val, str) for val in aggs.values()]): + result = cudf.Series(index=cols) + for key, value in aggs.items(): + col = df_normalized[key] + if not hasattr(col, value): + raise AttributeError( + f"{value} is not a valid function for " + f"'Series' object" + ) + result[key] = getattr(col, value)() + elif all([isinstance(val, Iterable) for val in aggs.values()]): + idxs = set() + for val in aggs.values(): + if isinstance(val, Iterable): + idxs.update(val) + elif isinstance(val, str): + idxs.add(val) + idxs = sorted(list(idxs)) + for agg in idxs: + if agg is callable: + raise NotImplementedError( + "callable parameter is not implemented yet" + ) + result = cudf.DataFrame(index=idxs, columns=cols) + for key in aggs.keys(): + col = df_normalized[key] + col_empty = column_empty( + len(idxs), dtype=col.dtype, masked=True + ) + ans = cudf.Series(data=col_empty, index=idxs) + if isinstance(aggs.get(key), Iterable): + # TODO : Allow simultaneous pass for multi-aggregation + # as a future optimization + for agg in aggs.get(key): + if not hasattr(col, agg): + raise AttributeError( + f"{agg} is not a valid function for " + f"'Series' object" + ) + ans[agg] = getattr(col, agg)() + elif isinstance(aggs.get(key), str): + if not hasattr(col, aggs.get(key)): + raise AttributeError( + f"{aggs.get(key)} is not a valid function for " + f"'Series' object" + ) + ans[aggs.get(key)] = getattr(col, agg)() + result[key] = ans + else: + raise ValueError("values of dict must be a string or list") + + return result + + elif callable(aggs): + raise NotImplementedError( + "callable parameter is not implemented yet" + ) + + else: + raise ValueError("argument must be a string, list or dict") + def nlargest(self, n, columns, keep="first"): """Get the rows of the DataFrame sorted by the n largest value of *columns* diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index 69eb70e7201..e513826ebaf 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -7995,3 +7995,105 @@ def test_dataframe_from_pandas_duplicate_columns(): ValueError, match="Duplicate column names are not allowed" ): gd.from_pandas(pdf) + + +@pytest.mark.parametrize( + "data", + [ + {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]}, + {"a": [1.0, 2.0, 3.0], "b": [3.0, 4.0, 5.0], "c": [True, True, False]}, + {"a": [1, 2, 3], "b": [3, 4, 5], "c": [True, True, False]}, + {"a": [1, 2, 3], "b": [True, True, False], "c": [False, True, False]}, + { + "a": [1.0, 2.0, 3.0], + "b": [True, True, False], + "c": [False, True, False], + }, + {"a": [1, 2, 3], "b": [3, 4, 5], "c": [2.0, 3.0, 4.0]}, + {"a": [1, 2, 3], "b": [2.0, 3.0, 4.0], "c": [5.0, 6.0, 4.0]}, + ], +) +@pytest.mark.parametrize( + "aggs", + [ + ["min", "sum", "max"], + ("min", "sum", "max"), + {"min", "sum", "max"}, + "sum", + {"a": "sum", "b": "min", "c": "max"}, + {"a": ["sum"], "b": ["min"], "c": ["max"]}, + {"a": ("sum"), "b": ("min"), "c": ("max")}, + {"a": {"sum"}, "b": {"min"}, "c": {"max"}}, + {"a": ["sum", "min"], "b": ["sum", "max"], "c": ["min", "max"]}, + {"a": ("sum", "min"), "b": ("sum", "max"), "c": ("min", "max")}, + {"a": {"sum", "min"}, "b": {"sum", "max"}, "c": {"min", "max"}}, + ], +) +def test_agg_for_dataframes(data, aggs): + pdf = pd.DataFrame(data) + gdf = gd.DataFrame(data) + + expect = pdf.agg(aggs) + got = gdf.agg(aggs) + + assert_eq(expect, got, check_dtype=False) + + +@pytest.mark.parametrize("aggs", [{"a": np.sum, "b": np.min, "c": np.max}]) +def test_agg_for_unsupported_function(aggs): + gdf = gd.DataFrame( + {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]} + ) + + with pytest.raises(NotImplementedError): + gdf.agg(aggs) + + +@pytest.mark.parametrize("aggs", ["asdf"]) +def test_agg_for_dataframe_with_invalid_function(aggs): + gdf = gd.DataFrame( + {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]} + ) + + with pytest.raises( + AttributeError, + match=f"{aggs} is not a valid function for 'DataFrame' object", + ): + gdf.agg(aggs) + + +@pytest.mark.parametrize("aggs", [{"a": "asdf"}]) +def test_agg_for_series_with_invalid_function(aggs): + gdf = gd.DataFrame( + {"a": [1, 2, 3], "b": [3.0, 4.0, 5.0], "c": [True, True, False]} + ) + + with pytest.raises( + AttributeError, + match=f"{aggs['a']} is not a valid function for 'Series' object", + ): + gdf.agg(aggs) + + +@pytest.mark.parametrize( + "aggs", + [ + "sum", + ["min", "sum", "max"], + {"a": {"sum", "min"}, "b": {"sum", "max"}, "c": {"min", "max"}}, + ], +) +def test_agg_for_dataframe_with_string_columns(aggs): + gdf = gd.DataFrame( + {"a": ["m", "n", "o"], "b": ["t", "u", "v"], "c": ["x", "y", "z"]}, + index=["a", "b", "c"], + ) + + with pytest.raises( + NotImplementedError, + match=re.escape( + "DataFrame.agg() is not supported for " + "frames containing string columns" + ), + ): + gdf.agg(aggs) From b7e4a8f8a9c404c8616d08dbba5d3f32555897d1 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 4 Dec 2020 16:10:42 -0500 Subject: [PATCH 03/10] PR comment fixes --- cpp/src/io/orc/dict_enc.cu | 6 +++--- cpp/src/io/orc/stripe_enc.cu | 6 ------ cpp/src/io/orc/writer_impl.cu | 5 +++-- cpp/src/io/parquet/page_enc.cu | 11 +++++------ cpp/src/io/statistics/column_stats.cu | 15 +++++++++------ cpp/tests/io/orc_test.cpp | 11 +---------- cpp/tests/io/parquet_test.cpp | 3 +-- 7 files changed, 22 insertions(+), 35 deletions(-) diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 63b932fee2e..075fac48844 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -76,13 +76,13 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s, int t) } else { uint32_t row = s->chunk.start_row + i + t * 32; uint32_t v = (row < s->chunk.start_row + s->chunk.num_rows) - ? valid_map[(row + s->chunk.column_offset) >> 5] + ? valid_map[(row + s->chunk.column_offset) / 32] : 0; if (row & 0x1f) { uint32_t v1 = (row + 32 < s->chunk.start_row + s->chunk.num_rows) - ? valid_map[((row + s->chunk.column_offset) >> 5) + 1] + ? valid_map[((row + s->chunk.column_offset) / 32) + 1] : 0; - v = __funnelshift_r(v, v1, row & 0x1f); + v = __funnelshift_r(v, v1, row + s->chunk.column_offset); } s->scratch_red[t] = v; } diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index a9350acef38..49085314762 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -712,12 +712,6 @@ __global__ void __launch_bounds__(block_size) uint8_t valid = 0; if (row < s->chunk.valid_rows) { if (s->chunk.valid_map_base) { - // uint8_t valid_map[4]; - // auto const valid_map_byte_idx = row >> 3; - // memcpy(valid_map, &s->chunk.valid_map_base[valid_map_byte_idx / 4], 4); - // valid = valid_map[valid_map_byte_idx % 4]; - //////////////////////// - size_type current_valid_offset = row + s->chunk.column_offset; size_type next_valid_offset = current_valid_offset + min(32, s->chunk.valid_rows); diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 2dfa6997d9d..7442273bbe1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -120,8 +120,9 @@ __global__ void stringdata_to_nvstrdesc(gpu::nvstrdesc_s *dst, { size_type row = blockIdx.x * blockDim.x + threadIdx.x; if (row < column_size) { - uint32_t is_valid = - (nulls) ? (nulls[(row + column_offset) >> 5] >> ((row + column_offset) & 0x1f)) & 1 : 1; + uint32_t is_valid = (nulls != nullptr) + ? (nulls[(row + column_offset) / 32] >> ((row + column_offset) % 32)) & 1 + : 1; size_t count; const char *ptr; if (is_valid) { diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 78e3a5298e7..090c9ac7efe 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -197,7 +197,7 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment uint32_t is_valid = (i + t < nvals && val_idx < s->col.num_values) ? (valid) - ? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) & 1 + ? (valid[(val_idx + validity_offset) / 32] >> ((val_idx + validity_offset) % 32)) & 1 : 1 : 0; uint32_t valid_warp = ballot(is_valid); @@ -990,8 +990,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, // def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode uint32_t def_lvl = (rle_numvals + t < s->page.num_rows && row < s->col.num_rows) - ? (valid) - ? (valid[(row + validity_offset) >> 5] >> ((row + validity_offset) & 0x1f)) & 1 + ? (valid != nullptr) + ? (valid[(row + validity_offset) / 32] >> ((row + validity_offset) % 32)) & 1 : 1 : 0; s->vals[(rle_numvals + t) & (rle_buffer_size - 1)] = def_lvl; @@ -1092,9 +1092,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, const uint32_t *valid = s->col.valid_map_base; is_valid = (val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values) - ? (valid) - ? (valid[(val_idx + validity_offset) >> 5] >> ((val_idx + validity_offset) & 0x1f)) & - 1 + ? (valid != nullptr) + ? (valid[(val_idx + validity_offset) / 32] >> ((val_idx + validity_offset) % 32)) & 1 : 1 : 0; } diff --git a/cpp/src/io/statistics/column_stats.cu b/cpp/src/io/statistics/column_stats.cu index 5c1d897eaa7..0ac49a15d01 100644 --- a/cpp/src/io/statistics/column_stats.cu +++ b/cpp/src/io/statistics/column_stats.cu @@ -165,12 +165,15 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; - uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) - ? (valid_map) ? (valid_map[(row + s->col.column_offset) >> 5] >> - ((row + s->col.column_offset) & 0x1f)) & - 1 - : 1 - : 0; + uint32_t is_valid = 0; + if (r < s->group.num_rows && row < s->col.num_values) { + if (valid_map != nullptr) { + is_valid = + (valid_map[(row + s->col.column_offset) / 32] >> ((row + s->col.column_offset) % 32)) & 1; + } else { + is_valid = 1; + } + } if (is_valid) { switch (dtype) { case dtype_int32: diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 193bda308e6..9fbcf605099 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -142,15 +142,6 @@ inline auto random_values(size_t size) return values; } -// Helper function to compare two tables -// void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs) -//{ -// EXPECT_EQ(lhs.num_columns(), rhs.num_columns()); -// auto expected = lhs.begin(); -// auto result = rhs.begin(); -// while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); } -//} - struct SkipRowTest { int test_calls; SkipRowTest(void) : test_calls(0) {} @@ -624,7 +615,7 @@ TEST_F(OrcWriterTest, Slice) cudf::test::fixed_width_column_wrapper{{1, 2, 3, 4, 5}, {true, true, true, false, true}}; std::vector indices{2, 5}; std::vector result = cudf::slice(col, indices); - cudf::table_view tbl{{result[0]}}; + cudf::table_view tbl{result}; auto filepath = temp_env->get_temp_filepath("Slice.orc"); cudf_io::orc_writer_options out_opts = diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 9fd04e4425e..8ca836ea185 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -476,7 +476,6 @@ TEST_F(ParquetWriterTest, SlicedTable) expected_metadata.column_names.emplace_back("col_list"); expected_metadata.column_names.emplace_back("col_multi_level_list"); - // auto expected = table_view({col0, col1, col2, col3}); auto expected = table_view({col0, col1, col2, col3, col4}); auto expected_slice = cudf::slice(expected, {2, static_cast(num_rows) - 1}); @@ -823,7 +822,7 @@ TEST_F(ParquetWriterTest, Slice) cudf::test::fixed_width_column_wrapper{{1, 2, 3, 4, 5}, {true, true, true, false, true}}; std::vector indices{2, 5}; std::vector result = cudf::slice(col, indices); - cudf::table_view tbl{{result[0]}}; + cudf::table_view tbl{result}; auto filepath = temp_env->get_temp_filepath("Slice.parquet"); cudf_io::parquet_writer_options out_opts = From bca3a6983ef3149b8d46d331060d4ef86b4b3944 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 4 Dec 2020 16:13:40 -0500 Subject: [PATCH 04/10] CHANGELOG fix --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bab051aac5d..2c8df036e66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## Improvements ## Bug Fixes +- PR #6889 Fix nullmask offset handling in parquet and orc writer # cuDF 0.17.0 (Date TBD) From 3e30457f783d62a71e05aa751b94c2aea80ed616 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 9 Dec 2020 14:23:35 -0500 Subject: [PATCH 05/10] Fix CHANGELOG --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c8df036e66..34ccb26f53d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,7 +47,6 @@ - PR #6817 Add support for scatter() on lists-of-struct columns - PR #6805 Implement `cudf::detail::copy_if` for `decimal32` and `decimal64` - PR #6847 Add a cmake find module for cuFile in JNI code -- PR #6483 Add `agg` function to aggregate dataframe using one or more operations - PR #6726 Support selecting different hash functions in hash_partition - PR #6619 Improve Dockerfile - PR #6831 Added parquet chunked writing ability for list columns From f0fe3ec805144aff2951fd51fa4c2971b4815cab Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 9 Dec 2020 14:28:20 -0500 Subject: [PATCH 06/10] Undo column_stats change --- cpp/src/io/statistics/column_stats.cu | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/statistics/column_stats.cu b/cpp/src/io/statistics/column_stats.cu index 0ac49a15d01..6740357e494 100644 --- a/cpp/src/io/statistics/column_stats.cu +++ b/cpp/src/io/statistics/column_stats.cu @@ -165,15 +165,12 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora uint32_t r = i + t; uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; - uint32_t is_valid = 0; - if (r < s->group.num_rows && row < s->col.num_values) { - if (valid_map != nullptr) { - is_valid = - (valid_map[(row + s->col.column_offset) / 32] >> ((row + s->col.column_offset) % 32)) & 1; - } else { - is_valid = 1; - } - } + uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) + ? (valid_map) ? (valid_map[(row + s->col.column_offset) / 32] >> + ((row + s->col.column_offset) % 32)) & + 1 + : 1 + : 0; if (is_valid) { switch (dtype) { case dtype_int32: From bf9875fb36aa244a769c6d965c88294ee7150ef4 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 9 Dec 2020 14:40:49 -0500 Subject: [PATCH 07/10] Style fix --- cpp/src/io/statistics/column_stats.cu | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/statistics/column_stats.cu b/cpp/src/io/statistics/column_stats.cu index 6740357e494..a7e730ce273 100644 --- a/cpp/src/io/statistics/column_stats.cu +++ b/cpp/src/io/statistics/column_stats.cu @@ -166,11 +166,11 @@ gatherIntColumnStats(stats_state_s *s, statistics_dtype dtype, uint32_t t, Stora uint32_t row = r + s->group.start_row; const uint32_t *valid_map = s->col.valid_map_base; uint32_t is_valid = (r < s->group.num_rows && row < s->col.num_values) - ? (valid_map) ? (valid_map[(row + s->col.column_offset) / 32] >> - ((row + s->col.column_offset) % 32)) & - 1 - : 1 - : 0; + ? (valid_map) ? (valid_map[(row + s->col.column_offset) / 32] >> + ((row + s->col.column_offset) % 32)) & + 1 + : 1 + : 0; if (is_valid) { switch (dtype) { case dtype_int32: From c6846bd1961819a558d07d68a702474b2b78097b Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Thu, 10 Dec 2020 15:23:59 -0500 Subject: [PATCH 08/10] Undo irrelevant changes pulled while merging --- python/cudf/cudf/core/dataframe.py | 135 +---------------------------- 1 file changed, 2 insertions(+), 133 deletions(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index ee0ac78ac2a..ef1bdb045a6 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -1,5 +1,5 @@ # Copyright (c) 2018-2020, NVIDIA CORPORATION. -from __future__ import division +from __future__ import division, print_function import inspect import itertools @@ -8,7 +8,7 @@ import sys import warnings from collections import OrderedDict, defaultdict -from collections.abc import Iterable, Mapping, Sequence +from collections.abc import Mapping, Sequence import cupy import numpy as np @@ -3714,137 +3714,6 @@ def sort_values( keep_index=not ignore_index, ) - def agg(self, aggs, axis=None): - """ - Aggregate using one or more operations over the specified axis. - - Parameters - ---------- - aggs : Iterable (set, list, string, tuple or dict) - Function to use for aggregating data. Accepted types are: - * string name, e.g. ``"sum"`` - * list of functions, e.g. ``["sum", "min", "max"]`` - * dict of axis labels specified operations per column, - e.g. ``{"a": "sum"}`` - - axis : not yet supported - - Returns - ------- - Aggregation Result : ``Series`` or ``DataFrame`` - When ``DataFrame.agg`` is called with single agg, - ``Series`` is returned. - When ``DataFrame.agg`` is called with several aggs, - ``DataFrame`` is returned. - - Notes - ----- - Difference from pandas: - * Not supporting: ``axis``, ``*args``, ``**kwargs`` - - """ - # TODO: Remove the typecasting below once issue #6846 is fixed - # link - dtypes = [self[col].dtype for col in self._column_names] - common_dtype = cudf.utils.dtypes.find_common_type(dtypes) - df_normalized = self.astype(common_dtype) - - if any(is_string_dtype(dt) for dt in dtypes): - raise NotImplementedError( - "DataFrame.agg() is not supported for " - "frames containing string columns" - ) - - if axis == 0 or axis is not None: - raise NotImplementedError("axis not implemented yet") - - if isinstance(aggs, Iterable) and not isinstance(aggs, (str, dict)): - result = cudf.DataFrame() - # TODO : Allow simultaneous pass for multi-aggregation as - # a future optimization - for agg in aggs: - result[agg] = getattr(df_normalized, agg)() - return result.T.sort_index(axis=1, ascending=True) - - elif isinstance(aggs, str): - if not hasattr(df_normalized, aggs): - raise AttributeError( - f"{aggs} is not a valid function for " - f"'DataFrame' object" - ) - result = cudf.DataFrame() - result[aggs] = getattr(df_normalized, aggs)() - result = result.iloc[:, 0] - result.name = None - return result - - elif isinstance(aggs, dict): - cols = aggs.keys() - if any([callable(val) for val in aggs.values()]): - raise NotImplementedError( - "callable parameter is not implemented yet" - ) - elif all([isinstance(val, str) for val in aggs.values()]): - result = cudf.Series(index=cols) - for key, value in aggs.items(): - col = df_normalized[key] - if not hasattr(col, value): - raise AttributeError( - f"{value} is not a valid function for " - f"'Series' object" - ) - result[key] = getattr(col, value)() - elif all([isinstance(val, Iterable) for val in aggs.values()]): - idxs = set() - for val in aggs.values(): - if isinstance(val, Iterable): - idxs.update(val) - elif isinstance(val, str): - idxs.add(val) - idxs = sorted(list(idxs)) - for agg in idxs: - if agg is callable: - raise NotImplementedError( - "callable parameter is not implemented yet" - ) - result = cudf.DataFrame(index=idxs, columns=cols) - for key in aggs.keys(): - col = df_normalized[key] - col_empty = column_empty( - len(idxs), dtype=col.dtype, masked=True - ) - ans = cudf.Series(data=col_empty, index=idxs) - if isinstance(aggs.get(key), Iterable): - # TODO : Allow simultaneous pass for multi-aggregation - # as a future optimization - for agg in aggs.get(key): - if not hasattr(col, agg): - raise AttributeError( - f"{agg} is not a valid function for " - f"'Series' object" - ) - ans[agg] = getattr(col, agg)() - elif isinstance(aggs.get(key), str): - if not hasattr(col, aggs.get(key)): - raise AttributeError( - f"{aggs.get(key)} is not a valid function for " - f"'Series' object" - ) - ans[aggs.get(key)] = getattr(col, agg)() - result[key] = ans - else: - raise ValueError("values of dict must be a string or list") - - return result - - elif callable(aggs): - raise NotImplementedError( - "callable parameter is not implemented yet" - ) - - else: - raise ValueError("argument must be a string, list or dict") - def nlargest(self, n, columns, keep="first"): """Get the rows of the DataFrame sorted by the n largest value of *columns* From ac3d28fe559b5c650aa0a2337b7671de416ca4fa Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Thu, 10 Dec 2020 15:31:35 -0500 Subject: [PATCH 09/10] Added missing newline in CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9af9b070778..eec28016e1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - PR #6750 Remove **kwargs from string/categorical methods ## Bug Fixes + - PR #6889 Fix nullmask offset handling in parquet and orc writer - PR #6922 Fix N/A detection for empty fields in CSV reader From d07f581c7669eb9cd17a5e1a0b2e7d07d52b1ad6 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 11 Dec 2020 15:41:07 -0500 Subject: [PATCH 10/10] Address PR comments --- CHANGELOG.md | 2 -- .../cudf/column/column_device_view.cuh | 23 +++++++++++++++ cpp/src/bitmask/null_mask.cu | 28 ++----------------- cpp/src/io/orc/stripe_enc.cu | 14 +++------- 4 files changed, 30 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 344e2a2a609..2064f0ea04d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,6 @@ ## Bug Fixes -- PR #6889 Fix nullmask offset handling in parquet and orc writer - - PR #6922 Fix N/A detection for empty fields in CSV reader - PR #6912 Fix rmm_mode=managed parameter for gtests - PR #6945 Fix groupby agg/apply behaviour when no key columns are provided diff --git a/cpp/include/cudf/column/column_device_view.cuh b/cpp/include/cudf/column/column_device_view.cuh index 0f9bcfd5cd9..a41edea6f6c 100644 --- a/cpp/include/cudf/column/column_device_view.cuh +++ b/cpp/include/cudf/column/column_device_view.cuh @@ -795,6 +795,29 @@ __device__ inline numeric::decimal64 const column_device_view::element + word_index(source_begin_bit + + destination_word_index * detail::size_in_bits())) { + next_word = source[source_word_index + 1]; + } + return __funnelshift_r(curr_word, next_word, source_begin_bit); +} + /** * @brief value accessor of column without null bitmask * A unary functor returns scalar value at `id`. diff --git a/cpp/src/bitmask/null_mask.cu b/cpp/src/bitmask/null_mask.cu index 67a2b25cf2f..6a6e3f3f9f0 100644 --- a/cpp/src/bitmask/null_mask.cu +++ b/cpp/src/bitmask/null_mask.cu @@ -224,28 +224,6 @@ __global__ void count_set_bits_kernel(bitmask_type const *bitmask, if (threadIdx.x == 0) { atomicAdd(global_count, block_count); } } -/** - * @brief Convenience function to get offset word from a bitmask - * - * @see copy_offset_bitmask - * @see offset_bitmask_and - */ -__device__ bitmask_type get_mask_offset_word(bitmask_type const *__restrict__ source, - size_type destination_word_index, - size_type source_begin_bit, - size_type source_end_bit) -{ - size_type source_word_index = destination_word_index + word_index(source_begin_bit); - bitmask_type curr_word = source[source_word_index]; - bitmask_type next_word = 0; - if (word_index(source_end_bit) > - word_index(source_begin_bit + - destination_word_index * detail::size_in_bits())) { - next_word = source[source_word_index + 1]; - } - return __funnelshift_r(curr_word, next_word, source_begin_bit); -} - /** * For each range `[first_bit_indices[i], last_bit_indices[i])` * (where 0 <= i < `num_ranges`), count the number of bits set outside the range @@ -332,8 +310,8 @@ __global__ void copy_offset_bitmask(bitmask_type *__restrict__ destination, for (size_type destination_word_index = threadIdx.x + blockIdx.x * blockDim.x; destination_word_index < number_of_mask_words; destination_word_index += blockDim.x * gridDim.x) { - destination[destination_word_index] = - get_mask_offset_word(source, destination_word_index, source_begin_bit, source_end_bit); + destination[destination_word_index] = detail::get_mask_offset_word( + source, destination_word_index, source_begin_bit, source_end_bit); } } @@ -360,7 +338,7 @@ __global__ void offset_bitmask_and(bitmask_type *__restrict__ destination, destination_word_index += blockDim.x * gridDim.x) { bitmask_type destination_word = ~bitmask_type{0}; // All bits 1 for (size_type i = 0; i < num_sources; i++) { - destination_word &= get_mask_offset_word( + destination_word &= detail::get_mask_offset_word( source[i], destination_word_index, begin_bit[i], begin_bit[i] + source_size); } diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 49085314762..4fd5b765d11 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -15,6 +15,8 @@ */ #include +#include +#include #include #include #include "orc_common.h" @@ -715,16 +717,8 @@ __global__ void __launch_bounds__(block_size) size_type current_valid_offset = row + s->chunk.column_offset; size_type next_valid_offset = current_valid_offset + min(32, s->chunk.valid_rows); - size_type current_byte_index = current_valid_offset / 32; - size_type next_byte_index = next_valid_offset / 32; - - bitmask_type current_mask_word = s->chunk.valid_map_base[current_byte_index]; - bitmask_type next_mask_word = 0; - if (next_byte_index != current_byte_index) { - next_mask_word = s->chunk.valid_map_base[next_byte_index]; - } - bitmask_type mask = - __funnelshift_r(current_mask_word, next_mask_word, current_valid_offset); + bitmask_type mask = cudf::detail::get_mask_offset_word( + s->chunk.valid_map_base, 0, current_valid_offset, next_valid_offset); valid = 0xff & mask; } else { valid = 0xff;