Skip to content

Commit

Permalink
Merge branch 'rapidsai:branch-22.12' into pin_dask
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar authored Nov 16, 2022
2 parents 7adf229 + defad5e commit 8d84f2d
Show file tree
Hide file tree
Showing 22 changed files with 635 additions and 62 deletions.
1 change: 0 additions & 1 deletion CODE_OF_CONDUCT.md

This file was deleted.

1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ add_library(
src/io/json/json_tree.cu
src/io/json/nested_json_gpu.cu
src/io/json/reader_impl.cu
src/io/json/experimental/byte_range_info.cu
src/io/json/experimental/read_json.cpp
src/io/orc/aggregate_orc_metadata.cpp
src/io/orc/dict_enc.cu
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/io/json/experimental/byte_range_info.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/find.h>

namespace cudf::io::detail::json::experimental {

/**
 * @brief Locates the first occurrence of `delimiter` in a device character buffer.
 *
 * @param d_data Device span of characters to search
 * @param delimiter Character to look for
 * @param stream CUDA stream whose execution policy is used for the search
 * @return Zero-based index of the first match, relative to the start of `d_data`, or -1 when
 * `delimiter` does not occur in `d_data`
 */
size_type find_first_delimiter(device_span<char const> d_data,
                               char const delimiter,
                               rmm::cuda_stream_view stream)
{
  auto const data_begin = d_data.begin();
  auto const data_end   = d_data.end();
  auto const match      = thrust::find(rmm::exec_policy(stream), data_begin, data_end, delimiter);
  // thrust::find yields the end iterator when nothing matches.
  if (match == data_end) { return -1; }
  return match - data_begin;
}

} // namespace cudf::io::detail::json::experimental
98 changes: 92 additions & 6 deletions cpp/src/io/json/experimental/read_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,105 @@ std::vector<uint8_t> ingest_raw_input(host_span<std::unique_ptr<datasource>> con
}
}

/**
 * @brief Finds the first `delimiter` inside the byte range selected by `reader_opts`.
 *
 * The byte range (offset and size taken from `reader_opts`) is read from `sources` into host
 * memory, copied to a temporary device buffer on `stream`, and searched with
 * `find_first_delimiter`.
 *
 * @param sources Data sources to read from
 * @param reader_opts JSON reader options providing compression, range offset and range size
 * @param delimiter Character to look for
 * @param stream CUDA stream used for the device allocation, the copy and the search
 * @return Result of `find_first_delimiter` on the ingested bytes, i.e. a position relative to the
 * start of the ingested buffer
 */
size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
                                        json_reader_options const& reader_opts,
                                        char const delimiter,
                                        rmm::cuda_stream_view stream)
{
  auto const h_chunk = ingest_raw_input(sources,
                                        reader_opts.get_compression(),
                                        reader_opts.get_byte_range_offset(),
                                        reader_opts.get_byte_range_size());
  using byte_type      = decltype(h_chunk)::value_type;
  auto const num_bytes = h_chunk.size() * sizeof(byte_type);

  // One device character per ingested host byte.
  rmm::device_uvector<char> d_chunk(h_chunk.size(), stream);
  CUDF_CUDA_TRY(cudaMemcpyAsync(
    d_chunk.data(), h_chunk.data(), num_bytes, cudaMemcpyHostToDevice, stream.value()));
  return find_first_delimiter(d_chunk, delimiter, stream);
}

/**
 * @brief Finds the first `delimiter` in a host byte buffer by searching it on the device.
 *
 * The bytes are copied to a temporary device buffer on `stream` and searched with
 * `find_first_delimiter`.
 *
 * @param buffer Host bytes to search
 * @param delimiter Character to look for
 * @param stream CUDA stream used for the device allocation, the copy and the search
 * @return Result of `find_first_delimiter` on the copied bytes, i.e. a position relative to the
 * start of `buffer`
 */
