Skip to content

Commit

Permalink
Simplify read_avro by removing unnecessary writer/impl classes (#9090)
Browse files Browse the repository at this point in the history
Depends on #9040

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #9090
  • Loading branch information
cwharris authored Aug 31, 2021
1 parent cc0c8e0 commit 4ad09aa
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 343 deletions.
56 changes: 15 additions & 41 deletions cpp/include/cudf/io/detail/avro.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,11 +14,6 @@
* limitations under the License.
*/

/**
* @file avro.hpp
* @brief cuDF-IO reader classes API
*/

#pragma once

#include <cudf/io/avro.hpp>
Expand All @@ -29,44 +24,23 @@ namespace cudf {
namespace io {
namespace detail {
namespace avro {

/**
* @brief Class to read Avro dataset data into columns.
* @brief Reads the entire dataset.
*
* @param source Input `datasource` object to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*
* @return The set of columns along with table metadata
*/
class reader {
private:
class impl;
std::unique_ptr<impl> _impl;

public:
/**
* @brief Constructor from an array of datasources
*
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
avro_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Destructor explicitly declared to avoid being inlined in the header
*/
~reader();
table_with_metadata read_avro(
std::unique_ptr<cudf::io::datasource>&& source,
avro_reader_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Reads the entire dataset.
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return The set of columns along with table metadata
*/
table_with_metadata read(avro_reader_options const& options,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
};
} // namespace avro
} // namespace detail
} // namespace io
Expand Down
50 changes: 20 additions & 30 deletions cpp/src/io/avro/avro_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur,
*
* @return data pointer at the end of the row (start of next row)
*/
static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema,
schemadesc_s* schema_g,
uint32_t schema_len,
size_t row,
size_t max_rows,
const uint8_t* cur,
const uint8_t* end,
device_span<string_index_pair> global_dictionary)
static uint8_t const* __device__
avro_decode_row(schemadesc_s const* schema,
schemadesc_s* schema_g,
uint32_t schema_len,
size_t row,
size_t max_rows,
uint8_t const* cur,
uint8_t const* end,
device_span<string_index_pair const> global_dictionary)
{
uint32_t array_start = 0, array_repeat_count = 0;
int array_children = 0;
Expand Down Expand Up @@ -220,19 +221,17 @@ static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema,
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
*/
// blockDim {32,num_warps,1}
extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
gpuDecodeAvroColumnData(block_desc_s* blocks,
gpuDecodeAvroColumnData(device_span<block_desc_s const> blocks,
schemadesc_s* schema_g,
device_span<string_index_pair> global_dictionary,
const uint8_t* avro_data,
uint32_t num_blocks,
device_span<string_index_pair const> global_dictionary,
uint8_t const* avro_data,
uint32_t schema_len,
uint32_t min_row_size,
size_t max_rows,
Expand All @@ -258,9 +257,9 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
} else {
schema = schema_g;
}
if (block_id < num_blocks and threadIdx.x == 0) { *blk = blocks[block_id]; }
if (block_id < blocks.size() and threadIdx.x == 0) { *blk = blocks[block_id]; }
__syncthreads();
if (block_id >= num_blocks) { return; }
if (block_id >= blocks.size()) { return; }
cur_row = blk->first_row;
rows_remaining = blk->num_rows;
cur = avro_data + blk->offset;
Expand Down Expand Up @@ -304,18 +303,16 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2)
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeAvroColumnData(block_desc_s* blocks,
void DecodeAvroColumnData(device_span<block_desc_s const> blocks,
schemadesc_s* schema,
device_span<string_index_pair> global_dictionary,
const uint8_t* avro_data,
uint32_t num_blocks,
device_span<string_index_pair const> global_dictionary,
uint8_t const* avro_data,
uint32_t schema_len,
size_t max_rows,
size_t first_row,
Expand All @@ -325,17 +322,10 @@ void DecodeAvroColumnData(block_desc_s* blocks,
// num_warps warps per threadblock
dim3 const dim_block(32, num_warps);
// 1 warp per datablock, num_warps datablocks per threadblock
dim3 const dim_grid((num_blocks + num_warps - 1) / num_warps, 1);
dim3 const dim_grid((blocks.size() + num_warps - 1) / num_warps, 1);

gpuDecodeAvroColumnData<<<dim_grid, dim_block, 0, stream.value()>>>(blocks,
schema,
global_dictionary,
avro_data,
num_blocks,
schema_len,
min_row_size,
max_rows,
first_row);
gpuDecodeAvroColumnData<<<dim_grid, dim_block, 0, stream.value()>>>(
blocks, schema, global_dictionary, avro_data, schema_len, min_row_size, max_rows, first_row);
}

} // namespace gpu
Expand Down
8 changes: 3 additions & 5 deletions cpp/src/io/avro/avro_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,16 @@ struct schemadesc_s {
* @param[in] schema Schema description
* @param[in] global_dictionary Global dictionary entries
* @param[in] avro_data Raw block data
* @param[in] num_blocks Number of blocks
* @param[in] schema_len Number of entries in schema
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeAvroColumnData(block_desc_s* blocks,
void DecodeAvroColumnData(cudf::device_span<block_desc_s const> blocks,
schemadesc_s* schema,
cudf::device_span<string_index_pair> global_dictionary,
const uint8_t* avro_data,
uint32_t num_blocks,
cudf::device_span<string_index_pair const> global_dictionary,
uint8_t const* avro_data,
uint32_t schema_len,
size_t max_rows = ~0,
size_t first_row = 0,
Expand Down
Loading

0 comments on commit 4ad09aa

Please sign in to comment.