Skip to content

Commit

Permalink
Clean-up (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-krystianc authored Mar 22, 2024
1 parent bff8b92 commit 4c3a956
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
64 changes: 35 additions & 29 deletions python/palletjack/palletjack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,21 @@ struct DataHeader

/* File format: (Thrift-encoded metadata stored separately for each row group)
-----------------------------
| 0 - 3 | PJ_2 | (char[4]) - File header in ASCI
| 0 ... | DataHeader |
|---------------------------|
| 4 - 7 | Row groups | (uint32) - Number of row groups
| | 'PJ_2' | (char[4]) - File header in ASCI
| --------------------|
| | row groups | (uint32) - Number of row groups
| --------------------|
| | columns | (uint32) - Number of columns
| --------------------|
| | col. names length | (uint32) - Length of column names section
| --------------------|
| | metadata length | (uint32) - Length of metadata section
|---------------------------|
| 8 - 11| Columns | (uint32) - Number of columns
| . . . | column names | ['col_0', '\0', 'col_1', '\0', ....] - Section with column names
|---------------------------|
|12 - 15| Metadata length | (uint32) - Metadata length
|---------------------------|
|16 - | Row numbers | (uint32*) - number of rows per row group
|---------------------------|
| . . . | Schema offsets | (uint32*) - offsets of schema element (1 + 1 + c + 1)
|---------------------------|
| . . . | Row group offsets | (uint32*) - offsets of row grpups (1 + rg + 1)
|---------------------------|
| . . . | Chunks offsets | (uint32*) - offsets of column chunks rg * (1 + c + 1)
|---------------------------|
| . . . | Metadata | (uint8*) - Original metadata (thrift compact protocol)
| . . . | metadata | [bytes] - Section with original metadata (thrift compact protocol)
-----------------------------
*/

Expand Down Expand Up @@ -355,13 +353,17 @@ std::vector<char> GenerateMetadataIndex(const char *parquet_path)

if (data_header.column_names_length != written_column_names_length)
{
throw std::logic_error("Error when writign the index file, data_header.column_names_length != written_column_names_length !");
throw std::logic_error("Error when writign the index file, data_header.column_names_length != written_column_names_length !");
}

uint32_t offset = fs.tellp();

fs.write((const char *)thrift_buffer.get()->data(), thrift_buffer.get()->size());
auto s = fs.str();
if (sizeof(data_header) + data_header.get_body_size() != s.size())
{
auto msg = std::string("Error when writign the index file, exexted size=") + std::to_string(sizeof(data_header) + data_header.get_body_size()) + ", actual size=" + std::to_string(s.size()) + " !";
throw std::logic_error(msg);
}

std::vector<char> v(s.size());
memcpy(&v[0], s.data(), s.size());
return v;
Expand Down Expand Up @@ -437,6 +439,8 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
std::vector<uint32_t> columns = column_indices;
if (column_names.size() > 0)
{
columns.reserve(column_names.size());

std::unordered_map<std::string, uint32_t> columns_map;
for (uint32_t c = 0; c < dataHeader.columns; c++)
{
Expand All @@ -460,28 +464,29 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
throw std::logic_error(msg);
}

columns.push_back(kvp->second);
columns.emplace_back(kvp->second);
}
}

if (columns.size() > 0)
{
//> 2:required list<SchemaElement> schema;
auto schema_list = &schema_offsets[0];
toCopy = schema_list[0] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
index_src += toCopy;

thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size() + 1); // one extra element for root
index_src = schema_list[1];
thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size() + 1); // one extra schema element for root
index_src = schema_list[1]; // skip the list header and jump to the first schema element (which is the root element)

auto root_schema_element = &schema_list[1];
toCopy = root_schema_element[0] + schema_num_children_offsets[0] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
index_src = root_schema_element[0] + schema_num_children_offsets[1];

// Update the num children in the schema
// Write updated num children in the root element
//> 5: optional i32 num_children;
thriftCopier.WriteI32(columns.size());

index_src = root_schema_element[0] + schema_num_children_offsets[1];
toCopy = root_schema_element[1] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
index_src += toCopy;
Expand All @@ -498,6 +503,7 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader

if (row_groups.size() > 0)
{
//> 3: required i64 num_rows
int64_t num_rows = 0;
for (auto row_group : row_groups)
{
Expand All @@ -515,6 +521,7 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
auto row_group_filtering = row_groups.size() > 0;
if (row_group_filtering)
{
//> 4: required list<RowGroup> row_groups
auto row_groups_list = &row_groups_offsets[0];
toCopy = row_groups_list[0] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
Expand All @@ -525,7 +532,7 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
}
else
{
// copy, including the list
// Copy to here, including the list header
auto row_groups_list = &row_groups_offsets[0];
toCopy = row_groups_list[1] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
Expand Down Expand Up @@ -555,14 +562,12 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
index_src = row_groups_offsets[1 + row_group_idx];
if (columns.size() > 0)
{
//> 1: required list<ColumnChunk> columns
auto chunks_list = &column_chunks_offsets[(1 + dataHeader.columns + 1) * row_group_idx];
auto chunks = &chunks_list[1];

// START HERE
toCopy = row_group_offset + chunks_list[0] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);

thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size()); // one extra element for root);
thriftCopier.WriteListBegin(::apache::thrift::protocol::T_STRUCT, columns.size());

for (auto column_to_copy : columns)
{
Expand All @@ -583,10 +588,11 @@ std::shared_ptr<parquet::FileMetaData> ReadMetadata(const DataHeader &dataHeader
}
}

index_src = row_groups_offsets[dataHeader.get_row_groups_offsets_size() - 1];
index_src = row_groups_offsets[dataHeader.get_row_groups_offsets_size()];

if (columns.size() > 0)
{
//> 7: optional list<ColumnOrder> column_orders;
auto column_orders_list = &column_orders_offsets[0];
toCopy = column_orders_list[0] - index_src;
thriftCopier.CopyFrom(index_src, toCopy);
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "palletjack"
version = "2.1.0"
version = "2.1.1"
authors = [
{ name="Marcin Krystianc", email="[email protected]" },
]
Expand Down

0 comments on commit 4c3a956

Please sign in to comment.