Addressed review comments
revans2 committed May 4, 2022
1 parent 81f32ce commit 08b9fb7
Showing 1 changed file with 62 additions and 49 deletions.
111 changes: 62 additions & 49 deletions src/main/cpp/src/NativeParquetJni.cpp
@@ -42,21 +42,25 @@ namespace jni {
* and may not produce the exact same result as the JVM does. This is probably good enough
* for now.
*/
std::string unicode_to_lower(std::string & input) {
int wide_size = std::mbstowcs(nullptr, input.data(), 0);
std::string unicode_to_lower(std::string const& input) {
// get the size of the wide character result
std::size_t wide_size = std::mbstowcs(nullptr, input.data(), 0);
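// std::mbstowcs reports an invalid multibyte sequence by returning (std::size_t)-1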
if (wide_size == static_cast<std::size_t>(-1)) {
throw std::invalid_argument("invalid character sequence");
}

std::vector<wchar_t> wide(wide_size + 1);
// Set a null so we can get a proper output size from wcstombs. This is becasue
// Set a null so we can get a proper output size from wcstombs. This is because
// we pass in a max length of 0, so it will only stop when it sees the null character.
wide.back() = 0;
std::mbstowcs(wide.data(), input.data(), wide_size);
for (auto wit = wide.begin(); wit != wide.end(); wit++) {
if (std::mbstowcs(wide.data(), input.data(), wide_size) != wide_size) {
throw std::runtime_error("error during wide char converstion");
}
for (auto wit = wide.begin(); wit != wide.end(); ++wit) {
*wit = std::towlower(*wit);
}
int mb_size = std::wcstombs(nullptr, wide.data(), 0);
// Get the multi-byte result size
std::size_t mb_size = std::wcstombs(nullptr, wide.data(), 0);
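// std::wcstombs likewise returns (std::size_t)-1 if a wide character cannot be represented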
if (mb_size == static_cast<std::size_t>(-1)) {
throw std::invalid_argument("unsupported wide character sequence");
}
@@ -66,7 +70,9 @@ std::string unicode_to_lower(std::string & input) {
// because it will be overwritten. std::string itself will insert a NUL
// terminator on the buffer it allocates internally. We don't need to worry about it.
std::string ret(mb_size, '\0');
std::wcstombs(ret.data(), wide.data(), mb_size);
if (std::wcstombs(ret.data(), wide.data(), mb_size) != mb_size) {
throw std::runtime_error("error during multibyte char converstion");
}
return ret;
}
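// Example usage (illustrative; assumes the process has set up a UTF-8 C locale,
// e.g. std::setlocale(LC_ALL, "en_US.UTF-8"), since mbstowcs/wcstombs honor the
// current locale):
//   std::string lower = unicode_to_lower("MiXeD CaSe"); // "mixed case"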

@@ -78,8 +84,8 @@ std::string unicode_to_lower(std::string & input) {
struct column_pruning_maps {
// gather map for pulling out items from the schema
std::vector<int> schema_map;
// Each SchemaElement also include the number of children in it. This allows the vector
// to be interpreted as a tree flattend depth first. These are the new values for num
// Each SchemaElement also includes the number of children in it. This allows the vector
// to be interpreted as a tree flattened depth first. These are the new values for num
// children after the schema is gathered.
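// For example (illustrative): a schema
//   root { a: int32, b: struct { c: int32 } }
// flattens depth first to [root, a, b, c] with num children [2, 0, 1, 0].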
std::vector<int> schema_num_children;
// There are several places where a struct is stored only for a leaf column (like a column chunk)
@@ -94,7 +100,7 @@ struct column_pruning_maps {
class column_pruner {
public:
/**
* Create pruining filter from a depth first flattened tree of names and num_children.
* Create pruning filter from a depth first flattened tree of names and num_children.
* The root entry is not included in names or in num_children, but parent_num_children
* should hold how many children the root has.
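* For example (illustrative): keeping just "a" and "b.c" from the schema
* root { a: int32, b: struct { c: int32 } } would be passed as
* names = ["a", "b", "c"], num_children = [0, 1, 0], parent_num_children = 2.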
*/
@@ -114,8 +120,9 @@ class column_pruner {
* Given a schema from a parquet file, create a set of pruning maps to prune columns from the rest of the footer
*/
column_pruning_maps filter_schema(std::vector<parquet::format::SchemaElement> & schema, bool ignore_case) {
// The following are all covered by follow on work in https://github.com/NVIDIA/spark-rapids-jni/issues/210
// TODO the java code will fail if there is ambiguity in the names and ignore_case is true
// so we need to figure that out too???
// so we need to figure that out too.
// TODO there are a number of different ways to represent a list or a map. We want to support all of them
// so we need a way to detect that the schema is a list and group the parts we don't care about together.
// TODO the java code verifies that the schema matches when it is looking at the columns or it throws
@@ -126,7 +133,7 @@ class column_pruner {
//
// Then when we are walking the tree we need to keep track of whether we are looking for a Map, an array or
// a struct and match up the SchemaElement entries accordingly as we go.
// If we see somehting that is off we need to throw an exception.
// If we see something that is off we need to throw an exception.
//
// To be able to handle the duplicates, I think we need to have some state in the column_pruner class
// to say if we have matched a leaf node or not.
@@ -195,12 +202,15 @@ class column_pruner {
std::vector<int> num_children_stack;
std::vector<column_pruner*> tree_stack;
tree_stack.push_back(this);
if (schema.size() == 0) {
throw std::invalid_argument("a root schema element must exist");
}
num_children_stack.push_back(schema[0].num_children);

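// For example (illustrative): pruning the schema [root, a, b, c] from above down
// to just "b.c" gathers to schema_map [0, 2, 3], schema_num_children [1, 1, 0]
// and chunk_map [1] ("c" is leaf chunk number 1 in the original file).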
uint64_t chunk_index = 0;
// We are skipping over the first entry in the schema because it is always the root entry, and
// we already processed it
for (uint64_t schema_index = 1; schema_index < schema.size(); schema_index++) {
for (uint64_t schema_index = 1; schema_index < schema.size(); ++schema_index) {
auto schema_item = schema[schema_index];
// num_children is optional, but is supposed to be set for non-leaf nodes. That said, leaf nodes
// will have 0 children so we can just default to that.
@@ -216,35 +226,34 @@ class column_pruner {
}
column_pruner * found = nullptr;
if (tree_stack.back() != nullptr) {
// tree_stack can have a nullptr in it if the scheam we are looking through
// tree_stack can have a nullptr in it if the schema we are looking through
// has an entry that does not match the tree
auto found_it = tree_stack.back()->children.find(name);
if (found_it != tree_stack.back()->children.end()) {
found = &(found_it->second);
int parent_mapped_schema_index = tree_stack.back()->s_id;
num_children_map[parent_mapped_schema_index]++;
++num_children_map[parent_mapped_schema_index];

int mapped_schema_index = found->s_id;
schema_map[mapped_schema_index] = schema_index;
num_children_map[mapped_schema_index] = 0;
}
}

if (schema_item.__isset.type) {
// this is a leaf node, it has a primitive type.
if (found != nullptr) {
int mapped_chunk_index = found->c_id;
int mapped_schema_index = found->s_id;
chunk_map[mapped_chunk_index] = chunk_index;
schema_map[mapped_schema_index] = schema_index;
num_children_map[mapped_schema_index] = 0;
}
// this is a leaf node
chunk_index++;
} else {
// a non-leaf node
if (found != nullptr) {
int mapped_schema_index = found->s_id;
schema_map[mapped_schema_index] = schema_index;
num_children_map[mapped_schema_index] = 0;
}
}

++chunk_index;
}
// else it is a non-leaf node; it is group typed
// chunks are only for leaf nodes

// num_children and whether the type is set should correspond to each other.
// By convention in parquet they do, but to be on the safe side I keep them
// separate.
if (num_children > 0) {
tree_stack.push_back(found);
num_children_stack.push_back(num_children);
@@ -272,28 +281,32 @@ class column_pruner {
// so there are no gaps
std::vector<int> final_schema_map;
final_schema_map.reserve(schema_map.size());
for (auto it = schema_map.begin(); it != schema_map.end(); it++) {
for (auto it = schema_map.begin(); it != schema_map.end(); ++it) {
final_schema_map.push_back(it->second);
}

std::vector<int> final_num_children_map;
final_num_children_map.reserve(num_children_map.size());
for (auto it = num_children_map.begin(); it != num_children_map.end(); it++) {
for (auto it = num_children_map.begin(); it != num_children_map.end(); ++it) {
final_num_children_map.push_back(it->second);
}

std::vector<int> final_chunk_map;
final_chunk_map.reserve(chunk_map.size());
for (auto it = chunk_map.begin(); it != chunk_map.end(); it++) {
for (auto it = chunk_map.begin(); it != chunk_map.end(); ++it) {
final_chunk_map.push_back(it->second);
}

return column_pruning_maps{final_schema_map, final_num_children_map, final_chunk_map};
return column_pruning_maps{std::move(final_schema_map),
std::move(final_num_children_map),
std::move(final_chunk_map)};
}

private:

void add_depth_first(const std::vector<std::string> & names, const std::vector<int> & num_children, int parent_num_children) {
void add_depth_first(std::vector<std::string> const& names,
std::vector<int> const& num_children,
int parent_num_children) {
CUDF_FUNC_RANGE();
if (parent_num_children == 0) {
// There is no point in doing more; the tree is empty, and stopping here lets us avoid some corner cases
@@ -307,14 +320,14 @@ class column_pruner {
std::vector<int> num_children_stack;
tree_stack.push_back(this);
num_children_stack.push_back(parent_num_children);
for(uint64_t i = 0; i < num; i++) {
for(uint64_t i = 0; i < num; ++i) {
auto name = names[i];
auto num_c = num_children[i];
local_s_id++;
++local_s_id;
int tmp_c_id = -1;
if (num_c == 0) {
// leaf node...
local_c_id++;
++local_c_id;
tmp_c_id = local_c_id;
}
tree_stack.back()->children.try_emplace(name, local_s_id, tmp_c_id);
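// Note that try_emplace leaves an existing entry untouched, so a duplicate name
// keeps the ids from its first occurrence (see the ambiguity TODO above).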
@@ -346,7 +359,7 @@ class column_pruner {
}

std::map<std::string, column_pruner> children;
// The following IDs are the position that they should be in when output in a filteres footer, except
// The following IDs are the position that they should be in when output in a filtered footer, except
// that if there are any missing columns in the actual data, the gaps need to be removed.
// schema ID
int s_id;
@@ -373,7 +386,7 @@ static bool invalid_file_offset(long start_index, long pre_start_index, long pre
return invalid;
}

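// A column chunk starts at its dictionary page when one is present, otherwise at
// its first data page, so the chunk's file offset is the smaller of the two.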
static int64_t get_offset(parquet::format::ColumnChunk & column_chunk) {
static int64_t get_offset(parquet::format::ColumnChunk const& column_chunk) {
auto md = column_chunk.meta_data;
int64_t offset = md.data_page_offset;
if (md.__isset.dictionary_page_offset && offset > md.dictionary_page_offset) {
@@ -382,7 +395,7 @@ static int64_t get_offset(parquet::format::ColumnChunk & column_chunk) {
return offset;
}

static std::vector<parquet::format::RowGroup> filter_groups(parquet::format::FileMetaData & meta,
static std::vector<parquet::format::RowGroup> filter_groups(parquet::format::FileMetaData const& meta,
int64_t part_offset, int64_t part_length) {
CUDF_FUNC_RANGE();
// This is based on the java parquet_mr code to find the groups in a range...
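// (In parquet-mr a row group is selected when the midpoint of its compressed byte
// range falls inside the requested split [part_offset, part_offset + part_length).)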
@@ -395,8 +408,8 @@ static std::vector<parquet::format::RowGroup> filter_groups(parquet::format::Fil
}

std::vector<parquet::format::RowGroup> filtered_groups;
for (uint64_t rg_i = 0; rg_i < num_row_groups; rg_i++) {
parquet::format::RowGroup & row_group = meta.row_groups[rg_i];
for (uint64_t rg_i = 0; rg_i < num_row_groups; ++rg_i) {
parquet::format::RowGroup const& row_group = meta.row_groups[rg_i];
int64_t total_size = 0;
int64_t start_index;
auto column_chunk = row_group.columns[0];
@@ -422,8 +435,8 @@ static std::vector<parquet::format::RowGroup> filter_groups(parquet::format::Fil
total_size = row_group.total_compressed_size;
} else {
auto num_columns = row_group.columns.size();
for (uint64_t cc_i = 0; cc_i < num_columns; cc_i++) {
parquet::format::ColumnChunk & col = row_group.columns[cc_i];
for (uint64_t cc_i = 0; cc_i < num_columns; ++cc_i) {
parquet::format::ColumnChunk const& col = row_group.columns[cc_i];
total_size += col.meta_data.total_compressed_size;
}
}
@@ -469,9 +482,9 @@ void deserialize_parquet_footer(uint8_t * buffer, uint32_t len, parquet::format:

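// Gather, for every row group, only the column chunks listed in chunk_filter,
// in chunk_filter order.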
void filter_columns(std::vector<parquet::format::RowGroup> & groups, std::vector<int> & chunk_filter) {
CUDF_FUNC_RANGE();
for (auto group_it = groups.begin(); group_it != groups.end(); group_it++) {
for (auto group_it = groups.begin(); group_it != groups.end(); ++group_it) {
std::vector<parquet::format::ColumnChunk> new_chunks;
for (auto it = chunk_filter.begin(); it != chunk_filter.end(); it++) {
for (auto it = chunk_filter.begin(); it != chunk_filter.end(); ++it) {
new_chunks.push_back(group_it->columns[*it]);
}
group_it->columns = std::move(new_chunks);
@@ -511,7 +524,7 @@ JNIEXPORT long JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_readAndFil
// start by filtering the schema and the chunks
std::size_t new_schema_size = filter.schema_map.size();
std::vector<parquet::format::SchemaElement> new_schema(new_schema_size);
for (std::size_t i = 0; i < new_schema_size; i++) {
for (std::size_t i = 0; i < new_schema_size; ++i) {
int orig_index = filter.schema_map[i];
int new_num_children = filter.schema_num_children[i];
new_schema[i] = meta->schema[orig_index];
@@ -520,7 +533,7 @@ JNIEXPORT long JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_readAndFil
meta->schema = std::move(new_schema);
if (meta->__isset.column_orders) {
std::vector<parquet::format::ColumnOrder> new_order;
for (auto it = filter.chunk_map.begin(); it != filter.chunk_map.end(); it++) {
for (auto it = filter.chunk_map.begin(); it != filter.chunk_map.end(); ++it) {
new_order.push_back(meta->column_orders[*it]);
}
meta->column_orders = std::move(new_order);
@@ -550,7 +563,7 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParquetFooter_getNumRow
try {
parquet::format::FileMetaData * ptr = reinterpret_cast<parquet::format::FileMetaData *>(handle);
long ret = 0;
for(auto it = ptr->row_groups.begin(); it != ptr->row_groups.end(); it++) {
for(auto it = ptr->row_groups.begin(); it != ptr->row_groups.end(); ++it) {
ret = ret + it->num_rows;
}
return ret;