size_type find_first_delimiter_in_chunk(host_span<unsigned char const> buffer,
                                        char const delimiter,
                                        rmm::cuda_stream_view stream)
{
  using byte_type         = decltype(buffer)::value_type;
  auto const num_elements = buffer.size();

  // One device character per host byte.
  rmm::device_uvector<char> d_chunk(num_elements, stream);
  CUDF_CUDA_TRY(cudaMemcpyAsync(d_chunk.data(),
                                buffer.data(),
                                num_elements * sizeof(byte_type),
                                cudaMemcpyHostToDevice,
                                stream.value()));
  return find_first_delimiter(d_chunk, delimiter, stream);
}

/**
 * @brief Tells whether the reader options request the entire input rather than a byte range.
 *
 * @param reader_opts JSON reader options holding the byte range offset and size
 * @return true only when both the byte range offset and the byte range size are zero
 */
bool should_load_whole_source(json_reader_options const& reader_opts)
{
  auto const has_range_offset = reader_opts.get_byte_range_offset() != 0;
  auto const has_range_size   = reader_opts.get_byte_range_size() != 0;
  return not(has_range_offset or has_range_size);
}

/**
 * @brief Reads the raw bytes of the records that belong to the requested byte range.
 *
 * When neither a range offset nor a range size is set, the ingested input is returned unchanged.
 * Otherwise the requested range is snapped to '\n' delimiters:
 *  - Start: 0 when the range offset is 0, otherwise the position of the first delimiter inside
 *    the requested range. If the requested range holds no delimiter, an empty buffer is returned
 *    (this chunk is skipped).
 *  - End: the position of the first delimiter found at or after the end of the requested range,
 *    searched one range-size window at a time. If no delimiter is found before the end of the
 *    sources, the total source size is used.
 *
 * @param sources Data sources to read from
 * @param reader_opts JSON reader options with range offset and range size
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @return Bytes between the computed start and end positions
 */
auto get_record_range_raw_input(host_span<std::unique_ptr<datasource>> sources,
                                json_reader_options const& reader_opts,
                                rmm::cuda_stream_view stream)
{
  auto buffer = ingest_raw_input(sources,
                                 reader_opts.get_compression(),
                                 reader_opts.get_byte_range_offset(),
                                 reader_opts.get_byte_range_size());
  if (should_load_whole_source(reader_opts)) { return buffer; }

  auto const range_offset = reader_opts.get_byte_range_offset();
  auto const range_size   = reader_opts.get_byte_range_size();

  // A range that begins at the very start of the input needs no delimiter search.
  auto record_begin =
    range_offset == 0 ? 0 : find_first_delimiter_in_chunk(buffer, '\n', stream);
  if (record_begin == -1) { return std::vector<uint8_t>{}; }
  // Convert the chunk-relative position into a position within the sources.
  record_begin = record_begin + range_offset;

  // Scan forward, window by window, for the delimiter that closes the last record.
  decltype(record_begin) record_end = -1;
  auto const total_source_size     = sources_size(sources, 0, 0);
  auto search_offset               = range_offset + range_size;
  for (; search_offset < total_source_size; search_offset += range_size) {
    buffer     = ingest_raw_input(sources, reader_opts.get_compression(), search_offset, range_size);
    record_end = find_first_delimiter_in_chunk(buffer, '\n', stream);
    // Leave search_offset untouched on success: record_end is relative to it.
    if (record_end != -1) { break; }
  }

  if (record_end == -1) {
    record_end = total_source_size;
  } else {
    record_end = record_end + search_offset;
  }

  return ingest_raw_input(
    sources, reader_opts.get_compression(), record_begin, record_end - record_begin);
}

table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
json_reader_options const& reader_opts,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(reader_opts.get_byte_range_offset() == 0 and reader_opts.get_byte_range_size() == 0,
"specifying a byte range is not yet supported");
if (not should_load_whole_source(reader_opts)) {
CUDF_EXPECTS(reader_opts.is_enabled_lines(),
"specifying a byte range is supported only for json lines");
}

auto const buffer = get_record_range_raw_input(sources, reader_opts, stream);

