Fix read_avro() skip_rows and num_rows. (#12912)
This PR fixes the avro reader (`cudf.read_avro()`) so that it honors the values passed to the `skip_rows` and `num_rows` parameters. In implementing this new logic, we also revamp the reader's handling of multi-block avro files, which is exercised extensively by a new `test_avro_reader_multiblock()` test covering some 1300 permutations of block size combinations.

Closes #6529.
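For illustration, the intended block-selection semantics can be sketched in pure Python (a hypothetical helper, not cudf code): given per-block row counts, compute which blocks survive `skip_rows`/`num_rows` and how many rows each contributes.

```python
# Hypothetical sketch (not the cudf implementation): map skip_rows/num_rows
# onto per-block (first_row, rows_to_save) pairs, mirroring the logic this
# PR adds to container::parse().
def select_blocks(block_row_counts, skip_rows=0, num_rows=None):
    """Return (block_index, first_row, rows_to_save) for each block that
    contributes at least one row after skipping/limiting."""
    selected = []
    remaining_skip = skip_rows
    saved = 0
    for i, count in enumerate(block_row_counts):
        if remaining_skip >= count:
            # Entire block precedes the first requested row; don't keep it.
            remaining_skip -= count
            continue
        first_row = remaining_skip  # rows to skip *within* this block
        remaining_skip = 0
        rows = count - first_row
        if num_rows is not None:
            rows = min(rows, num_rows - saved)  # honor the row limit
        if rows <= 0:
            break
        selected.append((i, first_row, rows))
        saved += rows
    return selected

# Three blocks of 10 rows each: skip 12 rows, then read at most 15.
print(select_blocks([10, 10, 10], skip_rows=12, num_rows=15))
# → [(1, 2, 8), (2, 0, 7)]
```

Block 0 is dropped entirely, block 1 skips its first two rows, and block 2 is truncated once the 15-row limit is reached.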

Authors:
  - Trent Nelson (https://github.com/tpn)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #12912
tpn authored Apr 17, 2023
1 parent b8ab63d commit 62e02c6
Showing 7 changed files with 531 additions and 93 deletions.
125 changes: 109 additions & 16 deletions cpp/src/io/avro/avro.cpp
@@ -67,10 +67,8 @@ std::string container::get_encoded()
bool container::parse(file_metadata* md, size_t max_num_rows, size_t first_row)
{
constexpr uint32_t avro_magic = (('O' << 0) | ('b' << 8) | ('j' << 16) | (0x01 << 24));
-  uint32_t sig4, max_block_size;
-  size_t total_object_count;
-
-  sig4 = get_raw<uint8_t>();
+  uint32_t sig4 = get_raw<uint8_t>();
sig4 |= get_raw<uint8_t>() << 8;
sig4 |= get_raw<uint8_t>() << 16;
sig4 |= get_raw<uint8_t>() << 24;
@@ -92,36 +90,131 @@ bool container::parse(file_metadata* md, size_t max_num_rows, size_t first_row)
}
}
}
// Save the first sync markers in the metadata; we compare them to other
// sync markers that should be present at the end of a block. If they
// differ, the data should be interpreted as corrupted.
md->sync_marker[0] = get_raw<uint64_t>();
md->sync_marker[1] = get_raw<uint64_t>();

// Initialize remaining metadata fields.
md->metadata_size = m_cur - m_base;
-  md->skip_rows = 0;
-  max_block_size = 0;
-  total_object_count = 0;
+  md->skip_rows = first_row;
+  md->total_num_rows = 0;

// Enumerate the blocks in this file. Each block starts with a count of
// objects (rows) in the block (uint64_t), and then the total size in bytes
// of the block (uint64_t). We walk each block and do the following:
// 1. Capture the total number of rows present across all blocks.
// 2. Add each block to the metadata's list of blocks.
// 3. Handle the case where we've been asked to skip or limit rows.
// 4. Verify sync markers at the end of each block.
//
// A row offset is also maintained, and added to each block. This reflects
// the absolute offset that needs to be added to any given row in order to
// get the row's index within the destination array. See `dst_row` in
// `avro_decode_row()` for more information.
//
// N.B. "object" and "row" are used interchangeably here; "object" is
// avro nomenclature, "row" is ours.
//
// N.B. If we're skipping rows, we ignore blocks (i.e. don't add them to
// md->block_list) that precede the block containing the first row
// we're interested in.
//

// Number of rows in the current block.
uint32_t num_rows = 0;

// Absolute row offset of the current block relative to all blocks selected by
// the skip rows/limit rows constraints, if any. Otherwise, absolute row
// offset relative to all blocks.
uint32_t row_offset = 0;

// Maximum block size in bytes encountered whilst processing all blocks
// selected by the skip rows/limit rows constraints, if any. Otherwise,
// maximum block size across all blocks.
uint32_t max_block_size = 0;

// Accumulates the total number of rows across all blocks selected by the skip
// rows/limit rows constraints, if any. Otherwise, total number of rows across
// all blocks.
size_t total_object_count = 0;

// N.B. The 18 below is (presumably) intended to account for the two 64-bit
// object count and block size integers (16 bytes total), and then an
// additional two bytes to represent the smallest possible row size.
while (m_cur + 18 < m_end && total_object_count < max_num_rows) {
auto const object_count = static_cast<uint32_t>(get_encoded<int64_t>());
auto const block_size = static_cast<uint32_t>(get_encoded<int64_t>());
-    if (block_size <= 0 || object_count <= 0 || m_cur + block_size + 16 > m_end) { break; }
-    if (object_count > first_row) {
-      auto block_row = static_cast<uint32_t>(total_object_count);
+    auto const next_end = m_cur + block_size + 16;
// Abort on terminal conditions. We keep these as separate lines instead of
// combining them into a single if in order to facilitate setting specific
// line breakpoints in the debugger.
if (block_size <= 0) { return false; }
if (object_count <= 0) { return false; }
if (next_end > m_end) { return false; }

// Update our total row count. This is only captured for information
// purposes.
md->total_num_rows += object_count;

if (object_count <= first_row) {
// We've been asked to skip rows, and we haven't yet reached our desired
// number of rows to skip. Subtract this block's rows (`object_count`)
// from the remaining rows to skip (`first_row`). Do not add this block
// to our block list.
first_row -= object_count;
} else {
// Either we weren't asked to skip rows, or we were, but we've already hit
// our target number of rows to skip. Add this block to our block list.
max_block_size = std::max(max_block_size, block_size);
total_object_count += object_count;
if (!md->block_list.size()) {
md->skip_rows = static_cast<uint32_t>(first_row);
// This is the first block, so add it to our list with the current value
// of `first_row`, which will reflect the number of rows to skip *in
// this block*.
m_start = m_cur;
total_object_count -= first_row;
num_rows = total_object_count;
CUDF_EXPECTS(row_offset == 0, "Invariant check failed: row_offset != 0");
if ((max_num_rows > 0) && (max_num_rows < total_object_count)) { num_rows = max_num_rows; }
md->block_list.emplace_back(m_cur - m_base, block_size, row_offset, first_row, num_rows);
first_row = 0;
row_offset += num_rows;
} else {
// Not our first block; `first_row` should always be zero here.
CUDF_EXPECTS(first_row == 0, "Invariant check failed: first_row != 0");

num_rows = object_count;
if ((max_num_rows > 0) && (max_num_rows < total_object_count)) {
num_rows -= (total_object_count - max_num_rows);
}

md->block_list.emplace_back(m_cur - m_base, block_size, row_offset, first_row, num_rows);
row_offset += num_rows;
}
-      md->block_list.emplace_back(m_cur - m_base, block_size, block_row, object_count);
-    } else {
-      first_row -= object_count;
-    }
m_cur += block_size;
-    m_cur += 16;  // TODO: Validate sync marker
// Read the next sync markers and ensure they match the first ones we
// encountered. If they don't, we have to assume the data is corrupted,
// and thus, we terminate processing immediately.
const uint64_t sync_marker[] = {get_raw<uint64_t>(), get_raw<uint64_t>()};
bool valid_sync_markers =
((sync_marker[0] == md->sync_marker[0]) && (sync_marker[1] == md->sync_marker[1]));
if (!valid_sync_markers) { return false; }
}
md->max_block_size = max_block_size;
// N.B. `total_object_count` has skip_rows applied to it at this point, i.e.
// it represents the number of rows that will be returned *after* rows
// have been skipped (if requested).
if ((max_num_rows <= 0) || (max_num_rows > total_object_count)) {
md->num_rows = total_object_count;
} else {
md->num_rows = max_num_rows;
}
-  md->max_block_size = max_block_size;
-  md->num_rows = total_object_count;
md->total_data_size = m_cur - (m_base + md->metadata_size);
CUDF_EXPECTS(m_cur > m_start, "Invariant check failed: `m_cur > m_start` is false.");
md->selected_data_size = m_cur - m_start;
// Extract columns
for (size_t i = 0; i < md->schema.size(); i++) {
type_kind_e kind = md->schema[i].kind;
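The per-block `row_offset` bookkeeping that `container::parse()` maintains above can be illustrated with a small stand-alone sketch (hypothetical names, not the cudf implementation): each kept block records where its saved rows begin in the destination column, which is simply the running sum of the previous blocks' saved-row counts.

```python
# Hypothetical sketch: assign each selected block the destination offset of
# its first saved row, as container::parse() does with `row_offset`.
def assign_row_offsets(selected):
    """selected: list of (first_row, num_rows) per kept block."""
    out, row_offset = [], 0
    for first_row, num_rows in selected:
        out.append({"row_offset": row_offset,
                    "first_row": first_row,
                    "num_rows": num_rows})
        row_offset += num_rows  # the next block's rows land right after ours
    return out

# Two kept blocks: 8 rows saved from the first (after skipping 2 in-block),
# 7 rows saved from the second.
print(assign_row_offsets([(2, 8), (0, 7)]))
```

The second block gets `row_offset == 8`, so its rows are written contiguously after the first block's output regardless of how many rows were skipped.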
69 changes: 60 additions & 9 deletions cpp/src/io/avro/avro.hpp
@@ -59,16 +59,56 @@ struct column_desc {

/**
* @brief AVRO file metadata struct
*
* `metadata_size` is the size in bytes of the avro file header.
*
* `total_data_size` is the size of all data minus `metadata_size`.
*
* `selected_data_size` is the size of all data minus `metadata_size`, with any
* adjustments made to account for the number of rows or rows to skip per the
* user's request. This is the value used to size device-side buffers.
*
* `num_rows` is the number of rows that will be processed. If the user has not
* requested the number of rows to be limited (i.e. via the `num_rows` param to
* `read_avro()`), this number will represent all rows in the file *after* the
* `skip_rows` parameter has been taken into consideration (assuming a request
* has been made to also skip rows).
*
* `total_num_rows` is the total number of rows present in the file, across all
* blocks. This may be more than `num_rows` if the user has requested a limit
* on the number of rows to return, or if `skip_rows` is active.
*
* `skip_rows` is the number of rows the user has requested to skip. Note that
* this value may differ from the `block_desc_s.first_row` member, which will
* capture the number of rows to skip for a given block.
*
* `block_list` is a list of all blocks that contain the selected rows. If no
* row filtering has been done via `num_rows` or `skip_rows`, it will contain
* all blocks. Otherwise, it will contain only blocks selected by those
* constraints.
*
* N.B. It is important to note that the coordination of skipping and limiting
* rows is dictated by the `first_row` and `num_rows` members of each block
* in the block list, *not* the `skip_rows` and `num_rows` members of this
* struct.
*
* This is because the first row and number of rows to process for each
* block needs to be handled at the individual block level in order to
* correctly support avro multi-block files.
*
* See also the `block_desc_s` struct.
*/
struct file_metadata {
std::map<std::string, std::string> user_data;
-  std::string codec = "";
-  uint64_t sync_marker[2] = {0, 0};
-  size_t metadata_size = 0;
-  size_t total_data_size = 0;
-  size_t num_rows = 0;
-  uint32_t skip_rows = 0;
-  uint32_t max_block_size = 0;
+  std::string codec = "";
+  uint64_t sync_marker[2] = {0, 0};
+  size_t metadata_size = 0;
+  size_t total_data_size = 0;
+  size_t selected_data_size = 0;
+  size_type num_rows = 0;
+  size_type skip_rows = 0;
+  size_type total_num_rows = 0;
+  uint32_t max_block_size = 0;
std::vector<schema_entry> schema;
std::vector<block_desc_s> block_list;
std::vector<column_desc> columns;
@@ -100,11 +140,12 @@ class schema_parser {
*/
class container {
public:
-  container(uint8_t const* base, size_t len) noexcept : m_base{base}, m_cur{base}, m_end{base + len}
+  container(uint8_t const* base, size_t len) noexcept
+    : m_base{base}, m_start{base}, m_cur{base}, m_end{base + len}
{
}

-  [[nodiscard]] auto bytecount() const { return m_cur - m_base; }
+  [[nodiscard]] auto bytecount() const { return m_cur - m_start; }

template <typename T>
T get_raw()
@@ -123,7 +164,17 @@ class container {
bool parse(file_metadata* md, size_t max_num_rows = 0x7fff'ffff, size_t first_row = 0);

protected:
// Base address of the file data. This will always point to the file's metadata.
const uint8_t* m_base;

// Start, current, and end pointers for the file. These pointers refer to the
// actual data content of the file, not the metadata. `m_cur` and `m_start`
// will only ever differ if a user has requested `read_avro()` to skip rows;
// in this case, `m_start` will be the base address of the block that contains
// the first row to be processed. `m_cur` is updated as the file is parsed,
// until either `m_end` is reached, or the number of rows requested by the user
// is reached.
const uint8_t* m_start;
const uint8_t* m_cur;
const uint8_t* m_end;
};
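The relationship between `m_base`, `m_start`, `m_cur`, and the sizes derived from them can be sketched with plain integer arithmetic (hypothetical numbers; this simplified model ignores the per-block count/size prefixes and 16-byte sync markers). With `m_start` moved past one fully skipped block, `selected_data_size` excludes that block while `total_data_size` does not.

```python
# Simplified sketch of the container pointer arithmetic, using byte offsets
# in place of pointers. Hypothetical sizes; block framing bytes are ignored.
metadata_size = 64
block_sizes = [100, 100, 100]

m_base = 0                                          # start of the file
m_start = m_base + metadata_size + block_sizes[0]   # first *selected* block
m_cur = m_base + metadata_size + sum(block_sizes)   # position after parsing

total_data_size = m_cur - (m_base + metadata_size)  # all block data
selected_data_size = m_cur - m_start                # data copied to device

print(total_data_size, selected_data_size)  # → 300 200
```

Only `selected_data_size` is used to size device-side buffers, so skipping whole blocks also shrinks the device allocation.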
45 changes: 40 additions & 5 deletions cpp/src/io/avro/avro_common.hpp
@@ -26,17 +26,52 @@ namespace io {
namespace avro {
struct block_desc_s {
block_desc_s() {}
-  explicit constexpr block_desc_s(size_t offset_,
-                                  uint32_t size_,
-                                  uint32_t first_row_,
-                                  uint32_t num_rows_)
-    : offset(offset_), size(size_), first_row(first_row_), num_rows(num_rows_)
+  explicit constexpr block_desc_s(
+    size_t offset_, uint32_t size_, uint32_t row_offset_, uint32_t first_row_, uint32_t num_rows_)
+    : offset(offset_),
+      size(size_),
+      row_offset(row_offset_),
+      first_row(first_row_),
+      num_rows(num_rows_)
{
}

// Offset of this block, in bytes, from the start of the file.
size_t offset;

// Size of this block, in bytes.
uint32_t size;

// The absolute row offset that needs to be added to each row index in order
// to derive the offset of the decoded data in the destination array. E.g.
// `const ptrdiff_t dst_row = ((row - first_row) + row_offset)`. See
// `avro_decode_row()` for details.
uint32_t row_offset;

// The index of the first row to be *saved* from this block. That is, the
// number of rows to skip in this block before starting to save values. If
// this is 0, then no rows will be skipped (all rows will be saved). If a
// user has requested `read_avro()` to skip rows, that will materialize as a
// non-zero `first_row` value in the appropriate block containing the first
// row to be saved.
//
// N.B. We explicitly use the word "saved" here, not "decoded". Technically,
// all rows are decoded, one column at a time, as the process of decoding
// a column value is what informs us of the value's size in bytes (in its
// encoded form), and thus, where the next column starts. However, we
// only *save* these decoded values based on the `first_row`.
uint32_t first_row;

// The number of rows to save from this block. If a user has requested
// `read_avro()` to limit the number of rows to return, this will materialize
// as a `num_rows` value less than the total number of rows in the appropriate
// block. Otherwise, `num_rows` will be equal to the total number of rows in
// the block, after skipping `first_row` rows (if applicable).
//
// N.B. Unlike `first_row`, where all rows and columns are decoded prior to
// reaching the point we've been requested to start *saving* values --
// once the `num_rows` limit has been reached, no further decoding takes
// place.
uint32_t num_rows;
};

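The `dst_row` formula documented on `row_offset` above can be checked with a short sketch (hypothetical block values): rows saved across multiple blocks must fill the destination array contiguously, with no gaps and no overlap.

```python
# Hypothetical check of the dst_row formula from block_desc_s:
#   dst_row = (row - first_row) + row_offset
# Two blocks: one saving 8 rows after skipping 2, one saving 7 from row 0.
blocks = [(0, 2, 8), (8, 0, 7)]  # (row_offset, first_row, num_rows)

dst = [(row - first_row) + row_offset
       for row_offset, first_row, num_rows in blocks
       for row in range(first_row, first_row + num_rows)]

print(dst == list(range(15)))  # → True: destination slots 0..14, no gaps
```

This is the invariant that the `row_offset` accumulation in `container::parse()` is designed to preserve.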