Skip to content

Commit

Permalink
Read multiple row groups in Parquet files correctly (#3950)
Browse files Browse the repository at this point in the history
* Read multiple row groups correctly

Iterate through the column's row groups while maintaining a count of the
total items read, and terminate the loop when the specified number of items
have been read.

Signed-off-by: John H. Hartman <[email protected]>

* Skip values and count values read properly

The variable startIdx contains the number of values to be skipped in the column
prior to reading values. Skipping is done one row group at a time, so this
value must be updated as each row group is skipped.

Also, readColumn, readColumnDbFl, and readColumnIrregularBitWidth now return the
number of values read, so that cpp_readColumnByName increments the index into the
output array properly.

---------

Signed-off-by: John H. Hartman <[email protected]>
Co-authored-by: John H. Hartman <[email protected]>
Co-authored-by: ajpotts <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent e825f70 commit 091b8dd
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
#include "ReadParquet.h"
#include "UtilParquet.h"

// Returns the number of elements read
template <typename ReaderType, typename ChplType>
int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
int64_t num_read = 0;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
startIdx -= reader->Skip(startIdx);
*startIdx -= reader->Skip(*startIdx);

if (not hasNonFloatNulls) {
while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize) // adjust batchSize if needed
batchSize = numElems - i;
(void)reader->ReadBatch(batchSize, nullptr, nullptr, &chpl_ptr[i], &values_read);
i+=values_read;
num_read += values_read;
}
}
else {
Expand All @@ -27,21 +30,23 @@ int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::Co
where_null_chpl[i] = true;
}
i++;
num_read++;
}
}
return i;
return num_read;
}

template <typename ReaderType, typename ChplType, typename PqType>
int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumnDbFl(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
startIdx -= reader->Skip(startIdx);
*startIdx -= reader->Skip(*startIdx);

int64_t num_read = 0;
while (reader->HasNext() && i < numElems) {
PqType value;
(void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
Expand All @@ -53,20 +58,22 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet
chpl_ptr[i] = NAN;
}
i++;
num_read++;
}
return i;
return num_read;
}

template <typename ReaderType, typename ChplType, typename PqType>
int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
startIdx -= reader->Skip(startIdx);
*startIdx -= reader->Skip(*startIdx);

int64_t num_read = 0;
if (not hasNonFloatNulls) {
PqType* tmpArr = (PqType*)malloc(batchSize * sizeof(int32_t));
while (reader->HasNext() && i < numElems) {
Expand All @@ -78,6 +85,7 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::share
for (int64_t j = 0; j < values_read; j++)
chpl_ptr[i+j] = (ChplType)tmpArr[j];
i+=values_read;
num_read+=values_read;
}
free(tmpArr);
}
Expand All @@ -93,9 +101,10 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::share
chpl_ptr[i] = (int64_t)tmp;
}
i++;
num_read++;
}
}
return i;
return num_read;
}

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) {
Expand Down Expand Up @@ -182,7 +191,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
int num_row_groups = file_metadata->num_row_groups();

int64_t i = 0;
for (int r = 0; r < num_row_groups; r++) {
for (int r = 0; (r < num_row_groups) && (i < numElems); r++) {
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);

Expand All @@ -191,7 +200,6 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
std::shared_ptr<parquet::ColumnReader> column_reader;

auto idx = file_metadata -> schema() -> ColumnIndex(colname);
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

if(idx < 0) {
std::string dname(colname);
Expand All @@ -200,19 +208,20 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
*errMsg = strdup(msg.c_str());
return ARROWERROR;
}
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

column_reader = row_group_reader->Column(idx);

// Since int64 and uint64 Arrow dtypes share a physical type and only differ
// in logical type, they must be read from the file in the same way
if(ty == ARROWINT64 || ty == ARROWUINT64) {
i += readColumn<parquet::Int64Reader, int64_t>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
i += readColumn<parquet::Int64Reader, int64_t>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWINT32 || ty == ARROWUINT32) {
i += readColumnIrregularBitWidth<parquet::Int32Reader, int64_t, int32_t>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnIrregularBitWidth<parquet::Int32Reader, int64_t, int32_t>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWBOOLEAN) {
i += readColumn<parquet::BoolReader, bool>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
i += readColumn<parquet::BoolReader, bool>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWSTRING) {
int16_t definition_level; // nullable type and only reading single records in batch
Expand All @@ -233,10 +242,10 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
i++; // skip one space so the strings are null terminated with a 0
}
} else if(ty == ARROWFLOAT) {
i += readColumnDbFl<parquet::FloatReader, double, float>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnDbFl<parquet::FloatReader, double, float>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWDOUBLE) {
i += readColumnDbFl<parquet::DoubleReader, double, double>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnDbFl<parquet::DoubleReader, double, double>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWDECIMAL) {
auto chpl_ptr = (double*)chpl_arr;
Expand Down

0 comments on commit 091b8dd

Please sign in to comment.