auto const buffer = ingest_raw_input(sources,
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size());
auto data = host_span<char const>(reinterpret_cast<char const*>(buffer.data()), buffer.size());

try {
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/io/json/experimental/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@ table_with_metadata read_json(host_span<std::unique_ptr<datasource>> sources,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

}
/**
 * @brief Finds the first occurrence of a delimiter character in device data.
 *
 * @param d_data Device span of characters to search
 * @param delimiter Character to look for
 * @param stream CUDA stream used for the search
 * @return Position of the first `delimiter` relative to the start of `d_data`, or -1 if it does
 * not occur
 */
size_type find_first_delimiter(device_span<char const> d_data,
char const delimiter,
rmm::cuda_stream_view stream);

/**
 * @brief Finds the first occurrence of a delimiter character in the byte range selected by the
 * reader options.
 *
 * @param sources Data sources to read from
 * @param reader_opts JSON reader options providing compression, byte range offset and size
 * @param delimiter Character to look for
 * @param stream CUDA stream used for device memory operations and the search
 * @return Position of the first `delimiter` relative to the start of the bytes read for the
 * range, or -1 if it does not occur
 */
size_type find_first_delimiter_in_chunk(host_span<std::unique_ptr<cudf::io::datasource>> sources,
json_reader_options const& reader_opts,
char const delimiter,
rmm::cuda_stream_view stream);

} // namespace cudf::io::detail::json::experimental
2 changes: 1 addition & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ ConfigureTest(CSV_TEST io/csv_test.cpp)
ConfigureTest(FILE_IO_TEST io/file_io_test.cpp)
ConfigureTest(ORC_TEST io/orc_test.cpp)
ConfigureTest(PARQUET_TEST io/parquet_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp)
ConfigureTest(JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp)
ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu)
ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp)
ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
Expand Down
128 changes: 128 additions & 0 deletions cpp/tests/io/json_chunked_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/table_utilities.hpp>

#include <io/json/experimental/read_json.hpp>

/**
 * @brief Base test fixture for JSON reader tests
 *
 * Adds nothing on top of cudf::test::BaseFixture; it only provides the fixture name used by the
 * TEST_F cases in this file.
 *
 * NOTE(review): this file is compiled into JSON_TEST together with io/json_test.cpp. If that file
 * also defines a fixture named JsonReaderTest, both definitions must stay identical - confirm
 * before changing this struct.
 */
struct JsonReaderTest : public cudf::test::BaseFixture {
};

/**
 * @brief Test helper that emulates a chunked JSON lines reader.
 *
 * The input is cut into fixed-size chunks and the first '\n' of every chunk is located. The
 * positions found are stitched into contiguous record ranges, and each range is parsed by its own
 * `read_json` call. The calls are issued one after another here; they do not depend on each
 * other's results.
 *
 * @param sources Data sources to read from
 * @param reader_opts JSON reader options; the byte range is overridden per chunk on a copy
 * @param chunk_size Chunk size in bytes, must be positive (it is used as a divisor)
 * @param stream CUDA stream used for device memory operations and kernel launches
 * @param mr Device memory resource passed on to `read_json`
 * @return One table per record range, in source order; empty when the sources hold no data
 */
