Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify read_avro by removing unnecessary writer/impl classes #9090

Merged
merged 23 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9c72e56
simplify io/functions.cpp data source/sink factories
cwharris Aug 14, 2021
884bde6
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into io-funct…
cwharris Aug 16, 2021
88e2399
remove filepath-related logic from csv and json readers
cwharris Aug 17, 2021
62b9520
remove filepath logic from avro, parquet, orc readers
cwharris Aug 17, 2021
fb01294
move range size padding calculation out of json/csv reader and in to …
cwharris Aug 18, 2021
d422aeb
remove filepaths from json reader
cwharris Aug 18, 2021
9107f24
remove unncessary avro reader class
cwharris Aug 21, 2021
f15622e
replace avro::reader_impl mr member with local variable
cwharris Aug 21, 2021
4cf9f32
replace avro::reader_impl source member with local variable
cwharris Aug 21, 2021
ea2c219
remove unneccessary avro::reader_impl _columns member
cwharris Aug 21, 2021
2c16c19
replace avro::reader_impl metadata member with local variable
cwharris Aug 21, 2021
4188c2b
delete unncessarry avro::reader_impl class
cwharris Aug 21, 2021
cdbdb5e
read_avro: replace explicit cudamemcpyasync with uvector factor calls
cwharris Aug 21, 2021
05f514c
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into io-simpl…
cwharris Aug 24, 2021
85b84bd
read_avro: treat source and metadata as refernces, not raw pointers
cwharris Aug 25, 2021
1d0b01d
enforce constness on avro_decode_row dictionary argument
cwharris Aug 25, 2021
cf9eecf
adjust copyright in avro.hpp
cwharris Aug 25, 2021
5ceee8e
read_avro: return out_buffers rather than mutating in function
cwharris Aug 25, 2021
4a2cfcc
use device_span for block list in DecodeAvroColumnData
cwharris Aug 25, 2021
619a87f
read_avro: style changes
cwharris Aug 25, 2021
247f264
read_avro: update const positions
cwharris Aug 26, 2021
543b25e
adjust const positions
cwharris Aug 26, 2021
98ab9c9
adjust const positions
cwharris Aug 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 15 additions & 41 deletions cpp/include/cudf/io/detail/avro.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,11 +14,6 @@
* limitations under the License.
*/

/**
* @file avro.hpp
* @brief cuDF-IO reader classes API
*/

#pragma once

#include <cudf/io/avro.hpp>
Expand All @@ -29,44 +24,23 @@ namespace cudf {
namespace io {
cwharris marked this conversation as resolved.
Show resolved Hide resolved
namespace detail {
namespace avro {

/**
* @brief Class to read Avro dataset data into columns.
* @brief Reads the entire dataset.
*
* @param source Input `datasource` object to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @return The set of columns along with table metadata
*/
class reader {
private:
class impl;
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor from an array of datasources
*
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
avro_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Destructor explicitly-declared to avoid inlined in header
*/
~reader();
table_with_metadata read_avro(
std::unique_ptr<cudf::io::datasource>&& source,
avro_reader_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Reads the entire dataset.
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return The set of columns along with table metadata
*/
table_with_metadata read(avro_reader_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
};
} // namespace avro
} // namespace detail
} // namespace io
Expand Down
46 changes: 18 additions & 28 deletions cpp/src/io/avro/avro_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur,
*
* @return data pointer at the end of the row (start of next row)
*/
static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema,
schemadesc_s* schema_g,
uint32_t schema_len,
size_t row,
size_t max_rows,
const uint8_t* cur,
const uint8_t* end,
device_span<string_index_pair> global_dictionary)
static const uint8_t* __device__
cwharris marked this conversation as resolved.
Show resolved Hide resolved
avro_decode_row(const schemadesc_s* schema,
schemadesc_s* schema_g,
uint32_t schema_len,
size_t row,
size_t max_rows,
const uint8_t* cur,
const uint8_t* end,
cwharris marked this conversation as resolved.
Show resolved Hide resolved
device_span<string_index_pair const> global_dictionary)
{
uint32_t array_start = 0, array_repeat_count = 0;
int array_children = 0;
Expand Down Expand Up @@ -220,19 +221,17 @@ static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema,
* @param[in] schema Schema description
* @param[in] global_Dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
*/
// blockDim {32,num_warps,1}
extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
gpuDecodeAvroColumnData(block_desc_s* blocks,
gpuDecodeAvroColumnData(device_span<block_desc_s const> blocks,
schemadesc_s* schema_g,
device_span<string_index_pair> global_dictionary,
device_span<string_index_pair const> global_dictionary,
const uint8_t* avro_data,
cwharris marked this conversation as resolved.
Show resolved Hide resolved
cwharris marked this conversation as resolved.
Show resolved Hide resolved
uint32_t num_blocks,
uint32_t schema_len,
uint32_t min_row_size,
size_t max_rows,
Expand All @@ -258,9 +257,9 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
} else {
schema = schema_g;
}
if (block_id < num_blocks and threadIdx.x == 0) { *blk = blocks[block_id]; }
if (block_id < blocks.size() and threadIdx.x == 0) { *blk = blocks[block_id]; }
__syncthreads();
if (block_id >= num_blocks) { return; }
if (block_id >= blocks.size()) { return; }
cur_row = blk->first_row;
rows_remaining = blk->num_rows;
cur = avro_data + blk->offset;
Expand Down Expand Up @@ -304,18 +303,16 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeAvroColumnData(block_desc_s* blocks,
void DecodeAvroColumnData(device_span<block_desc_s const> blocks,
schemadesc_s* schema,
device_span<string_index_pair> global_dictionary,
device_span<string_index_pair const> global_dictionary,
const uint8_t* avro_data,
cwharris marked this conversation as resolved.
Show resolved Hide resolved
cwharris marked this conversation as resolved.
Show resolved Hide resolved
uint32_t num_blocks,
uint32_t schema_len,
size_t max_rows,
size_t first_row,
Expand All @@ -325,17 +322,10 @@ void DecodeAvroColumnData(block_desc_s* blocks,
// num_warps warps per threadblock
dim3 const dim_block(32, num_warps);
// 1 warp per datablock, num_warps datablocks per threadblock
dim3 const dim_grid((num_blocks + num_warps - 1) / num_warps, 1);
dim3 const dim_grid((blocks.size() + num_warps - 1) / num_warps, 1);

gpuDecodeAvroColumnData<<<dim_grid, dim_block, 0, stream.value()>>>(blocks,
schema,
global_dictionary,
avro_data,
num_blocks,
schema_len,
min_row_size,
max_rows,
first_row);
gpuDecodeAvroColumnData<<<dim_grid, dim_block, 0, stream.value()>>>(
blocks, schema, global_dictionary, avro_data, schema_len, min_row_size, max_rows, first_row);
}

} // namespace gpu
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/io/avro/avro_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,16 @@ struct schemadesc_s {
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeAvroColumnData(block_desc_s* blocks,
void DecodeAvroColumnData(cudf::device_span<block_desc_s const> blocks,
schemadesc_s* schema,
cudf::device_span<string_index_pair> global_dictionary,
cudf::device_span<string_index_pair const> global_dictionary,
const uint8_t* avro_data,
cwharris marked this conversation as resolved.
Show resolved Hide resolved
cwharris marked this conversation as resolved.
Show resolved Hide resolved
uint32_t num_blocks,
uint32_t schema_len,
size_t max_rows = ~0,
size_t first_row = 0,
Expand Down
Loading