From e41618841f6443ab6ebada9ee2c5b37de9f7fdb1 Mon Sep 17 00:00:00 2001
From: GALI PREM SAGAR
Date: Wed, 19 Jan 2022 07:21:07 -0600
Subject: [PATCH 1/6] Avoid index materialization when `DataFrame` is created
 with un-named `Series` objects (#10071)

Fixes: #10070
This PR removes materialization of the `index` when list-like un-named
`Series` inputs are passed to the `DataFrame` constructor.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Ashwin Srinath (https://github.com/shwina)

URL: https://github.com/rapidsai/cudf/pull/10071
---
 python/cudf/cudf/core/dataframe.py               | 2 +-
 python/cudf/cudf/tests/test_dataframe.py         | 9 +++++++--
 python/dask_cudf/dask_cudf/tests/test_groupby.py | 1 +
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index 1dddcb9e3af..a444d87b50c 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -6519,7 +6519,7 @@ def _get_union_of_series_names(series_list):
         else:
             names_list.append(series.name)
     if unnamed_count == len(series_list):
-        names_list = [*range(len(series_list))]
+        names_list = range(len(series_list))
 
     return names_list
 
diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py
index 61c3f428019..3e359335719 100644
--- a/python/cudf/cudf/tests/test_dataframe.py
+++ b/python/cudf/cudf/tests/test_dataframe.py
@@ -7665,9 +7665,14 @@ def test_dataframe_init_from_series_list(data, ignore_dtype, columns):
     actual = cudf.DataFrame(gd_data, columns=columns)
 
     if ignore_dtype:
-        assert_eq(expected.fillna(-1), actual.fillna(-1), check_dtype=False)
+        assert_eq(
+            expected.fillna(-1),
+            actual.fillna(-1),
+            check_dtype=False,
+            check_index_type=True,
+        )
     else:
-        assert_eq(expected, actual)
+        assert_eq(expected, actual, check_index_type=True)
 
 
 @pytest.mark.parametrize(
diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py
index fce9b773dac..274c6670426 100644
--- a/python/dask_cudf/dask_cudf/tests/test_groupby.py
+++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -17,6 +17,7 @@
 @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
 @pytest.mark.parametrize("series", [False, True])
 def test_groupby_basic(series, aggregation):
+    np.random.seed(0)
     pdf = pd.DataFrame(
         {
             "x": np.random.randint(0, 5, size=10000),

From 3aecce25701ea4d0f182d2a0f47237863ad15e69 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Wed, 19 Jan 2022 08:44:49 -0600
Subject: [PATCH 2/6] Update Java tests to expect DECIMAL128 from Arrow (#10073)

After #9986, reading Arrow in libcudf now returns DECIMAL128 instead of
DECIMAL64. This updates the Java tests to expect DECIMAL128 by upcasting the
decimal columns in the original table being round-tripped through Arrow
before comparing the result.
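For a flat column, the widening amounts to a `castTo` with the same scale; a
minimal illustrative sketch (the values are made up, and the
`castDecimal64To128` helper added in the diff below additionally recurses
into LIST/STRUCT children):

    // Sketch only: widen a DECIMAL64 column to DECIMAL128, keeping its scale.
    try (ColumnVector dec64 = ColumnVector.decimalFromLongs(-2, 123456L, -789L);
         ColumnVector dec128 = dec64.castTo(
             DType.create(DType.DTypeEnum.DECIMAL128, dec64.getType().getScale()))) {
      assert dec128.getType().getTypeId() == DType.DTypeEnum.DECIMAL128;
    }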
Authors: - Jason Lowe (https://github.com/jlowe) Approvers: - Rong Ou (https://github.com/rongou) - MithunR (https://github.com/mythrocks) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/10073 --- .../test/java/ai/rapids/cudf/TableTest.java | 70 +++++++++++++++++-- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 7fe69d2d7fc..18a0de77664 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -7192,6 +7192,64 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException { } } + /** Return a column where DECIMAL64 has been up-casted to DECIMAL128 */ + private ColumnVector castDecimal64To128(ColumnView c) { + DType dtype = c.getType(); + switch (dtype.getTypeId()) { + case DECIMAL64: + return c.castTo(DType.create(DType.DTypeEnum.DECIMAL128, dtype.getScale())); + case STRUCT: + case LIST: + { + ColumnView[] oldViews = c.getChildColumnViews(); + assert oldViews != null; + ColumnVector[] newChildren = new ColumnVector[oldViews.length]; + try { + for (int i = 0; i < oldViews.length; i++) { + newChildren[i] = castDecimal64To128(oldViews[i]); + } + try (ColumnView newView = new ColumnView(dtype, c.getRowCount(), + Optional.of(c.getNullCount()), c.getValid(), c.getOffsets(), newChildren)) { + return newView.copyToColumnVector(); + } + } finally { + for (ColumnView v : oldViews) { + v.close(); + } + for (ColumnVector v : newChildren) { + if (v != null) { + v.close(); + } + } + } + } + default: + if (c instanceof ColumnVector) { + return ((ColumnVector) c).incRefCount(); + } else { + return c.copyToColumnVector(); + } + } + } + + /** Return a new Table with any DECIMAL64 columns up-casted to DECIMAL128 */ + private Table castDecimal64To128(Table t) { + final int numCols = t.getNumberOfColumns(); + ColumnVector[] cols = new ColumnVector[numCols]; + try { + for (int i = 0; i < numCols; i++) { + cols[i] = castDecimal64To128(t.getColumn(i)); + } + return new Table(cols); + } finally { + for (ColumnVector c : cols) { + if (c != null) { + c.close(); + } + } + } + } + @Test void testArrowIPCWriteToFileWithNamesAndMetadata() throws IOException { File tempFile = File.createTempFile("test-names-metadata", ".arrow"); @@ -7203,7 +7261,9 @@ void testArrowIPCWriteToFileWithNamesAndMetadata() throws IOException { try (TableWriter writer = Table.writeArrowIPCChunked(options, tempFile.getAbsoluteFile())) { writer.write(table0); } - try (StreamedTableReader reader = Table.readArrowIPCChunked(tempFile)) { + // Reading from Arrow converts decimals to DECIMAL128 + try (StreamedTableReader reader = Table.readArrowIPCChunked(tempFile); + Table expected = castDecimal64To128(table0)) { boolean done = false; int count = 0; while (!done) { @@ -7211,7 +7271,7 @@ void testArrowIPCWriteToFileWithNamesAndMetadata() throws IOException { if (t == null) { done = true; } else { - assertTablesAreEqual(table0, t); + assertTablesAreEqual(expected, t); count++; } } @@ -7243,7 +7303,9 @@ void testArrowIPCWriteToBufferChunked() { writer.write(table0); writer.write(table0); } - try (StreamedTableReader reader = Table.readArrowIPCChunked(new MyBufferProvider(consumer))) { + // Reading from Arrow converts decimals to DECIMAL128 + try (StreamedTableReader reader = Table.readArrowIPCChunked(new MyBufferProvider(consumer)); + Table expected = castDecimal64To128(table0)) { boolean 
done = false;
       int count = 0;
       while (!done) {
@@ -7251,7 +7313,7 @@
         if (t == null) {
           done = true;
         } else {
-          assertTablesAreEqual(table0, t);
+          assertTablesAreEqual(expected, t);
           count++;
         }
       }

From 8e88adc3c6bdac9717aaa434a009713f627fb39f Mon Sep 17 00:00:00 2001
From: GALI PREM SAGAR
Date: Wed, 19 Jan 2022 08:58:23 -0600
Subject: [PATCH 3/6] Fix `columns` ordering issue in parquet reader (#10066)

Fixes: #10062
This PR fixes an issue where the order of `columns` and the parquet metadata
columns (i.e., `meta['columns']`) can differ; the two are not guaranteed to
always be in the same order. It removes the code that relied on that
assumption and instead builds a dict of per-column metadata, which is later
used to update the column metadata in the dataframe.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)
  - Devavret Makkar (https://github.com/devavret)

URL: https://github.com/rapidsai/cudf/pull/10066
---
 python/cudf/cudf/_lib/parquet.pyx      | 13 ++++++++++---
 python/cudf/cudf/tests/test_parquet.py | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 16873435e1d..8cb7dd942c1 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -200,12 +200,19 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
 
     update_struct_field_names(df, c_out_table.metadata.schema_info)
 
-    # update the decimal precision of each column
     if meta is not None:
-        for col, col_meta in zip(column_names, meta["columns"]):
+        # Bookkeep each column's metadata, as the order of
+        # `meta["columns"]` and `column_names` is not guaranteed
+        # to be deterministic or the same.
+        meta_data_per_column = {
+            col_meta['name']: col_meta for col_meta in meta["columns"]
+        }
+
+        # update the decimal precision of each column
+        for col in column_names:
             if is_decimal_dtype(df._data[col].dtype):
                 df._data[col].dtype.precision = (
-                    col_meta["metadata"]["precision"]
+                    meta_data_per_column[col]["metadata"]["precision"]
                 )
 
     # Set the index column
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index f239d88992a..519f24b7ca6 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -2368,3 +2368,21 @@ def test_parquet_writer_row_group_size(
         math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes)
     )
     assert expected_num_rows == row_groups
+
+
+def test_parquet_reader_decimal_columns():
+    df = cudf.DataFrame(
+        {
+            "col1": cudf.Series([1, 2, 3], dtype=cudf.Decimal64Dtype(10, 2)),
+            "col2": [10, 11, 12],
+            "col3": [12, 13, 14],
+            "col4": ["a", "b", "c"],
+        }
+    )
+    buffer = BytesIO()
+    df.to_parquet(buffer)
+
+    actual = cudf.read_parquet(buffer, columns=["col3", "col2", "col1"])
+    expected = pd.read_parquet(buffer, columns=["col3", "col2", "col1"])
+
+    assert_eq(actual, expected)

From f193d594b0b8aac6f4b0fe599f389281fd430579 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Wed, 19 Jan 2022 07:03:55 -0800
Subject: [PATCH 4/6] Include row group level stats when writing ORC files
 (#10041)

Closes #9964
Encodes row-group-level stats along with the rest and writes the encoded
blobs into the protobuf at the start of each stripe (other stats are in the
file footer).
Adds `put_bytes` to `ProtobufWriter` to optimize writing of buffers.
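For reference, the protobuf framing involved here is small: each field starts
with a key varint `(field_number << 3) | wire_type`, and FIXEDLEN fields
follow it with a varint length plus the raw bytes that `put_bytes` now appends
in bulk. A standalone sketch of the base-128 varint encoding (illustrative
Java, independent of the C++ writer):

    // Low 7 bits per byte, MSB set on every byte except the last.
    // E.g. field 1 with wire type FIXEDLEN (2) has key (1 << 3) | 2 = 0x0a.
    static int writeVarint(java.io.ByteArrayOutputStream out, long v) {
      int len = 1;
      while (Long.compareUnsigned(v, 0x7fL) > 0) {
        out.write((int) (v & 0x7f) | 0x80);
        v >>>= 7;
        len++;
      }
      out.write((int) v);
      return len;
    }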
Adds new struct to represent the encoded ORC statistics so they are separated by granularity level (instead of using a single vector). Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - https://github.com/nvdbaranec URL: https://github.com/rapidsai/cudf/pull/10041 --- cpp/src/io/orc/orc.cpp | 65 +++++----- cpp/src/io/orc/orc.h | 138 ++++++++++++--------- cpp/src/io/orc/orc_common.h | 20 +-- cpp/src/io/orc/orc_field_writer.hpp | 36 ++---- cpp/src/io/orc/stats_enc.cu | 22 ++-- cpp/src/io/orc/stripe_init.cu | 12 +- cpp/src/io/orc/writer_impl.cu | 182 +++++++++++++++++----------- cpp/src/io/orc/writer_impl.hpp | 21 +++- 8 files changed, 278 insertions(+), 218 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 44cea6169e4..f51fd28676e 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,10 +38,10 @@ uint32_t ProtobufReader::read_field_size(const uint8_t* end) void ProtobufReader::skip_struct_field(int t) { switch (t) { - case PB_TYPE_VARINT: get(); break; - case PB_TYPE_FIXED64: skip_bytes(8); break; - case PB_TYPE_FIXEDLEN: skip_bytes(get()); break; - case PB_TYPE_FIXED32: skip_bytes(4); break; + case ProtofType::VARINT: get(); break; + case ProtofType::FIXED64: skip_bytes(8); break; + case ProtofType::FIXEDLEN: skip_bytes(get()); break; + case ProtofType::FIXED32: skip_bytes(4); break; default: break; } } @@ -209,43 +209,54 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, int32_t data_ofs, int32_t data2_blk, int32_t data2_ofs, - TypeKind kind) + TypeKind kind, + ColStatsBlob const* stats) { size_t sz = 0, lpos; - putb(1 * 8 + PB_TYPE_FIXEDLEN); // 1:RowIndex.entry + put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:RowIndex.entry lpos = m_buf->size(); - putb(0xcd); // sz+2 - putb(1 * 8 + PB_TYPE_FIXEDLEN); // 1:positions[packed=true] - putb(0xcd); // sz + put_byte(0xcd); // sz+2 + put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] + put_byte(0xcd); // sz if (present_blk >= 0) sz += put_uint(present_blk); if (present_ofs >= 0) { - sz += put_uint(present_ofs) + 2; - putb(0); // run pos = 0 - putb(0); // bit pos = 0 + sz += put_uint(present_ofs); + sz += put_byte(0); // run pos = 0 + sz += put_byte(0); // bit pos = 0 } if (data_blk >= 0) { sz += put_uint(data_blk); } if (data_ofs >= 0) { sz += put_uint(data_ofs); if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) { - putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) - sz++; + // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += put_byte(0); if (kind == BOOLEAN) { - putb(0); // bit position in byte, always zero - sz++; + // bit position in byte, always zero + sz += put_byte(0); } } } - if (kind != - INT) // INT kind can be passed in to bypass 2nd stream index (dictionary length streams) - { + // INT kind can be passed in to bypass 2nd stream index (dictionary length streams) + if (kind != INT) { if (data2_blk >= 0) { sz += put_uint(data2_blk); } if (data2_ofs >= 0) { - sz += put_uint(data2_ofs) + 1; - putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += 
put_uint(data2_ofs); + // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += put_byte(0); } } - m_buf->data()[lpos] = (uint8_t)(sz + 2); + // size of the field 1 m_buf->data()[lpos + 2] = (uint8_t)(sz); + + if (stats != nullptr) { + sz += put_uint(encode_field_number(2)); // 2: statistics + // Statistics field contains its length as varint and dtype specific data (encoded on the GPU) + sz += put_uint(stats->size()); + sz += put_bytes(*stats); + } + + // size of the whole row index entry + m_buf->data()[lpos] = (uint8_t)(sz + 2); } size_t ProtobufWriter::write(const PostScript& s) @@ -256,7 +267,7 @@ size_t ProtobufWriter::write(const PostScript& s) if (s.compression != NONE) { w.field_uint(3, s.compressionBlockSize); } w.field_packed_uint(4, s.version); w.field_uint(5, s.metadataLength); - w.field_string(8000, s.magic); + w.field_blob(8000, s.magic); return w.value(); } @@ -300,8 +311,8 @@ size_t ProtobufWriter::write(const SchemaType& s) size_t ProtobufWriter::write(const UserMetadataItem& s) { ProtobufFieldWriter w(this); - w.field_string(1, s.name); - w.field_string(2, s.value); + w.field_blob(1, s.name); + w.field_blob(2, s.value); return w.value(); } @@ -310,7 +321,7 @@ size_t ProtobufWriter::write(const StripeFooter& s) ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.streams); w.field_repeated_struct(2, s.columns); - if (s.writerTimezone != "") { w.field_string(3, s.writerTimezone); } + if (s.writerTimezone != "") { w.field_blob(3, s.writerTimezone); } return w.value(); } diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 277c5d99f8f..4fa3480c90a 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -131,6 +131,67 @@ struct Metadata { std::vector stripeStats; }; +int inline constexpr encode_field_number(int field_number, ProtofType field_type) noexcept +{ + return (field_number * 8) + static_cast(field_type); +} + +namespace { +template ::value and + !std::is_enum::value>* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXEDLEN); +} + +template ::value or + std::is_enum::value>* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::VARINT); +} + +template >* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXED32); +} + +template >* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXED64); +} +}; // namespace + +template < + typename T, + typename std::enable_if_t::value or std::is_same_v>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + +// containters change the field number encoding +template < + typename T, + typename std::enable_if_t>::value>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + +// optional fields don't change the field number encoding +template >::value>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + /** * @brief Class for parsing Orc's Protocol Buffers encoded metadata */ @@ -181,60 +242,6 @@ class ProtobufReader { template void function_builder(T& s, size_t maxlen, std::tuple& op); - template ::value and - !std::is_enum::value>* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXEDLEN; - } - - template ::value or - std::is_enum::value>* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_VARINT; - } - - template >* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXED32; - } - - template >* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXED64; - } - - template ::value or std::is_same_v>* = - nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - - // containters change the field number encoding - template >::value>* = nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - - // optional fields don't change the field number encoding - template >::value>* = nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - uint32_t read_field_size(const uint8_t* end); template ::value>* = nullptr> @@ -470,16 +477,28 @@ class ProtobufWriter { public: ProtobufWriter() { m_buf = nullptr; } ProtobufWriter(std::vector* output) { m_buf = output; } - void putb(uint8_t v) { m_buf->push_back(v); } + uint32_t put_byte(uint8_t v) + { + m_buf->push_back(v); + return 1; + } + template + uint32_t put_bytes(host_span 
values) + { + static_assert(sizeof(T) == 1); + m_buf->reserve(m_buf->size() + values.size()); + m_buf->insert(m_buf->end(), values.begin(), values.end()); + return values.size(); + } uint32_t put_uint(uint64_t v) { int l = 1; while (v > 0x7f) { - putb(static_cast(v | 0x80)); + put_byte(static_cast(v | 0x80)); v >>= 7; l++; } - putb(static_cast(v)); + put_byte(static_cast(v)); return l; } uint32_t put_int(int64_t v) @@ -493,7 +512,8 @@ class ProtobufWriter { int32_t data_ofs, int32_t data2_blk, int32_t data2_ofs, - TypeKind kind); + TypeKind kind, + ColStatsBlob const* stats); public: size_t write(const PostScript&); diff --git a/cpp/src/io/orc/orc_common.h b/cpp/src/io/orc/orc_common.h index f88a84b0bfc..6bee5be81ed 100644 --- a/cpp/src/io/orc/orc_common.h +++ b/cpp/src/io/orc/orc_common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,15 +76,15 @@ enum ColumnEncodingKind : int8_t { DICTIONARY_V2 = 3, // the encoding is dictionary-based using RLE v2 }; -enum : uint8_t { // Protobuf field types - PB_TYPE_VARINT = 0, - PB_TYPE_FIXED64 = 1, - PB_TYPE_FIXEDLEN = 2, - PB_TYPE_START_GROUP = 3, // deprecated - PB_TYPE_END_GROUP = 4, // deprecated - PB_TYPE_FIXED32 = 5, - PB_TYPE_INVALID_6 = 6, - PB_TYPE_INVALID_7 = 7, +enum ProtofType : uint8_t { + VARINT = 0, + FIXED64 = 1, + FIXEDLEN = 2, + START_GROUP = 3, // deprecated + END_GROUP = 4, // deprecated + FIXED32 = 5, + INVALID_6 = 6, + INVALID_7 = 7, }; } // namespace orc diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index afcd99a2cd6..9714277b54d 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -41,7 +41,7 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_uint(int field, const T& value) { - struct_size += p->put_uint(field * 8 + PB_TYPE_VARINT); + struct_size += p->put_uint(encode_field_number(field)); struct_size += p->put_uint(static_cast(value)); } @@ -52,9 +52,9 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_packed_uint(int field, const std::vector& value) { - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); + struct_size += p->put_uint(encode_field_number>(field)); auto lpos = p->m_buf->size(); - p->putb(0); + p->put_byte(0); auto sz = std::accumulate(value.begin(), value.end(), 0, [p = this->p](size_t sum, auto val) { return sum + p->put_uint(val); }); @@ -65,29 +65,15 @@ struct ProtobufWriter::ProtobufFieldWriter { (*(p->m_buf))[lpos] = static_cast(sz); } - /** - * @brief Function to write a string to the internal buffer - */ - void field_string(int field, const std::string& value) - { - size_t len = value.length(); - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); - struct_size += p->put_uint(len) + len; - for (size_t i = 0; i < len; i++) - p->putb(value[i]); - } - /** * @brief Function to write a blob to the internal buffer */ template - void field_blob(int field, const std::vector& value) + void field_blob(int field, T const& values) { - size_t len = value.size(); - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); - struct_size += p->put_uint(len) + len; - for (size_t i = 0; i < len; i++) - p->putb(value[i]); + struct_size += p->put_uint(encode_field_number(field)); + struct_size += p->put_uint(values.size()); + struct_size += p->put_bytes(values); } /** @@ -96,9 +82,9 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_struct(int field, const T& value) { - struct_size += p->put_uint((field)*8 + PB_TYPE_FIXEDLEN); + struct_size += p->put_uint(encode_field_number(field, ProtofType::FIXEDLEN)); auto lpos = p->m_buf->size(); - p->putb(0); + p->put_byte(0); auto sz = p->write(value); struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) @@ -112,7 +98,7 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_repeated_string(int field, const std::vector& value) { for (const auto& elem : value) - field_string(field, elem); + field_blob(field, elem); } /** diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 7441819d7cd..b377a2e7076 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -150,7 +150,7 @@ __device__ inline uint8_t* pb_encode_uint(uint8_t* p, uint64_t v) // Protobuf field encoding for unsigned int __device__ inline uint8_t* pb_put_uint(uint8_t* p, uint32_t id, uint64_t v) { - p[0] = id * 8 + PB_TYPE_VARINT; // NOTE: Assumes id < 16 + p[0] = id * 8 + static_cast(ProtofType::VARINT); // NOTE: Assumes id < 16 return pb_encode_uint(p + 1, v); } @@ -165,7 +165,7 @@ __device__ inline uint8_t* pb_put_int(uint8_t* p, uint32_t id, int64_t v) __device__ inline uint8_t* pb_put_packed_uint(uint8_t* p, uint32_t id, uint64_t v) { uint8_t* p2 = pb_encode_uint(p + 2, v); - p[0] = id * 8 + PB_TYPE_FIXEDLEN; + p[0] = id * 8 + ProtofType::FIXEDLEN; p[1] = static_cast(p2 - (p + 2)); return p2; } @@ -173,7 +173,7 @@ __device__ inline uint8_t* pb_put_packed_uint(uint8_t* p, uint32_t id, uint64_t // Protobuf field encoding for binary/string __device__ inline uint8_t* pb_put_binary(uint8_t* p, uint32_t id, const void* bytes, uint32_t len) { - p[0] = id * 8 + PB_TYPE_FIXEDLEN; + p[0] = id * 8 + ProtofType::FIXEDLEN; p = pb_encode_uint(p + 1, len); memcpy(p, bytes, len); return p + len; @@ -182,7 +182,7 @@ __device__ inline uint8_t* pb_put_binary(uint8_t* p, uint32_t id, const void* by // Protobuf field encoding for 64-bit raw encoding (double) __device__ inline uint8_t* pb_put_fixed64(uint8_t* p, uint32_t id, const void* raw64) { - p[0] = id * 8 + PB_TYPE_FIXED64; + p[0] = id * 8 + ProtofType::FIXED64; memcpy(p + 1, raw64, 8); return p + 9; } @@ -248,7 +248,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint64 sum = 3; // } if (s->chunk.has_minmax || s->chunk.has_sum) { - *cur = 2 * 8 + PB_TYPE_FIXEDLEN; + *cur = 2 * 8 + ProtofType::FIXEDLEN; cur += 2; if (s->chunk.has_minmax) { cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); @@ -267,7 +267,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional double sum = 3; // } if (s->chunk.has_minmax) { - *cur = 3 * 8 + PB_TYPE_FIXEDLEN; + *cur = 3 * 8 + ProtofType::FIXEDLEN; cur += 2; cur = pb_put_fixed64(cur, 1, &s->chunk.min_value.fp_val); cur = pb_put_fixed64(cur, 2, &s->chunk.max_value.fp_val); @@ -286,7 +286,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) + (pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) + s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length; - cur[0] = 4 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 4 * 8 + ProtofType::FIXEDLEN; cur = pb_encode_uint(cur + 1, sz); cur = pb_put_binary( cur, 1, s->chunk.min_value.str_val.ptr, s->chunk.min_value.str_val.length); @@ -301,7 +301,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // repeated uint64 count = 1 [packed=true]; // } if (s->chunk.has_sum) { // Sum is equal to the number of 'true' values - cur[0] = 5 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 5 * 8 + ProtofType::FIXEDLEN; cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val); fld_start[1] = cur - (fld_start + 2); } @@ -325,7 +325,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint32 maximum = 2; // } if (s->chunk.has_minmax) { - cur[0] = 7 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 7 * 8 + ProtofType::FIXEDLEN; cur += 2; cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); cur = pb_put_int(cur, 2, s->chunk.max_value.i_val); @@ -341,7 +341,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint64 maximumUtc = 4; // } if (s->chunk.has_minmax) { - cur[0] = 9 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 9 * 8 + 
ProtofType::FIXEDLEN; cur += 2; cur = pb_put_int(cur, 3, s->chunk.min_value.i_val); // minimumUtc cur = pb_put_int(cur, 4, s->chunk.max_value.i_val); // maximumUtc diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index be561530459..b197751d925 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -227,7 +227,7 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, const uint8_t* start, const uint8_t* end) { - constexpr uint32_t pb_rowindexentry_id = static_cast(PB_TYPE_FIXEDLEN) + 8; + constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8; const uint8_t* cur = start; row_entry_state_e state = NOT_FOUND; @@ -246,13 +246,13 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, state = GET_LENGTH; } else { v &= 7; - if (v == PB_TYPE_FIXED64) + if (v == ProtofType::FIXED64) cur += 8; - else if (v == PB_TYPE_FIXED32) + else if (v == ProtofType::FIXED32) cur += 4; - else if (v == PB_TYPE_VARINT) + else if (v == ProtofType::VARINT) state = SKIP_VARINT; - else if (v == PB_TYPE_FIXEDLEN) + else if (v == ProtofType::FIXEDLEN) state = SKIP_FIXEDLEN; } break; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index b0e674c206f..b7264cb81ac 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -1062,13 +1062,23 @@ void set_stat_desc_leaf_cols(device_span columns, [=] __device__(auto idx) { stat_desc[idx].leaf_column = &columns[idx]; }); } -std::vector> writer::impl::gather_statistic_blobs( - orc_table_view const& orc_table, file_segmentation const& segmentation) +writer::impl::encoded_statistics writer::impl::gather_statistic_blobs( + bool are_statistics_enabled, + orc_table_view const& orc_table, + file_segmentation const& segmentation) { - auto const num_stat_blobs = (1 + segmentation.num_stripes()) * orc_table.num_columns(); + auto const num_rowgroup_blobs = segmentation.rowgroups.count(); + auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); + auto const num_file_blobs = orc_table.num_columns(); + auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs; + + if (not are_statistics_enabled or num_stat_blobs == 0) { return {}; } hostdevice_vector stat_desc(orc_table.num_columns(), stream); hostdevice_vector stat_merge(num_stat_blobs, stream); + auto rowgroup_stat_merge = stat_merge.host_ptr(); + auto stripe_stat_merge = rowgroup_stat_merge + num_rowgroup_blobs; + auto file_stat_merge = stripe_stat_merge + num_stripe_blobs; for (auto const& column : orc_table.columns) { stats_column_desc* desc = &stat_desc[column.index()]; @@ -1101,14 +1111,20 @@ std::vector> writer::impl::gather_statistic_blobs( desc->ts_scale = 0; } for (auto const& stripe : segmentation.stripes) { - auto grp = &stat_merge[column.index() * segmentation.num_stripes() + stripe.id]; - grp->col = stat_desc.device_ptr(column.index()); - grp->start_chunk = + auto& grp = stripe_stat_merge[column.index() * segmentation.num_stripes() + stripe.id]; + grp.col = stat_desc.device_ptr(column.index()); + grp.start_chunk = static_cast(column.index() * segmentation.num_rowgroups() + stripe.first); - grp->num_chunks = stripe.size; + grp.num_chunks = stripe.size; + for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) { + auto& rg_grp = + rowgroup_stat_merge[column.index() * segmentation.num_rowgroups() + *rg_idx_it]; + rg_grp.col = stat_desc.device_ptr(column.index()); + rg_grp.start_chunk = *rg_idx_it; + rg_grp.num_chunks = 1; + } } - statistics_merge_group* col_stats = - &stat_merge[segmentation.num_stripes() * orc_table.num_columns() + column.index()]; + auto col_stats = &file_stat_merge[column.index()]; col_stats->col = stat_desc.device_ptr(column.index()); col_stats->start_chunk = static_cast(column.index() * segmentation.num_stripes()); col_stats->num_chunks = static_cast(segmentation.num_stripes()); @@ -1117,58 +1133,73 @@ std::vector> writer::impl::gather_statistic_blobs( stat_merge.host_to_device(stream); set_stat_desc_leaf_cols(orc_table.d_columns, stat_desc, stream); - auto const num_chunks = segmentation.rowgroups.count(); - rmm::device_uvector stat_chunks(num_chunks + num_stat_blobs, stream); - rmm::device_uvector stat_groups(num_chunks, stream); + rmm::device_uvector stat_chunks(num_stat_blobs, stream); + auto rowgroup_stat_chunks = stat_chunks.data(); + auto stripe_stat_chunks = rowgroup_stat_chunks + num_rowgroup_blobs; + auto file_stat_chunks = stripe_stat_chunks + num_stripe_blobs; + + rmm::device_uvector stat_groups(num_rowgroup_blobs, stream); gpu::orc_init_statistics_groups( stat_groups.data(), stat_desc.device_ptr(), segmentation.rowgroups, stream); detail::calculate_group_statistics( - stat_chunks.data(), stat_groups.data(), num_chunks, stream); + stat_chunks.data(), stat_groups.data(), num_rowgroup_blobs, stream); + 
detail::merge_group_statistics( - stat_chunks.data() + num_chunks, - stat_chunks.data(), - stat_merge.device_ptr(), - segmentation.num_stripes() * orc_table.num_columns(), + stripe_stat_chunks, + rowgroup_stat_chunks, + stat_merge.device_ptr(num_rowgroup_blobs), + num_stripe_blobs, stream); detail::merge_group_statistics( - stat_chunks.data() + num_chunks + segmentation.num_stripes() * orc_table.num_columns(), - stat_chunks.data() + num_chunks, - stat_merge.device_ptr(segmentation.num_stripes() * orc_table.num_columns()), - orc_table.num_columns(), + file_stat_chunks, + stripe_stat_chunks, + stat_merge.device_ptr(num_rowgroup_blobs + num_stripe_blobs), + num_file_blobs, stream); gpu::orc_init_statistics_buffersize( - stat_merge.device_ptr(), stat_chunks.data() + num_chunks, num_stat_blobs, stream); + stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); stat_merge.device_to_host(stream, true); hostdevice_vector blobs( stat_merge[num_stat_blobs - 1].start_chunk + stat_merge[num_stat_blobs - 1].num_chunks, stream); - gpu::orc_encode_statistics(blobs.device_ptr(), - stat_merge.device_ptr(), - stat_chunks.data() + num_chunks, - num_stat_blobs, - stream); + gpu::orc_encode_statistics( + blobs.device_ptr(), stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); stat_merge.device_to_host(stream); blobs.device_to_host(stream, true); - std::vector> stat_blobs(num_stat_blobs); - for (size_t i = 0; i < num_stat_blobs; i++) { - const uint8_t* stat_begin = blobs.host_ptr(stat_merge[i].start_chunk); - const uint8_t* stat_end = stat_begin + stat_merge[i].num_chunks; - stat_blobs[i].assign(stat_begin, stat_end); + std::vector rowgroup_blobs(num_rowgroup_blobs); + for (size_t i = 0; i < num_rowgroup_blobs; i++) { + auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks; + rowgroup_blobs[i].assign(stat_begin, stat_end); + } + + std::vector stripe_blobs(num_stripe_blobs); + for (size_t i = 0; i < num_stripe_blobs; i++) { + auto const stat_begin = blobs.host_ptr(stripe_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; + stripe_blobs[i].assign(stat_begin, stat_end); } - return stat_blobs; + std::vector file_blobs(num_file_blobs); + for (size_t i = 0; i < num_file_blobs; i++) { + auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + file_stat_merge[i].num_chunks; + file_blobs[i].assign(stat_begin, stat_end); + } + return {std::move(rowgroup_blobs), std::move(stripe_blobs), std::move(file_blobs)}; } void writer::impl::write_index_stream(int32_t stripe_id, int32_t stream_id, host_span columns, - stripe_rowgroups const& rowgroups_range, + file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, host_span comp_out, + std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, ProtobufWriter* pbw) @@ -1226,9 +1257,18 @@ void writer::impl::write_index_stream(int32_t stripe_id, buffer_.resize((compression_kind_ != NONE) ? 
3 : 0); // Add row index entries + auto const& rowgroups_range = segmentation.stripes[stripe_id]; std::for_each(rowgroups_range.cbegin(), rowgroups_range.cend(), [&](auto rowgroup) { - pbw->put_row_index_entry( - present.comp_pos, present.pos, data.comp_pos, data.pos, data2.comp_pos, data2.pos, kind); + pbw->put_row_index_entry(present.comp_pos, + present.pos, + data.comp_pos, + data.pos, + data2.comp_pos, + data2.pos, + kind, + (rg_stats.empty() or stream_id == 0) + ? nullptr + : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); if (stream_id != 0) { const auto& strm = enc_streams[column_id][rowgroup]; @@ -1852,11 +1892,6 @@ void writer::impl::write(table_view const& table) auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs); if (num_rows > 0) { - // Gather column statistics - auto const column_stats = enable_statistics_ && table.num_columns() > 0 - ? gather_statistic_blobs(orc_table, segmentation) - : std::vector{}; - // Allocate intermediate output stream buffer size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; @@ -1919,11 +1954,12 @@ void writer::impl::write(table_view const& table) ProtobufWriter pbw_(&buffer_); + auto const statistics = gather_statistic_blobs(enable_statistics_, orc_table, segmentation); + // Write stripes std::vector> write_tasks; for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { - auto const& rowgroups_range = segmentation.stripes[stripe_id]; - auto& stripe = stripes[stripe_id]; + auto& stripe = stripes[stripe_id]; stripe.offset = out_sink_->bytes_written(); @@ -1932,10 +1968,11 @@ void writer::impl::write(table_view const& table) write_index_stream(stripe_id, stream_id, orc_table.columns, - rowgroups_range, + segmentation, enc_data.streams, strm_descs, comp_out, + statistics.rowgroup_level, &stripe, &streams, &pbw_); @@ -1943,13 +1980,13 @@ void writer::impl::write(table_view const& table) // Column data consisting one or more separate streams for (auto const& strm_desc : strm_descs[stripe_id]) { - write_tasks.push_back( - write_data_stream(strm_desc, - enc_data.streams[strm_desc.column_id][rowgroups_range.first], - static_cast(compressed_data.data()), - stream_output.get(), - &stripe, - &streams)); + write_tasks.push_back(write_data_stream( + strm_desc, + enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first], + static_cast(compressed_data.data()), + stream_output.get(), + &stripe, + &streams)); } // Write stripefooter consisting of stream information @@ -1980,37 +2017,34 @@ void writer::impl::write(table_view const& table) task.wait(); } - if (not column_stats.empty()) { - // File-level statistics - // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls - if (single_write_mode) { - // First entry contains total number of rows - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(num_rows); - ff.statistics.reserve(1 + orc_table.num_columns()); - ff.statistics.emplace_back(std::move(buffer_)); - // Add file stats, stored after stripe stats in `column_stats` - ff.statistics.insert( - ff.statistics.end(), - std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), - std::make_move_iterator(column_stats.end())); - } - // Stripe-level statistics + // File-level statistics + // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls + if (single_write_mode and not statistics.file_level.empty()) { + // First entry contains total number 
of rows + buffer_.resize(0); + pbw_.put_uint(encode_field_number(1)); + pbw_.put_uint(num_rows); + ff.statistics.reserve(1 + orc_table.num_columns()); + ff.statistics.emplace_back(std::move(buffer_)); + // Add file stats, stored after stripe stats in `column_stats` + ff.statistics.insert(ff.statistics.end(), + std::make_move_iterator(statistics.file_level.begin()), + std::make_move_iterator(statistics.file_level.end())); + } + // Stripe-level statistics + if (not statistics.stripe_level.empty()) { size_t first_stripe = md.stripeStats.size(); md.stripeStats.resize(first_stripe + stripes.size()); for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(encode_field_number(1)); pbw_.put_uint(stripes[stripe_id].numberOfRows); md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { size_t idx = stripes.size() * col_idx + stripe_id; - if (idx < column_stats.size()) { - md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = - std::move(column_stats[idx]); - } + md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = + std::move(statistics.stripe_level[idx]); } } } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index d989721334e..2738a77e50a 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -284,17 +284,24 @@ class writer::impl { hostdevice_2dvector* enc_streams, hostdevice_2dvector* strm_desc); + struct encoded_statistics { + std::vector rowgroup_level; + std::vector stripe_level; + std::vector file_level; + }; + /** - * @brief Returns per-stripe and per-file column statistics encoded - * in ORC protobuf format. + * @brief Returns column statistics encoded in ORC protobuf format. * + * @param are_statistics_enabled True if statistics are to be included in the output file * @param orc_table Table information to be written * @param columns List of columns * @param segmentation stripe and rowgroup ranges * @return The statistic blobs */ - std::vector> gather_statistic_blobs(orc_table_view const& orc_table, - file_segmentation const& segmentation); + encoded_statistics gather_statistic_blobs(bool are_statistics_enabled, + orc_table_view const& orc_table, + file_segmentation const& segmentation); /** * @brief Writes the specified column's row index stream. 
@@ -302,10 +309,11 @@ class writer::impl { * @param[in] stripe_id Stripe's identifier * @param[in] stream_id Stream identifier (column id + 1) * @param[in] columns List of columns - * @param[in] rowgroups_range Indexes of rowgroups in the stripe + * @param[in] segmentation stripe and rowgroup ranges * @param[in] enc_streams List of encoder chunk streams [column][rowgroup] * @param[in] strm_desc List of stream descriptors * @param[in] comp_out Output status for compressed streams + * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams * @param[in,out] pbw Protobuf writer @@ -313,10 +321,11 @@ class writer::impl { void write_index_stream(int32_t stripe_id, int32_t stream_id, host_span columns, - stripe_rowgroups const& rowgroups_range, + file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, host_span comp_out, + std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, ProtobufWriter* pbw); From e49084e03e5e089f1f8469440a12848749fed402 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 19 Jan 2022 09:04:12 -0600 Subject: [PATCH 5/6] Java bindings for mixed left, inner, and full joins (#9941) Depends on #9917. Adds Java bindings for the libcudf mixed join APIs. A new MixedJoinSize class was added to track the size information returned for mixed joins. Authors: - Jason Lowe (https://github.com/jlowe) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) URL: https://github.com/rapidsai/cudf/pull/9941 --- .../java/ai/rapids/cudf/MixedJoinSize.java | 43 +++ java/src/main/java/ai/rapids/cudf/Table.java | 235 +++++++++++- java/src/main/native/src/TableJni.cpp | 173 +++++++++ .../test/java/ai/rapids/cudf/TableTest.java | 338 ++++++++++++++++++ 4 files changed, 788 insertions(+), 1 deletion(-) create mode 100644 java/src/main/java/ai/rapids/cudf/MixedJoinSize.java diff --git a/java/src/main/java/ai/rapids/cudf/MixedJoinSize.java b/java/src/main/java/ai/rapids/cudf/MixedJoinSize.java new file mode 100644 index 00000000000..811f0b9a0b0 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/MixedJoinSize.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +/** This class tracks size information associated with a mixed table join. */ +public final class MixedJoinSize implements AutoCloseable { + private final long outputRowCount; + // This is in flux, avoid exposing publicly until the dust settles. 
+ private ColumnVector matches; + + MixedJoinSize(long outputRowCount, ColumnVector matches) { + this.outputRowCount = outputRowCount; + this.matches = matches; + } + + /** Return the number of output rows that would be generated from the mixed join */ + public long getOutputRowCount() { + return outputRowCount; + } + + ColumnVector getMatches() { + return matches; + } + + @Override + public synchronized void close() { + matches.close(); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index dcd7953fa2e..a021ded4588 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -640,6 +640,36 @@ private static native long[] conditionalLeftAntiJoinGatherMapWithCount(long left long condition, long rowCount) throws CudfException; + private static native long[] mixedLeftJoinSize(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual); + + private static native long[] mixedLeftJoinGatherMaps(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual); + + private static native long[] mixedLeftJoinGatherMapsWithSize(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual, + long outputRowCount, long matchesColumnView); + + private static native long[] mixedInnerJoinSize(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual); + + private static native long[] mixedInnerJoinGatherMaps(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual); + + private static native long[] mixedInnerJoinGatherMapsWithSize(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual, + long outputRowCount, long matchesColumnView); + + private static native long[] mixedFullJoinGatherMaps(long leftKeysTable, long rightKeysTable, + long leftConditionTable, long rightConditionTable, + long condition, boolean compareNullsEqual); + private static native long[] crossJoin(long leftTable, long rightTable) throws CudfException; private static native long[] concatenate(long[] cudfTablePointers) throws CudfException; @@ -2221,7 +2251,7 @@ public static Table scatter(Scalar[] source, ColumnView scatterMap, Table target target.getNativeView(), checkBounds)); } - private GatherMap[] buildJoinGatherMaps(long[] gatherMapData) { + private static GatherMap[] buildJoinGatherMaps(long[] gatherMapData) { long bufferSize = gatherMapData[0]; long leftAddr = gatherMapData[1]; long leftHandle = gatherMapData[2]; @@ -2374,6 +2404,94 @@ public GatherMap[] conditionalLeftJoinGatherMaps(Table rightTable, return buildJoinGatherMaps(gatherMapData); } + /** + * Computes output size information for a left join between two tables using a mix of equality + * and inequality conditions. The entire join condition is assumed to be a logical AND of the + * equality condition and inequality condition. + * NOTE: It is the responsibility of the caller to close the resulting size information object + * or native resources can be leaked! 
+ * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @return size information for the join + */ + public static MixedJoinSize mixedLeftJoinSize(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality) { + long[] mixedSizeInfo = mixedLeftJoinSize( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), nullEquality == NullEquality.EQUAL); + assert mixedSizeInfo.length == 2; + long outputRowCount = mixedSizeInfo[0]; + long matchesColumnHandle = mixedSizeInfo[1]; + return new MixedJoinSize(outputRowCount, new ColumnVector(matchesColumnHandle)); + } + + /** + * Computes the gather maps that can be used to manifest the result of a left join between + * two tables using a mix of equality and inequality conditions. The entire join condition is + * assumed to be a logical AND of the equality condition and inequality condition. + * Two {@link GatherMap} instances will be returned that can be used to gather + * the left and right tables, respectively, to produce the result of the left join. + * It is the responsibility of the caller to close the resulting gather map instances. + * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @return left and right table gather maps + */ + public static GatherMap[] mixedLeftJoinGatherMaps(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality) { + long[] gatherMapData = mixedLeftJoinGatherMaps( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), + nullEquality == NullEquality.EQUAL); + return buildJoinGatherMaps(gatherMapData); + } + + /** + * Computes the gather maps that can be used to manifest the result of a left join between + * two tables using a mix of equality and inequality conditions. The entire join condition is + * assumed to be a logical AND of the equality condition and inequality condition. + * Two {@link GatherMap} instances will be returned that can be used to gather + * the left and right tables, respectively, to produce the result of the left join. + * It is the responsibility of the caller to close the resulting gather map instances. + * This interface allows passing the size result from + * {@link #mixedLeftJoinSize(Table, Table, Table, Table, CompiledExpression, NullEquality)} + * when the output size was computed previously. 
+ * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @param joinSize mixed join size result + * @return left and right table gather maps + */ + public static GatherMap[] mixedLeftJoinGatherMaps(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality, + MixedJoinSize joinSize) { + long[] gatherMapData = mixedLeftJoinGatherMapsWithSize( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), + nullEquality == NullEquality.EQUAL, + joinSize.getOutputRowCount(), joinSize.getMatches().getNativeView()); + return buildJoinGatherMaps(gatherMapData); + } + /** * Computes the gather maps that can be used to manifest the result of an inner equi-join between * two tables. It is assumed this table instance holds the key columns from the left table, and @@ -2514,6 +2632,94 @@ public GatherMap[] conditionalInnerJoinGatherMaps(Table rightTable, return buildJoinGatherMaps(gatherMapData); } + /** + * Computes output size information for an inner join between two tables using a mix of equality + * and inequality conditions. The entire join condition is assumed to be a logical AND of the + * equality condition and inequality condition. + * NOTE: It is the responsibility of the caller to close the resulting size information object + * or native resources can be leaked! + * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @return size information for the join + */ + public static MixedJoinSize mixedInnerJoinSize(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality) { + long[] mixedSizeInfo = mixedInnerJoinSize( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), nullEquality == NullEquality.EQUAL); + assert mixedSizeInfo.length == 2; + long outputRowCount = mixedSizeInfo[0]; + long matchesColumnHandle = mixedSizeInfo[1]; + return new MixedJoinSize(outputRowCount, new ColumnVector(matchesColumnHandle)); + } + + /** + * Computes the gather maps that can be used to manifest the result of an inner join between + * two tables using a mix of equality and inequality conditions. The entire join condition is + * assumed to be a logical AND of the equality condition and inequality condition. + * Two {@link GatherMap} instances will be returned that can be used to gather + * the left and right tables, respectively, to produce the result of the inner join. 
+ * It is the responsibility of the caller to close the resulting gather map instances. + * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @return left and right table gather maps + */ + public static GatherMap[] mixedInnerJoinGatherMaps(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality) { + long[] gatherMapData = mixedInnerJoinGatherMaps( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), + nullEquality == NullEquality.EQUAL); + return buildJoinGatherMaps(gatherMapData); + } + + /** + * Computes the gather maps that can be used to manifest the result of an inner join between + * two tables using a mix of equality and inequality conditions. The entire join condition is + * assumed to be a logical AND of the equality condition and inequality condition. + * Two {@link GatherMap} instances will be returned that can be used to gather + * the left and right tables, respectively, to produce the result of the inner join. + * It is the responsibility of the caller to close the resulting gather map instances. + * This interface allows passing the size result from + * {@link #mixedInnerJoinSize(Table, Table, Table, Table, CompiledExpression, NullEquality)} + * when the output size was computed previously. + * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @param joinSize mixed join size result + * @return left and right table gather maps + */ + public static GatherMap[] mixedInnerJoinGatherMaps(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality, + MixedJoinSize joinSize) { + long[] gatherMapData = mixedInnerJoinGatherMapsWithSize( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), + nullEquality == NullEquality.EQUAL, + joinSize.getOutputRowCount(), joinSize.getMatches().getNativeView()); + return buildJoinGatherMaps(gatherMapData); + } + /** * Computes the gather maps that can be used to manifest the result of an full equi-join between * two tables. It is assumed this table instance holds the key columns from the left table, and @@ -2620,6 +2826,33 @@ public GatherMap[] conditionalFullJoinGatherMaps(Table rightTable, return buildJoinGatherMaps(gatherMapData); } + /** + * Computes the gather maps that can be used to manifest the result of a full join between + * two tables using a mix of equality and inequality conditions. 
The entire join condition is + * assumed to be a logical AND of the equality condition and inequality condition. + * Two {@link GatherMap} instances will be returned that can be used to gather + * the left and right tables, respectively, to produce the result of the full join. + * It is the responsibility of the caller to close the resulting gather map instances. + * @param leftKeys the left table's key columns for the equality condition + * @param rightKeys the right table's key columns for the equality condition + * @param leftConditional the left table's columns needed to evaluate the inequality condition + * @param rightConditional the right table's columns needed to evaluate the inequality condition + * @param condition the inequality condition of the join + * @param nullEquality whether nulls should compare as equal + * @return left and right table gather maps + */ + public static GatherMap[] mixedFullJoinGatherMaps(Table leftKeys, Table rightKeys, + Table leftConditional, Table rightConditional, + CompiledExpression condition, + NullEquality nullEquality) { + long[] gatherMapData = mixedFullJoinGatherMaps( + leftKeys.getNativeView(), rightKeys.getNativeView(), + leftConditional.getNativeView(), rightConditional.getNativeView(), + condition.getNativeHandle(), + nullEquality == NullEquality.EQUAL); + return buildJoinGatherMaps(gatherMapData); + } + private GatherMap buildSemiJoinGatherMap(long[] gatherMapData) { long bufferSize = gatherMapData[0]; long leftAddr = gatherMapData[1]; diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 828d163fe07..03faf9be021 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include "cudf_jni_apis.hpp" @@ -886,6 +887,76 @@ jlongArray cond_join_gather_single_map(JNIEnv *env, jlong j_left_table, jlong j_ CATCH_STD(env, NULL); } +template +jlongArray mixed_join_size(JNIEnv *env, jlong j_left_keys, jlong j_right_keys, + jlong j_left_condition, jlong j_right_condition, jlong j_condition, + jboolean j_nulls_equal, T join_size_func) { + JNI_NULL_CHECK(env, j_left_keys, "left keys table is null", 0); + JNI_NULL_CHECK(env, j_right_keys, "right keys table is null", 0); + JNI_NULL_CHECK(env, j_left_condition, "left condition table is null", 0); + JNI_NULL_CHECK(env, j_right_condition, "right condition table is null", 0); + JNI_NULL_CHECK(env, j_condition, "condition is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const left_keys = reinterpret_cast(j_left_keys); + auto const right_keys = reinterpret_cast(j_right_keys); + auto const left_condition = reinterpret_cast(j_left_condition); + auto const right_condition = reinterpret_cast(j_right_condition); + auto const condition = reinterpret_cast(j_condition); + auto const nulls_equal = + j_nulls_equal ? 
cudf::null_equality::EQUAL : cudf::null_equality::UNEQUAL; + std::pair>> join_size_info = + join_size_func(*left_keys, *right_keys, *left_condition, *right_condition, + condition->get_top_expression(), nulls_equal); + if (join_size_info.second->size() > std::numeric_limits::max()) { + throw std::runtime_error("Too many values in device buffer to convert into a column"); + } + auto col_size = join_size_info.second->size(); + auto col_data = join_size_info.second->release(); + auto col = std::make_unique(cudf::data_type{cudf::type_id::INT32}, col_size, + std::move(col_data), rmm::device_buffer{}, 0); + cudf::jni::native_jlongArray result(env, 2); + result[0] = static_cast(join_size_info.first); + result[1] = reinterpret_cast(col.release()); + return result.get_jArray(); + } + CATCH_STD(env, NULL); +} + +template +jlongArray mixed_join_gather_maps(JNIEnv *env, jlong j_left_keys, jlong j_right_keys, + jlong j_left_condition, jlong j_right_condition, + jlong j_condition, jboolean j_nulls_equal, T join_func) { + JNI_NULL_CHECK(env, j_left_keys, "left keys table is null", 0); + JNI_NULL_CHECK(env, j_right_keys, "right keys table is null", 0); + JNI_NULL_CHECK(env, j_left_condition, "left condition table is null", 0); + JNI_NULL_CHECK(env, j_right_condition, "right condition table is null", 0); + JNI_NULL_CHECK(env, j_condition, "condition is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const left_keys = reinterpret_cast(j_left_keys); + auto const right_keys = reinterpret_cast(j_right_keys); + auto const left_condition = reinterpret_cast(j_left_condition); + auto const right_condition = reinterpret_cast(j_right_condition); + auto const condition = reinterpret_cast(j_condition); + auto const nulls_equal = + j_nulls_equal ? cudf::null_equality::EQUAL : cudf::null_equality::UNEQUAL; + return gather_maps_to_java(env, + join_func(*left_keys, *right_keys, *left_condition, *right_condition, + condition->get_top_expression(), nulls_equal)); + } + CATCH_STD(env, NULL); +} + +std::pair> +get_mixed_size_info(JNIEnv *env, jlong j_output_row_count, jlong j_matches_view) { + auto const row_count = static_cast(j_output_row_count); + auto const matches = reinterpret_cast(j_matches_view); + return std::pair>( + row_count, cudf::device_span(matches->template data(), + matches->size())); +} + // Returns a table view containing only the columns at the specified indices cudf::table_view const get_keys_table(cudf::table_view const *t, native_jintArray const &key_indices) { @@ -2227,6 +2298,50 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_conditionalLeftJoinGather }); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedLeftJoinSize( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal) { + return cudf::jni::mixed_join_size( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_left_join_size(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal); + }); +} + +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedLeftJoinGatherMaps( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong 
j_condition, jboolean j_nulls_equal) { + return cudf::jni::mixed_join_gather_maps( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_left_join(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal); + }); +} + +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedLeftJoinGatherMapsWithSize( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal, jlong j_output_row_count, + jlong j_matches_view) { + auto size_info = cudf::jni::get_mixed_size_info(env, j_output_row_count, j_matches_view); + return cudf::jni::mixed_join_gather_maps( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [&size_info](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_left_join(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal, size_info); + }); +} + JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_innerJoinGatherMaps( JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jboolean compare_nulls_equal) { return cudf::jni::join_gather_maps( @@ -2316,6 +2431,50 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_conditionalInnerJoinGathe }); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedInnerJoinSize( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal) { + return cudf::jni::mixed_join_size( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_inner_join_size(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal); + }); +} + +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedInnerJoinGatherMaps( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal) { + return cudf::jni::mixed_join_gather_maps( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_inner_join(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal); + }); +} + +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedInnerJoinGatherMapsWithSize( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal, jlong j_output_row_count, + jlong 
j_matches_view) { + auto size_info = cudf::jni::get_mixed_size_info(env, j_output_row_count, j_matches_view); + return cudf::jni::mixed_join_gather_maps( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [&size_info](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_inner_join(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal, size_info); + }); +} + JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_fullJoinGatherMaps( JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jboolean compare_nulls_equal) { return cudf::jni::join_gather_maps( @@ -2374,6 +2533,20 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_conditionalFullJoinGather }); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_mixedFullJoinGatherMaps( + JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jlong j_left_condition, + jlong j_right_condition, jlong j_condition, jboolean j_nulls_equal) { + return cudf::jni::mixed_join_gather_maps( + env, j_left_keys, j_right_keys, j_left_condition, j_right_condition, j_condition, + j_nulls_equal, + [](cudf::table_view const &left_keys, cudf::table_view const &right_keys, + cudf::table_view const &left_condition, cudf::table_view const &right_condition, + cudf::ast::expression const &condition, cudf::null_equality nulls_equal) { + return cudf::mixed_full_join(left_keys, right_keys, left_condition, right_condition, + condition, nulls_equal); + }); +} + JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_leftSemiJoinGatherMap( JNIEnv *env, jclass, jlong j_left_keys, jlong j_right_keys, jboolean compare_nulls_equal) { return cudf::jni::join_gather_single_map( diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 18a0de77664..47c468de8c8 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -1578,6 +1578,144 @@ void testConditionalLeftJoinGatherMapsNullsWithCount() { } } + @Test + void testMixedLeftJoinGatherMaps() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8) + .column(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(6, 5, 9, 8, 10, 32) + .column(0, 1, 2, 3, 4, 5) + .column(7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + .column(inv, inv, 2, inv, inv, inv, inv, 0, 1, inv) + .build()) { + GatherMap[] maps = Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.UNEQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedLeftJoinGatherMapsNulls() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, 
TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(null, 3, 9, 0, 1, 7, 4, null, 5, 8) + .column( 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(null, 5, null, 8, 10, 32) + .column( 0, 1, 2, 3, 4, 5) + .column( 7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(0, 1, 2, 3, 4, 5, 6, 7, 7, 8, 9) + .column(0, inv, inv, inv, inv, inv, inv, 0, 2, 1, inv) + .build()) { + GatherMap[] maps = Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.EQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedLeftJoinGatherMapsWithSize() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8) + .column(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(6, 5, 9, 8, 10, 32) + .column(0, 1, 2, 3, 4, 5) + .column(7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + .column(inv, inv, 2, inv, inv, inv, inv, 0, 1, inv) + .build(); + MixedJoinSize sizeInfo = Table.mixedLeftJoinSize(leftKeys, rightKeys, left, right, + condition, NullEquality.UNEQUAL)) { + assertEquals(expected.getRowCount(), sizeInfo.getOutputRowCount()); + GatherMap[] maps = Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.UNEQUAL, sizeInfo); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedLeftJoinGatherMapsNullsWithSize() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(null, 3, 9, 0, 1, 7, 4, null, 5, 8) + .column( 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(null, 5, null, 8, 10, 32) + .column( 0, 1, 2, 3, 4, 5) + .column( 7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(0, 1, 2, 3, 4, 5, 6, 7, 7, 8, 9) + .column(0, inv, inv, inv, inv, inv, inv, 0, 2, 1, inv) + .build(); + MixedJoinSize sizeInfo = Table.mixedLeftJoinSize(leftKeys, rightKeys, left, right, + condition, NullEquality.EQUAL)) { + assertEquals(expected.getRowCount(), sizeInfo.getOutputRowCount()); + GatherMap[] maps = Table.mixedLeftJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.EQUAL, sizeInfo); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + @Test void testInnerJoinGatherMaps() { try (Table leftKeys = new Table.TestBuilder().column(2, 3, 9, 0, 1, 7, 4, 6, 5, 
8).build(); @@ -1848,6 +1986,140 @@ void testConditionalInnerJoinGatherMapsNullsWithCount() { } } + @Test + void testMixedInnerJoinGatherMaps() { + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8) + .column(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(6, 5, 9, 8, 10, 32) + .column(0, 1, 2, 3, 4, 5) + .column(7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(2, 7, 8) + .column(2, 0, 1) + .build()) { + GatherMap[] maps = Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.UNEQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedInnerJoinGatherMapsNulls() { + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(null, 3, 9, 0, 1, 7, 4, null, 5, 8) + .column( 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(null, 5, null, 8, 10, 32) + .column( 0, 1, 2, 3, 4, 5) + .column( 7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(0, 7, 7, 8) + .column(0, 0, 2, 1) + .build()) { + GatherMap[] maps = Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.EQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedInnerJoinGatherMapsWithSize() { + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8) + .column(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(6, 5, 9, 8, 10, 32) + .column(0, 1, 2, 3, 4, 5) + .column(7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(2, 7, 8) + .column(2, 0, 1) + .build(); + MixedJoinSize sizeInfo = Table.mixedInnerJoinSize(leftKeys, rightKeys, left, right, + condition, NullEquality.UNEQUAL)) { + assertEquals(expected.getRowCount(), sizeInfo.getOutputRowCount()); + GatherMap[] maps = Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.UNEQUAL, sizeInfo); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedInnerJoinGatherMapsNullsWithSize() { + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = 
new Table.TestBuilder() + .column(null, 3, 9, 0, 1, 7, 4, null, 5, 8) + .column( 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(null, 5, null, 8, 10, 32) + .column( 0, 1, 2, 3, 4, 5) + .column( 7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(0, 7, 7, 8) + .column(0, 0, 2, 1) + .build(); + MixedJoinSize sizeInfo = Table.mixedInnerJoinSize(leftKeys, rightKeys, left, right, + condition, NullEquality.EQUAL)) { + assertEquals(expected.getRowCount(), sizeInfo.getOutputRowCount()); + GatherMap[] maps = Table.mixedInnerJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.EQUAL, sizeInfo); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + @Test void testFullJoinGatherMaps() { final int inv = Integer.MIN_VALUE; @@ -2042,6 +2314,72 @@ void testConditionalFullJoinGatherMapsNulls() { } } + @Test + void testMixedFullJoinGatherMaps() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8) + .column(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(6, 5, 9, 8, 10, 32) + .column(0, 1, 2, 3, 4, 5) + .column(7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(inv, inv, inv, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + .column( 3, 4, 5, inv, inv, 2, inv, inv, inv, inv, 0, 1, inv) + .build()) { + GatherMap[] maps = Table.mixedFullJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.UNEQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + + @Test + void testMixedFullJoinGatherMapsNulls() { + final int inv = Integer.MIN_VALUE; + BinaryOperation expr = new BinaryOperation(BinaryOperator.GREATER, + new ColumnReference(1, TableReference.LEFT), + new ColumnReference(1, TableReference.RIGHT)); + try (CompiledExpression condition = expr.compile(); + Table left = new Table.TestBuilder() + .column(null, 3, 9, 0, 1, 7, 4, null, 5, 8) + .column( 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .build(); + Table leftKeys = new Table(left.getColumn(0)); + Table right = new Table.TestBuilder() + .column(null, 5, null, 8, 10, 32) + .column( 0, 1, 2, 3, 4, 5) + .column( 7, 8, 9, 0, 1, 2).build(); + Table rightKeys = new Table(right.getColumn(0)); + Table expected = new Table.TestBuilder() + .column(inv, inv, inv, 0, 1, 2, 3, 4, 5, 6, 7, 7, 8, 9) + .column( 3, 4, 5, 0, inv, inv, inv, inv, inv, inv, 0, 2, 1, inv) + .build()) { + GatherMap[] maps = Table.mixedFullJoinGatherMaps(leftKeys, rightKeys, left, right, condition, + NullEquality.EQUAL); + try { + verifyJoinGatherMaps(maps, expected); + } finally { + for (GatherMap map : maps) { + map.close(); + } + } + } + } + @Test void testLeftSemiJoinGatherMap() { try (Table leftKeys = new Table.TestBuilder().column(2, 3, 9, 0, 1, 7, 4, 6, 5, 8).build(); From 8fd7dd26724e89d3d66eba865b93e3c2cd02dada Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 19 Jan 2022 07:04:48 -0800 Subject: [PATCH 6/6] 
Move `drop_duplicates`, `drop_na`, `_gather`, `take` to IndexedFrame and
 create their `_base_index` counterparts (#9807)

This PR is a follow-up to #9558 (Part 1 of 3).

One remaining problem from #9558 is that `Frame` is index-agnostic, but the
functions above need to perform index-aware operations when building the list
of columns to pass to libcudf. For example, to remove duplicates from a
`BaseIndex`, the list should be constructed from all of its columns. A
dataframe, by contrast, needs to pass in all data columns plus the index
columns, while specifying the indices of the data columns to consider for
duplicates. This is further complicated for `_gather`, which supports a
`keep_index` argument.

This PR moves the aforementioned functions to `IndexedFrame` and creates their
counterparts in `_base_index`. A couple of noteworthy changes:

- The `Merge` object gains two new arguments, `lhs_is_index` and
  `rhs_is_index`.
- The `keep_index` argument of `DataFrame.take`/`Series.take` is removed; for
  internal usage, `_gather` is preferred. (This PR is therefore labeled
  breaking.)

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - https://github.com/brandon-b-miller

URL: https://github.com/rapidsai/cudf/pull/9807
---
 python/cudf/cudf/_lib/copying.pyx       |  17 ++
 python/cudf/cudf/core/_base_index.py    | 131 +++++++++-
 python/cudf/cudf/core/column/column.py  |   6 +-
 python/cudf/cudf/core/dataframe.py      |   7 +-
 python/cudf/cudf/core/frame.py          | 313 +-----------------------
 python/cudf/cudf/core/index.py          |  10 +
 python/cudf/cudf/core/indexed_frame.py  | 236 +++++++++++++++++-
 python/cudf/cudf/core/join/join.py      |  53 ++--
 python/cudf/cudf/core/multiindex.py     |   7 +
 python/cudf/cudf/core/series.py         |   5 -
 python/cudf/cudf/tests/test_dropna.py   |  62 +++++
 python/cudf/cudf/tests/test_indexing.py |  10 +-
 python/cudf/cudf/utils/utils.py         |  18 --
 13 files changed, 505 insertions(+), 370 deletions(-)

diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx
index 28bd78733a3..30157bc10ad 100644
--- a/python/cudf/cudf/_lib/copying.pyx
+++ b/python/cudf/cudf/_lib/copying.pyx
@@ -48,6 +48,23 @@ from cudf._lib.utils cimport (
 ctypedef const scalar constscalar


+def _gather_map_is_valid(
+    gather_map: "cudf.core.column.ColumnBase",
+    nrows: int,
+    check_bounds: bool,
+    nullify: bool,
+) -> bool:
+    """Returns true if gather map is valid.
+
+    A gather map is valid if empty or all indices are within the range
+    ``[-nrows, nrows)``, except when ``nullify`` is specified.
+ """ + if not check_bounds or nullify or len(gather_map) == 0: + return True + gm_min, gm_max = minmax(gather_map) + return gm_min >= -nrows and gm_max < nrows + + def copy_column(Column input_column): """ Deep copies a column diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index 4f2614e843f..be5a1e7cc93 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -9,12 +9,18 @@ import pandas as pd import cudf -from cudf._lib.stream_compaction import apply_boolean_mask +from cudf._lib.copying import _gather_map_is_valid, gather +from cudf._lib.stream_compaction import ( + apply_boolean_mask, + drop_duplicates, + drop_nulls, +) from cudf._typing import DtypeObj from cudf.api.types import ( is_bool_dtype, is_dtype_equal, is_integer, + is_integer_dtype, is_list_like, is_scalar, ) @@ -1423,6 +1429,129 @@ def from_pandas(cls, index, nan_as_null=None): def _constructor_expanddim(self): return cudf.MultiIndex + def drop_duplicates( + self, keep="first", nulls_are_equal=True, + ): + """ + Drop duplicate rows in index. + + keep : {"first", "last", False}, default "first" + - 'first' : Drop duplicates except for the first occurrence. + - 'last' : Drop duplicates except for the last occurrence. + - ``False`` : Drop all duplicates. + nulls_are_equal: bool, default True + Null elements are considered equal to other null elements. + """ + + # This utilizes the fact that all `Index` is also a `Frame`. + result = self.__class__._from_columns( + drop_duplicates( + list(self._columns), + keys=range(len(self._data)), + keep=keep, + nulls_are_equal=nulls_are_equal, + ), + self._column_names, + ) + result._copy_type_metadata(self, include_index=False) + return result + + def dropna(self, how="any"): + """ + Drop null rows from Index. + + how : {"any", "all"}, default "any" + Specifies how to decide whether to drop a row. + "any" (default) drops rows containing at least + one null value. "all" drops only rows containing + *all* null values. + """ + + # This is to be consistent with IndexedFrame.dropna to handle nans + # as nulls by default + data_columns = [ + col.nans_to_nulls() + if isinstance(col, cudf.core.column.NumericalColumn) + else col + for col in self._columns + ] + + result = self.__class__._from_columns( + drop_nulls(data_columns, how=how, keys=range(len(data_columns)),), + self._column_names, + ) + result._copy_type_metadata(self, include_index=False) + return result + + def _gather(self, gather_map, nullify=False, check_bounds=True): + """Gather rows of index specified by indices in `gather_map`. + + Skip bounds checking if check_bounds is False. + Set rows to null for all out of bound indices if nullify is `True`. + """ + gather_map = cudf.core.column.as_column(gather_map) + + # TODO: For performance, the check and conversion of gather map should + # be done by the caller. This check will be removed in future release. 
+ if not is_integer_dtype(gather_map.dtype): + gather_map = gather_map.astype("int32") + + if not _gather_map_is_valid( + gather_map, len(self), check_bounds, nullify + ): + raise IndexError("Gather map index is out of bounds.") + + result = self.__class__._from_columns( + gather(list(self._columns), gather_map, nullify=nullify), + self._column_names, + ) + + result._copy_type_metadata(self, include_index=False) + return result + + def take(self, indices, axis=0, allow_fill=True, fill_value=None): + """Return a new index containing the rows specified by *indices* + + Parameters + ---------- + indices : array-like + Array of ints indicating which positions to take. + axis : int + The axis over which to select values, always 0. + allow_fill : Unsupported + fill_value : Unsupported + + Returns + ------- + out : Index + New object with desired subset of rows. + + Examples + -------- + >>> idx = cudf.Index(['a', 'b', 'c', 'd', 'e']) + >>> idx.take([2, 0, 4, 3]) + StringIndex(['c' 'a' 'e' 'd'], dtype='object') + """ + + if axis not in {0, "index"}: + raise NotImplementedError( + "Gather along column axis is not yet supported." + ) + if not allow_fill or fill_value is not None: + raise NotImplementedError( + "`allow_fill` and `fill_value` are unsupported." + ) + + indices = cudf.core.column.as_column(indices) + if is_bool_dtype(indices): + warnings.warn( + "Calling take with a boolean array is deprecated and will be " + "removed in the future.", + FutureWarning, + ) + return self._apply_boolean_mask(indices) + return self._gather(indices) + def _apply_boolean_mask(self, boolean_mask): """Apply boolean mask to each row of `self`. diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 667ce0488cd..1a83194489d 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -77,7 +77,7 @@ pandas_dtypes_alias_to_cudf_alias, pandas_dtypes_to_np_dtypes, ) -from cudf.utils.utils import _gather_map_is_valid, mask_dtype +from cudf.utils.utils import mask_dtype T = TypeVar("T", bound="ColumnBase") @@ -702,7 +702,9 @@ def take( # be done by the caller. This check will be removed in future release. 
if not is_integer_dtype(indices.dtype): indices = indices.astype("int32") - if not _gather_map_is_valid(indices, len(self), check_bounds, nullify): + if not libcudf.copying._gather_map_is_valid( + indices, len(self), check_bounds, nullify + ): raise IndexError("Gather map index is out of bounds.") return libcudf.copying.gather([self], indices, nullify=nullify)[ diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index a444d87b50c..c686cd0fd39 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2545,11 +2545,8 @@ def reset_index( inplace=inplace, ) - def take(self, indices, axis=0, keep_index=None): - axis = self._get_axis_from_axis_arg(axis) - if axis != 0: - raise NotImplementedError("Only axis=0 is supported.") - out = super().take(indices, keep_index) + def take(self, indices, axis=0): + out = super().take(indices) out.columns = self.columns return out diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 6e47c0f41cf..1d59d9f3b1a 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -30,10 +30,8 @@ from cudf._typing import ColumnLike, DataFrameOrSeries, Dtype from cudf.api.types import ( _is_non_decimal_numeric_dtype, - is_bool_dtype, is_decimal_dtype, is_dict_like, - is_integer_dtype, is_scalar, issubdtype, ) @@ -52,7 +50,6 @@ from cudf.utils import ioutils from cudf.utils.docutils import copy_docstring from cudf.utils.dtypes import find_common_type, is_column_like -from cudf.utils.utils import _gather_map_is_valid T = TypeVar("T", bound="Frame") @@ -72,6 +69,7 @@ class Frame: # TODO: Once all dependence on Frame having an index is removed, this # attribute should be moved to IndexedFrame. _index: Optional[cudf.core.index.BaseIndex] + _names: Optional[List] def __init__(self, data=None, index=None): if data is None: @@ -533,37 +531,6 @@ def _get_columns_by_index(self, indices): data, columns=data.to_pandas_index(), index=self.index ) - def _gather( - self, gather_map, keep_index=True, nullify=False, check_bounds=True - ): - """Gather rows of frame specified by indices in `gather_map`. - - Skip bounds checking if check_bounds is False. - Set rows to null for all out of bound indices if nullify is `True`. - """ - # TODO: `keep_index` argument is to be removed. - gather_map = cudf.core.column.as_column(gather_map) - - # TODO: For performance, the check and conversion of gather map should - # be done by the caller. This check will be removed in future release. - if not is_integer_dtype(gather_map.dtype): - gather_map = gather_map.astype("int32") - - if not _gather_map_is_valid( - gather_map, len(self), check_bounds, nullify - ): - raise IndexError("Gather map index is out of bounds.") - - result = self.__class__._from_columns( - libcudf.copying.gather( - list(self._columns), gather_map, nullify=nullify, - ), - self._column_names, - ) - - result._copy_type_metadata(self) - return result - def _as_column(self): """ _as_column : Converts a single columned Frame to Column @@ -1110,120 +1077,6 @@ def scatter_by_map( return result - def dropna( - self, axis=0, how="any", thresh=None, subset=None, inplace=False - ): - """ - Drops rows (or columns) containing nulls from a Column. - - Parameters - ---------- - axis : {0, 1}, optional - Whether to drop rows (axis=0, default) or columns (axis=1) - containing nulls. - how : {"any", "all"}, optional - Specifies how to decide whether to drop a row (or column). 
- any (default) drops rows (or columns) containing at least - one null value. all drops only rows (or columns) containing - *all* null values. - thresh: int, optional - If specified, then drops every row (or column) containing - less than `thresh` non-null values - subset : list, optional - List of columns to consider when dropping rows (all columns - are considered by default). Alternatively, when dropping - columns, subset is a list of rows to consider. - inplace : bool, default False - If True, do operation inplace and return None. - - Returns - ------- - Copy of the DataFrame with rows/columns containing nulls dropped. - - See also - -------- - cudf.DataFrame.isna - Indicate null values. - - cudf.DataFrame.notna - Indicate non-null values. - - cudf.DataFrame.fillna - Replace null values. - - cudf.Series.dropna - Drop null values. - - cudf.Index.dropna - Drop null indices. - - Examples - -------- - >>> import cudf - >>> df = cudf.DataFrame({"name": ['Alfred', 'Batman', 'Catwoman'], - ... "toy": ['Batmobile', None, 'Bullwhip'], - ... "born": [np.datetime64("1940-04-25"), - ... np.datetime64("NaT"), - ... np.datetime64("NaT")]}) - >>> df - name toy born - 0 Alfred Batmobile 1940-04-25 00:00:00 - 1 Batman - 2 Catwoman Bullwhip - - Drop the rows where at least one element is null. - - >>> df.dropna() - name toy born - 0 Alfred Batmobile 1940-04-25 - - Drop the columns where at least one element is null. - - >>> df.dropna(axis='columns') - name - 0 Alfred - 1 Batman - 2 Catwoman - - Drop the rows where all elements are null. - - >>> df.dropna(how='all') - name toy born - 0 Alfred Batmobile 1940-04-25 00:00:00 - 1 Batman - 2 Catwoman Bullwhip - - Keep only the rows with at least 2 non-null values. - - >>> df.dropna(thresh=2) - name toy born - 0 Alfred Batmobile 1940-04-25 00:00:00 - 2 Catwoman Bullwhip - - Define in which columns to look for null values. - - >>> df.dropna(subset=['name', 'born']) - name toy born - 0 Alfred Batmobile 1940-04-25 - - Keep the DataFrame with valid entries in the same variable. - - >>> df.dropna(inplace=True) - >>> df - name toy born - 0 Alfred Batmobile 1940-04-25 - """ - if axis == 0: - result = self._drop_na_rows( - how=how, subset=subset, thresh=thresh, drop_nan=True - ) - else: - result = self._drop_na_columns( - how=how, subset=subset, thresh=thresh - ) - - return self._mimic_inplace(result, inplace=inplace) - def fillna( self, value=None, method=None, axis=None, inplace=False, limit=None ): @@ -1370,70 +1223,6 @@ def fillna( return self._mimic_inplace(result, inplace=inplace) - def ffill(self): - return self.fillna(method="ffill") - - def bfill(self): - return self.fillna(method="bfill") - - def _drop_na_rows( - self, how="any", subset=None, thresh=None, drop_nan=False - ): - """ - Drops null rows from `self`. - - how : {"any", "all"}, optional - Specifies how to decide whether to drop a row. - any (default) drops rows containing at least - one null value. all drops only rows containing - *all* null values. - subset : list, optional - List of columns to consider when dropping rows. - thresh: int, optional - If specified, then drops every row containing - less than `thresh` non-null values. 
- """ - if subset is None: - subset = self._column_names - elif ( - not np.iterable(subset) - or isinstance(subset, str) - or isinstance(subset, tuple) - and subset in self._data.names - ): - subset = (subset,) - diff = set(subset) - set(self._data) - if len(diff) != 0: - raise KeyError(f"columns {diff} do not exist") - - if len(subset) == 0: - return self.copy(deep=True) - - frame = self.copy(deep=False) - if drop_nan: - for name, col in frame._data.items(): - if name in subset and isinstance( - col, cudf.core.column.NumericalColumn - ): - frame._data[name] = col.nans_to_nulls() - else: - frame._data[name] = col - - result = self.__class__._from_columns( - libcudf.stream_compaction.drop_nulls( - list(self._index._data.columns + frame._columns), - how=how, - keys=self._positions_from_column_names( - subset, offset_by_index_columns=True - ), - thresh=thresh, - ), - self._column_names, - self._index.names, - ) - result._copy_type_metadata(frame) - return result - def _drop_na_columns(self, how="any", subset=None, thresh=None): """ Drop columns containing nulls @@ -2129,34 +1918,6 @@ def to_arrow(self): {name: col.to_arrow() for name, col in self._data.items()} ) - def drop_duplicates( - self, keep="first", nulls_are_equal=True, - ): - """ - Drop duplicate rows in frame. - - keep : ["first", "last", False], default "first" - "first" will keep the first duplicate entry, "last" will keep the - last duplicate entry, and False will drop all duplicates. - nulls_are_equal: bool, default True - Null elements are considered equal to other null elements. - """ - - result = self.__class__._from_columns( - libcudf.stream_compaction.drop_duplicates( - list(self._columns), - keys=range(len(self._columns)), - keep=keep, - nulls_are_equal=nulls_are_equal, - ), - self._column_names, - ) - # TODO: _copy_type_metadata is a common pattern to apply after the - # roundtrip from libcudf. We should build this into a factory function - # to increase reusability. - result._copy_type_metadata(self) - return result - def _positions_from_column_names(self, column_names): """Map each column name into their positions in the frame. @@ -2872,74 +2633,6 @@ def _get_sorted_inds(self, by=None, ascending=True, na_position="last"): return libcudf.sort.order_by(to_sort, ascending, na_position) - def take(self, indices, keep_index=None): - """Return a new object containing the rows specified by *positions* - - Parameters - ---------- - indices : array-like - Array of ints indicating which positions to take. - keep_index : bool, default True - Whether to retain the index in result or not. - - Returns - ------- - out : Series or DataFrame or Index - New object with desired subset of rows. - - Examples - -------- - **Series** - >>> s = cudf.Series(['a', 'b', 'c', 'd', 'e']) - >>> s.take([2, 0, 4, 3]) - 2 c - 0 a - 4 e - 3 d - dtype: object - - **DataFrame** - - >>> a = cudf.DataFrame({'a': [1.0, 2.0, 3.0], - ... 'b': cudf.Series(['a', 'b', 'c'])}) - >>> a.take([0, 2, 2]) - a b - 0 1.0 a - 2 3.0 c - 2 3.0 c - >>> a.take([True, False, True]) - a b - 0 1.0 a - 2 3.0 c - - **Index** - - >>> idx = cudf.Index(['a', 'b', 'c', 'd', 'e']) - >>> idx.take([2, 0, 4, 3]) - StringIndex(['c' 'a' 'e' 'd'], dtype='object') - """ - # TODO: When we remove keep_index we should introduce the axis - # parameter. We could also introduce is_copy, but that's already - # deprecated in pandas so it's probably unnecessary. We also need to - # introduce Index.take's allow_fill and fill_value parameters. 
- if keep_index is not None: - warnings.warn( - "keep_index is deprecated and will be removed in the future.", - FutureWarning, - ) - else: - keep_index = True - - indices = as_column(indices) - if is_bool_dtype(indices): - warnings.warn( - "Calling take with a boolean array is deprecated and will be " - "removed in the future.", - FutureWarning, - ) - return self._apply_boolean_mask(indices) - return self._gather(indices, keep_index=keep_index) - def sin(self): """ Get Trigonometric sine, element-wise. @@ -3629,6 +3322,8 @@ def _merge( elif how in {"leftsemi", "leftanti"}: merge_cls = MergeSemi + # TODO: the two isinstance checks below indicates that `_merge` should + # not be defined in `Frame`, but in `IndexedFrame`. return merge_cls( lhs, rhs, @@ -3637,6 +3332,8 @@ def _merge( right_on=right_on, left_index=left_index, right_index=right_index, + lhs_is_index=isinstance(lhs, cudf.core._base_index.BaseIndex), + rhs_is_index=isinstance(rhs, cudf.core._base_index.BaseIndex), how=how, sort=sort, indicator=indicator, diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 1e493708415..91c7a740699 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -681,6 +681,16 @@ def _intersection(self, other, sort=False): return new_index + def _gather(self, gather_map, nullify=False, check_bounds=True): + return Int64Index._from_columns( + [self._values.take(gather_map, nullify, check_bounds)], [self.name] + ) + + def _apply_boolean_mask(self, boolean_mask): + return Int64Index._from_columns( + [self._values.apply_boolean_mask(boolean_mask)], [self.name] + ) + # Patch in all binops and unary ops, which bypass __getattr__ on the instance # and prevent the above overload from working. diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 7c5783bf637..9458057894a 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -29,7 +29,7 @@ from cudf.core.frame import Frame from cudf.core.index import Index, RangeIndex, _index_from_columns from cudf.core.multiindex import MultiIndex -from cudf.utils.utils import _gather_map_is_valid, cached_property +from cudf.utils.utils import cached_property doc_reset_index_template = """ Reset the index of the {klass}, or a level of it. @@ -551,7 +551,7 @@ def _gather( if not is_integer_dtype(gather_map.dtype): gather_map = gather_map.astype("int32") - if not _gather_map_is_valid( + if not libcudf.copying._gather_map_is_valid( gather_map, len(self), check_bounds, nullify ): raise IndexError("Gather map index is out of bounds.") @@ -577,8 +577,8 @@ def _positions_from_column_names( """Map each column name into their positions in the frame. Return positions of the provided column names, offset by the number of - index columns `offset_by_index_columns` is True. The order of indices - returned corresponds to the column order in this Frame. + index columns if `offset_by_index_columns` is True. The order of + indices returned corresponds to the column order in this Frame. 
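The offset matters because libcudf is handed the index columns first, followed
by the data columns (see `_drop_na_rows` below), so data-column positions must
be shifted past the index. A small illustrative computation, assuming a
hypothetical frame with a one-column index and data columns ``a``, ``b``,
``c``:

    # Hypothetical layout handed to libcudf: [index, a, b, c]
    num_index_columns = 1
    column_names = ["a", "b", "c"]
    subset = {"b", "c"}

    keys = [num_index_columns + i
            for i, name in enumerate(column_names) if name in subset]
    assert keys == [2, 3]  # positions of b and c after the index column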
""" num_index_columns = ( len(self._index._data) if offset_by_index_columns else 0 @@ -854,11 +854,12 @@ def _n_largest_or_smallest(self, largest, n, columns, keep): n = 0 # argsort the `by` column - return self.take( + return self._gather( self._get_columns_by_label(columns)._get_sorted_inds( ascending=not largest )[:n], keep_index=True, + check_bounds=False, ) elif keep == "last": indices = self._get_columns_by_label(columns)._get_sorted_inds( @@ -870,7 +871,7 @@ def _n_largest_or_smallest(self, largest, n, columns, keep): indices = indices[0:0] else: indices = indices[: -n - 1 : -1] - return self.take(indices, keep_index=True) + return self._gather(indices, keep_index=True, check_bounds=False) else: raise ValueError('keep must be either "first", "last"') @@ -1198,6 +1199,176 @@ def resample( else cudf.core.resample.DataFrameResampler(self, by=by) ) + def dropna( + self, axis=0, how="any", thresh=None, subset=None, inplace=False + ): + """ + Drop rows (or columns) containing nulls from a Column. + + Parameters + ---------- + axis : {0, 1}, optional + Whether to drop rows (axis=0, default) or columns (axis=1) + containing nulls. + how : {"any", "all"}, optional + Specifies how to decide whether to drop a row (or column). + any (default) drops rows (or columns) containing at least + one null value. all drops only rows (or columns) containing + *all* null values. + thresh: int, optional + If specified, then drops every row (or column) containing + less than `thresh` non-null values + subset : list, optional + List of columns to consider when dropping rows (all columns + are considered by default). Alternatively, when dropping + columns, subset is a list of rows to consider. + inplace : bool, default False + If True, do operation inplace and return None. + + Returns + ------- + Copy of the DataFrame with rows/columns containing nulls dropped. + + See also + -------- + cudf.DataFrame.isna + Indicate null values. + + cudf.DataFrame.notna + Indicate non-null values. + + cudf.DataFrame.fillna + Replace null values. + + cudf.Series.dropna + Drop null values. + + cudf.Index.dropna + Drop null indices. + + Examples + -------- + >>> import cudf + >>> df = cudf.DataFrame({"name": ['Alfred', 'Batman', 'Catwoman'], + ... "toy": ['Batmobile', None, 'Bullwhip'], + ... "born": [np.datetime64("1940-04-25"), + ... np.datetime64("NaT"), + ... np.datetime64("NaT")]}) + >>> df + name toy born + 0 Alfred Batmobile 1940-04-25 00:00:00 + 1 Batman + 2 Catwoman Bullwhip + + Drop the rows where at least one element is null. + + >>> df.dropna() + name toy born + 0 Alfred Batmobile 1940-04-25 + + Drop the columns where at least one element is null. + + >>> df.dropna(axis='columns') + name + 0 Alfred + 1 Batman + 2 Catwoman + + Drop the rows where all elements are null. + + >>> df.dropna(how='all') + name toy born + 0 Alfred Batmobile 1940-04-25 00:00:00 + 1 Batman + 2 Catwoman Bullwhip + + Keep only the rows with at least 2 non-null values. + + >>> df.dropna(thresh=2) + name toy born + 0 Alfred Batmobile 1940-04-25 00:00:00 + 2 Catwoman Bullwhip + + Define in which columns to look for null values. + + >>> df.dropna(subset=['name', 'born']) + name toy born + 0 Alfred Batmobile 1940-04-25 + + Keep the DataFrame with valid entries in the same variable. 
+ + >>> df.dropna(inplace=True) + >>> df + name toy born + 0 Alfred Batmobile 1940-04-25 + """ + if axis == 0: + result = self._drop_na_rows( + how=how, subset=subset, thresh=thresh, drop_nan=True + ) + else: + result = self._drop_na_columns( + how=how, subset=subset, thresh=thresh + ) + + return self._mimic_inplace(result, inplace=inplace) + + def _drop_na_rows( + self, how="any", subset=None, thresh=None, drop_nan=False + ): + """ + Drop null rows from `self`. + + how : {"any", "all"}, optional + Specifies how to decide whether to drop a row. + any (default) drops rows containing at least + one null value. all drops only rows containing + *all* null values. + subset : list, optional + List of columns to consider when dropping rows. + thresh: int, optional + If specified, then drops every row containing + less than `thresh` non-null values. + """ + if subset is None: + subset = self._column_names + elif ( + not np.iterable(subset) + or isinstance(subset, str) + or isinstance(subset, tuple) + and subset in self._data.names + ): + subset = (subset,) + diff = set(subset) - set(self._data) + if len(diff) != 0: + raise KeyError(f"columns {diff} do not exist") + + if len(subset) == 0: + return self.copy(deep=True) + + if drop_nan: + data_columns = [ + col.nans_to_nulls() + if isinstance(col, cudf.core.column.NumericalColumn) + else col + for col in self._columns + ] + + result = self.__class__._from_columns( + libcudf.stream_compaction.drop_nulls( + list(self._index._data.columns) + data_columns, + how=how, + keys=self._positions_from_column_names( + subset, offset_by_index_columns=True + ), + thresh=thresh, + ), + self._column_names, + self._index.names, + ) + result._copy_type_metadata(self) + return result + def _apply_boolean_mask(self, boolean_mask): """Apply boolean mask to each row of `self`. @@ -1217,6 +1388,59 @@ def _apply_boolean_mask(self, boolean_mask): result._copy_type_metadata(self) return result + def take(self, indices, axis=0): + """Return a new frame containing the rows specified by *indices*. + + Parameters + ---------- + indices : array-like + Array of ints indicating which positions to take. + axis : Unsupported + + Returns + ------- + out : Series or DataFrame + New object with desired subset of rows. + + Examples + -------- + **Series** + >>> s = cudf.Series(['a', 'b', 'c', 'd', 'e']) + >>> s.take([2, 0, 4, 3]) + 2 c + 0 a + 4 e + 3 d + dtype: object + + **DataFrame** + + >>> a = cudf.DataFrame({'a': [1.0, 2.0, 3.0], + ... 'b': cudf.Series(['a', 'b', 'c'])}) + >>> a.take([0, 2, 2]) + a b + 0 1.0 a + 2 3.0 c + 2 3.0 c + >>> a.take([True, False, True]) + a b + 0 1.0 a + 2 3.0 c + """ + axis = self._get_axis_from_axis_arg(axis) + if axis != 0: + raise NotImplementedError("Only axis=0 is supported.") + + indices = cudf.core.column.as_column(indices) + if is_bool_dtype(indices): + warnings.warn( + "Calling take with a boolean array is deprecated and will be " + "removed in the future.", + FutureWarning, + ) + return self._apply_boolean_mask(indices) + return self._gather(indices) + def _reset_index(self, level, drop, col_level=0, col_fill=""): """Shared path for DataFrame.reset_index and Series.reset_index.""" if level is not None and not isinstance(level, (tuple, list)): diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index dd8f462fb1d..704274815f6 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -1,7 +1,7 @@ # Copyright (c) 2020-2021, NVIDIA CORPORATION. 
from __future__ import annotations -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Callable, cast import cudf from cudf import _lib as libcudf @@ -41,6 +41,8 @@ def __init__( right_on, left_index, right_index, + lhs_is_index, + rhs_is_index, how, sort, indicator, @@ -70,6 +72,10 @@ def __init__( right_index : bool Boolean flag indicating the right index column or coumns are to be used as join keys in order. + lhs_is_index : bool + ``lhs`` is a ``BaseIndex`` + rhs_is_index : bool + ``rhs`` is a ``BaseIndex`` how : string The type of join. Possible values are 'inner', 'outer', 'left', 'leftsemi' and 'leftanti' @@ -94,6 +100,8 @@ def __init__( self.lhs = lhs.copy(deep=False) self.rhs = rhs.copy(deep=False) + self.lhs_is_index = lhs_is_index + self.rhs_is_index = rhs_is_index self.how = how self.sort = sort self.lsuffix, self.rsuffix = suffixes @@ -201,24 +209,28 @@ def perform_merge(self) -> Frame: ) gather_index = self._using_left_index or self._using_right_index + lkwargs = { + "gather_map": left_rows, + "nullify": True, + "check_bounds": False, + } + rkwargs = { + "gather_map": right_rows, + "nullify": True, + "check_bounds": False, + } + if not self.lhs_is_index: + lkwargs["keep_index"] = gather_index + if not self.rhs_is_index: + rkwargs["keep_index"] = gather_index left_result = ( - self.lhs._gather( - left_rows, - nullify=True, - keep_index=gather_index, - check_bounds=False, - ) + self.lhs._gather(**lkwargs) if left_rows is not None else cudf.core.frame.Frame() ) right_result = ( - self.rhs._gather( - right_rows, - nullify=True, - keep_index=gather_index, - check_bounds=False, - ) + self.rhs._gather(**rkwargs) if right_rows is not None else cudf.core.frame.Frame() ) @@ -321,11 +333,16 @@ def _sort_result(self, result: Frame) -> Frame: if by: to_sort = cudf.DataFrame._from_data(dict(enumerate(by))) sort_order = to_sort.argsort() - result = result._gather( - sort_order, - keep_index=self._using_left_index or self._using_right_index, - check_bounds=False, - ) + if isinstance(result, cudf.core._base_index.BaseIndex): + result = result._gather(sort_order, check_bounds=False) + else: + result = cast(cudf.core.indexed_frame.IndexedFrame, result) + result = result._gather( + sort_order, + keep_index=self._using_left_index + or self._using_right_index, + check_bounds=False, + ) return result @staticmethod diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index 3acc947c649..e8ff7838a9e 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1744,6 +1744,13 @@ def _intersection(self, other, sort=None): return midx.sort_values() return midx + def _copy_type_metadata( + self, other: Frame, include_index: bool = True + ) -> Frame: + res = super()._copy_type_metadata(other, include_index=include_index) + res._names = other._names + return res + def _split_columns_by_levels(self, levels): # This function assumes that for levels with duplicate names, they are # specified by indices, not name by ``levels``. E.g. [None, None] can diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 2ecee781eb1..e96531d4b1c 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -1205,11 +1205,6 @@ def __setitem__(self, key, value): else: self.loc[key] = value - def take(self, indices, axis=0, keep_index=True): - # Validate but don't use the axis. 
- _ = self._get_axis_from_axis_arg(axis) - return super().take(indices, keep_index) - def __repr__(self): _, height = get_terminal_size() max_rows = ( diff --git a/python/cudf/cudf/tests/test_dropna.py b/python/cudf/cudf/tests/test_dropna.py index e1d0c38c760..1e24dd9d275 100644 --- a/python/cudf/cudf/tests/test_dropna.py +++ b/python/cudf/cudf/tests/test_dropna.py @@ -228,3 +228,65 @@ def test_dropna_dataframe_np_nan(data, axis): pdf = pd.DataFrame(pd_data) assert_eq(pdf.dropna(axis=axis), gdf.dropna(axis=axis), check_dtype=False) + + +@pytest.mark.parametrize( + "data, dtype", + [ + ([1, float("nan"), 2], "float64"), + (["x", None, "y"], "str"), + (["x", None, "y"], "category"), + (["2020-01-20", pd.NaT, "2020-03-15"], "datetime64[ns]"), + (["1s", pd.NaT, "3d"], "timedelta64[ns]"), + ], +) +def test_dropna_index(data, dtype): + pi = pd.Index(data, dtype=dtype) + gi = cudf.from_pandas(pi) + + expect = pi.dropna() + got = gi.dropna() + + assert_eq(expect, got) + + +@pytest.mark.parametrize("data", [[[1, None, 2], [None, None, 2]]]) +@pytest.mark.parametrize("how", ["all", "any"]) +def test_dropna_multiindex(data, how): + pi = pd.MultiIndex.from_arrays(data) + gi = cudf.from_pandas(pi) + + expect = pi.dropna(how) + got = gi.dropna(how) + + with pytest.raises(AssertionError, match="different"): + # pandas-gh44792. Pandas infers the dtypes as (int64, int64), though + # int64 doesn't really store null/nans. The dtype propagates to the + # result of dropna. cuDF infers the dtypes as (float, float), which + # differs from pandas. + assert_eq(expect, got) + + +@pytest.mark.parametrize( + "data", + [ + [ + [pd.Timestamp("2020-01-01"), pd.NaT, pd.Timestamp("2020-02-01")], + [pd.NaT, pd.NaT, pd.Timestamp("2020-03-01")], + ], + [ + [pd.Timestamp("2020-01-01"), pd.NaT, pd.Timestamp("2020-02-01")], + [np.nan, np.nan, 1.0], + ], + [[1.0, np.nan, 2.0], [np.nan, np.nan, 1.0]], + ], +) +@pytest.mark.parametrize("how", ["all", "any"]) +def test_dropna_multiindex_2(data, how): + pi = pd.MultiIndex.from_arrays(data) + gi = cudf.from_pandas(pi) + + expect = pi.dropna(how) + got = gi.dropna(how) + + assert_eq(expect, got) diff --git a/python/cudf/cudf/tests/test_indexing.py b/python/cudf/cudf/tests/test_indexing.py index 90a20e2bab4..e452dc5d7f7 100644 --- a/python/cudf/cudf/tests/test_indexing.py +++ b/python/cudf/cudf/tests/test_indexing.py @@ -747,9 +747,8 @@ def test_dataframe_take_with_multiIndex(ntake): assert_eq(actual, expected) -@pytest.mark.parametrize("keep_index", [True, False]) @pytest.mark.parametrize("ntake", [0, 1, 10, 123, 122, 200]) -def test_series_take(ntake, keep_index): +def test_series_take(ntake): np.random.seed(0) nelem = 123 @@ -758,12 +757,9 @@ def test_series_take(ntake, keep_index): take_indices = np.random.randint(0, len(gsr), ntake) - actual = gsr.take(take_indices, keep_index=keep_index) + actual = gsr.take(take_indices) expected = psr.take(take_indices) - if not keep_index: - expected = expected.reset_index(drop=True) - assert_eq(actual, expected) @@ -775,7 +771,7 @@ def test_series_take_positional(): take_indices = [1, 2, 0, 3] expect = psr.take(take_indices) - got = gsr.take(take_indices, keep_index=True) + got = gsr.take(take_indices) assert_eq(expect, got) diff --git a/python/cudf/cudf/utils/utils.py b/python/cudf/cudf/utils/utils.py index 2af7543e600..d23094ef3f9 100644 --- a/python/cudf/cudf/utils/utils.py +++ b/python/cudf/cudf/utils/utils.py @@ -12,7 +12,6 @@ import rmm import cudf -from cudf._lib.reduce import minmax from cudf.core import column from cudf.core.buffer 
import Buffer from cudf.utils.dtypes import to_cudf_compatible_scalar @@ -507,20 +506,3 @@ def _maybe_indices_to_slice(indices: cp.ndarray) -> Union[slice, cp.ndarray]: if (indices == cp.arange(start, stop, step)).all(): return slice(start, stop, step) return indices - - -def _gather_map_is_valid( - gather_map: "cudf.core.column.ColumnBase", - nrows: int, - check_bounds: bool, - nullify: bool, -) -> bool: - """Returns true if gather map is valid. - - A gather map is valid if empty or all indices are within the range - ``[-nrows, nrows)``, except when ``nullify`` is specifed. - """ - if not check_bounds or nullify or len(gather_map) == 0: - return True - gm_min, gm_max = minmax(gather_map) - return gm_min >= -nrows and gm_max < nrows
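Taken together, the user-facing effect of this patch is that `take` is now
positional-only and always index-preserving. A hedged migration sketch for
callers that used `keep_index=False` or boolean arrays (illustrative):

    import cudf

    s = cudf.Series(["a", "b", "c", "d", "e"])

    # Positional take now always keeps the index.
    out = s.take([2, 0, 4, 3])

    # Old `take(..., keep_index=False)` behavior, recovered explicitly:
    out_no_index = s.take([2, 0, 4, 3]).reset_index(drop=True)

    # Boolean arrays passed to take() now emit a FutureWarning;
    # boolean-mask indexing is the supported spelling:
    filtered = s[[True, False, True, False, True]]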