std::vector<cudf::io::table_with_metadata> skeleton_for_parellel_chunk_reader(
  cudf::host_span<std::unique_ptr<cudf::io::datasource>> sources,
  cudf::io::json_reader_options const& reader_opts,
  int32_t chunk_size,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr)
{
  using namespace cudf::io::detail::json::experimental;
  using cudf::size_type;
  // Written with a single source in mind; the sizes of all given sources are summed.
  size_t total_source_size = 0;
  for (auto const& source : sources) {
    total_source_size += source->size();
  }
  size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size;
  // Nothing to read: return before first_delimiter_index[0] is written below, which would be an
  // out-of-bounds access on an empty vector.
  if (num_chunks == 0) { return {}; }
  constexpr size_type no_min_value = -1;

  // Get the first delimiter in each chunk.
  std::vector<size_type> first_delimiter_index(num_chunks);
  auto reader_opts_chunk = reader_opts;
  for (size_t i = 0; i < num_chunks; i++) {
    auto const chunk_start = i * chunk_size;
    reader_opts_chunk.set_byte_range_offset(chunk_start);
    reader_opts_chunk.set_byte_range_size(chunk_size);
    first_delimiter_index[i] =
      find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream);
    // Turn the chunk-relative position into a position within the whole input.
    if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; }
  }

  // Process and allocate record start, end for each worker.
  using record_range = std::pair<size_type, size_type>;
  std::vector<record_range> record_ranges;
  record_ranges.reserve(num_chunks);
  // The first range always starts at the beginning of the input.
  first_delimiter_index[0] = 0;
  auto prev                = first_delimiter_index[0];
  for (size_t i = 1; i < num_chunks; i++) {
    // A chunk without a delimiter lies inside a record that started earlier.
    if (first_delimiter_index[i] == no_min_value) continue;
    record_ranges.push_back({prev, first_delimiter_index[i]});
    prev = first_delimiter_index[i];
  }
  // The last range runs to the end of the input.
  record_ranges.push_back({prev, total_source_size});

  std::vector<cudf::io::table_with_metadata> tables;
  // Parse each record range with its own read_json call (sequential in this skeleton).
  for (auto const [chunk_start, chunk_end] : record_ranges) {
    if (chunk_start == -1 or chunk_end == -1) continue;
    reader_opts_chunk.set_byte_range_offset(chunk_start);
    reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start);
    tables.push_back(read_json(sources, reader_opts_chunk, stream, mr));
  }
  // assume all records have same number of columns, and inferred same type. (or schema is passed)
  // TODO a step before to merge all columns, types and infer final schema.
  return tables;
}

// Checks that reading the input in record-aligned chunks and concatenating the per-chunk tables
// gives a table equivalent to reading the whole input in one call.
TEST_F(JsonReaderTest, ByteRange)
{
  // JSON lines input; the raw string starts with a newline.
  std::string const json_string = R"(
{ "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 }
{ "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
{ "a": { "y" : 6}, "b" : [6 ], "c": 13 }
{ "a": { "y" : 6}, "b" : [7 ], "c": 14 })";

  // Reader options: uncompressed JSON lines, experimental reader enabled
  auto const source = cudf::io::source_info{json_string.c_str(), json_string.size()};
  cudf::io::json_reader_options json_lines_options =
    cudf::io::json_reader_options::builder(source)
      .compression(cudf::io::compression_type::NONE)
      .lines(true)
      .experimental(true);

  // Reference result: the whole input read in a single call
  cudf::io::table_with_metadata expected_table = cudf::io::read_json(json_lines_options);

  auto datasources = cudf::io::datasource::create(json_lines_options.get_source().buffers());

  // Chunk sizes both smaller and larger than a single line
  for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) {
    auto const tables = skeleton_for_parellel_chunk_reader(datasources,
                                                           json_lines_options,
                                                           chunk_size,
                                                           cudf::get_default_stream(),
                                                           rmm::mr::get_current_device_resource());

    // Collect a view of every per-chunk table, in order, and glue them together
    std::vector<cudf::table_view> table_views;
    table_views.reserve(tables.size());
    for (auto const& table : tables) {
      table_views.push_back(table.tbl->view());
    }
    auto result = cudf::concatenate(table_views);

    // Verify that the data read via chunked reader matches the data read via nested JSON reader
    // cannot use EQUAL due to concatenate removing null mask
    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(expected_table.tbl->view(), result->view());
  }
}
3 changes: 2 additions & 1 deletion python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ def _wrap_binop_normalization(self, other):
if other is NA or other is None:
return cudf.Scalar(other, dtype=self.dtype)
if isinstance(other, np.ndarray) and other.ndim == 0:
other = other.item()
# Try and maintain the dtype
other = other.dtype.type(other.item())
return self.normalize_binop_value(other)

def _scatter_by_slice(
Expand Down
Loading

0 comments on commit 8d84f2d

Please sign in to comment.