Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof-of-concept Parquet GEOMETRY logical type implementation #43977

Open
wants to merge 63 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
4a4ebc9
update thrift
paleolimbot Jun 29, 2024
6f4e4b7
update thrift defs
paleolimbot Jun 29, 2024
6acb2f2
add stubs
paleolimbot Jun 30, 2024
6ff5855
split methods out of line
paleolimbot Jun 30, 2024
0ac5d84
maybe to/from thrift
paleolimbot Jul 9, 2024
8a80ac7
a few more serializers
paleolimbot Jul 9, 2024
f330997
add basic test for serialization
paleolimbot Aug 7, 2024
b34b1c2
add sort order check
paleolimbot Aug 7, 2024
7e9f9b2
update thrift
paleolimbot Aug 12, 2024
6a9ac3a
start geom utiles
paleolimbot Aug 12, 2024
a22908f
test roundtrip thrift cases
paleolimbot Aug 14, 2024
dcc083a
more geometry utils
paleolimbot Aug 14, 2024
9acc840
more
paleolimbot Aug 14, 2024
e18412c
bounder
paleolimbot Aug 14, 2024
76b3f59
add basic test
paleolimbot Aug 14, 2024
fc77ff2
a few more strings
paleolimbot Aug 14, 2024
21f11a1
test some bounding box things
paleolimbot Aug 14, 2024
6bbce5c
more tests
paleolimbot Aug 14, 2024
bb41b06
fix test
paleolimbot Aug 14, 2024
88a42f5
with passing tests
paleolimbot Aug 14, 2024
3798ef1
add in WKT equiv
paleolimbot Aug 14, 2024
ccbd616
more tests
paleolimbot Aug 14, 2024
3752f83
start on stats
paleolimbot Aug 17, 2024
2ff6078
implement update/merge for geometry statistics
paleolimbot Aug 19, 2024
79e4715
more complete stats
paleolimbot Aug 19, 2024
4444757
start on factory methods
paleolimbot Aug 19, 2024
176c997
more stats things
paleolimbot Aug 19, 2024
1de193e
maybe work with serde
paleolimbot Aug 19, 2024
d2f8157
Update cpp/src/parquet/types.cc
paleolimbot Aug 19, 2024
b525e84
Updated parquet.thrift and re-generated cpp sources
Kontinuation Sep 3, 2024
6f0500e
Geometry value writer could make use of the geometry statistics class to
Kontinuation Sep 3, 2024
c052ae0
Geometry column writer now populates correct statistics
Kontinuation Sep 4, 2024
3c6b222
format/tidy
Kontinuation Sep 4, 2024
f6ae9ae
Run clang-tidy
Kontinuation Sep 4, 2024
1c9523b
Added a test that writes and reads a parquet file containing a geomet…
Kontinuation Sep 5, 2024
5a50790
Remove redundant include
Kontinuation Sep 6, 2024
51e4ab8
Fix problems found by reviewers
Kontinuation Sep 6, 2024
c40c04e
Try to make it build properly on other platforms
Kontinuation Sep 6, 2024
e89962f
Merge branch 'main' into kontinuation-parquet-geometry
Kontinuation Sep 6, 2024
fb134d3
Address review comments in https://github.com/apache/arrow/pull/43196
Kontinuation Sep 6, 2024
2f4329e
Resolve compile errors for MSVC
Kontinuation Sep 6, 2024
1db855f
Expose getters in GeometryStatistics, Change geometry_types from
Kontinuation Sep 10, 2024
ad92bb6
Add test case for UpdateSpaced, don't generate min/max stats for geom…
Kontinuation Sep 11, 2024
f782e30
Support covering
Kontinuation Sep 11, 2024
9813f48
MakeStatistics and Statistics::Make should not be a breaking change
Kontinuation Sep 12, 2024
c56133c
ColumnIndex, as well as some other fixes and refacturings
Kontinuation Sep 12, 2024
174e1e1
Fix compiler warnings on AMD platforms as well as sanitizer warnings
Kontinuation Sep 12, 2024
bd0e2ad
Remove all newly added include directives
Kontinuation Sep 12, 2024
1521bac
include cmath for std::isnan
Kontinuation Sep 12, 2024
572e865
Test writing WKB encoded geometries using WriteArrow
Kontinuation Sep 16, 2024
6c322d5
Change the sort order of geometry from unknown to unsigned; resolved …
Kontinuation Sep 19, 2024
cd43ba5
Add generate_covering_ member to be explicit that' we'll generate the…
Kontinuation Sep 19, 2024
33803bc
Refactor unscoped enums in geometry_util_internal to enum classes
Kontinuation Sep 19, 2024
31a70c5
Revert more special case handling for unknown sort order
Kontinuation Sep 19, 2024
7dfbf4b
Fix WKB covering test to take native endianness into consideration
Kontinuation Sep 19, 2024
5d8ab77
min/max of geometry columns are the WKB representation of lower-left …
Kontinuation Sep 19, 2024
ae5926f
Address latest review comments
Kontinuation Sep 20, 2024
672f19c
A better implementation of geometry min/max statistics
Kontinuation Sep 20, 2024
26ba162
Update the code to accomodate the latest changes of the standard:
Kontinuation Oct 7, 2024
fe8a3e5
Fix problem decoding WKB geometries with more than 32 coordinates
Kontinuation Oct 15, 2024
ba80f3e
Re-implemented geometry statistics according to the updated spec:
Kontinuation Oct 30, 2024
9aca79d
Merge branch 'main' into kontinuation-parquet-geometry
Kontinuation Oct 30, 2024
da55a55
Revert some unnecessary changes
Kontinuation Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,534 changes: 1,495 additions & 1,039 deletions cpp/src/generated/parquet_types.cpp

Large diffs are not rendered by default.

709 changes: 615 additions & 94 deletions cpp/src/generated/parquet_types.h

Large diffs are not rendered by default.

1,003 changes: 768 additions & 235 deletions cpp/src/generated/parquet_types.tcc

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ add_parquet_test(internals-test
statistics_test.cc
encoding_test.cc
metadata_test.cc
geometry_util_internal_test.cc
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
page_index_test.cc
public_api_test.cc
types_test.cc)
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
if (stats.__isset.distinct_count) {
page_statistics.set_distinct_count(stats.distinct_count);
}
if (stats.__isset.geometry_stats) {
page_statistics.set_geometry(
FromThrift(stats.geometry_stats, stats.__isset.geometry_stats));
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
}
return page_statistics;
}

Expand Down
5 changes: 4 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1224,8 +1224,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// Will be null if not using dictionary, but that's ok
current_dict_encoder_ = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());

bool is_geometry =
(descr_->logical_type() != nullptr && descr_->logical_type()->is_geometry());
bool has_sort_order = SortOrder::UNKNOWN != descr_->sort_order();
if (properties->statistics_enabled(descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
(is_geometry || has_sort_order)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just pretend geometry type is a binary type? Then we don't have to change any line here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the sort order of geometry is unknown, so I have to special case the handling of statistics generation for geometry. This nasty workaround happens in lots of places so we definitely have to make it cleaner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the sort order of geometry is unknown

We didn't add this to the spec yet so it is still using the default column order. That's why there is a comment saying it is undefined.

page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
}
Expand Down
194 changes: 194 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "parquet/column_writer.h"
#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
#include "parquet/geometry_util_internal.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -399,6 +400,13 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->key_value_metadata();
}

std::shared_ptr<Statistics> metadata_stats() {
ApplicationVersion app_version(this->writer_properties_->created_by());
auto metadata_accessor = ColumnChunkMetaData::Make(
metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
return metadata_accessor->statistics();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -1774,5 +1782,191 @@ TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
ASSERT_EQ("bar", value);
}

// Test writing and reading geometry columns
class TestGeometryValuesWriter : public TestPrimitiveWriter<ByteArrayType> {
public:
static const char* CRS;
static const char* METADATA;
wgtmac marked this conversation as resolved.
Show resolved Hide resolved

void SetUpSchema(Repetition::type repetition, int num_columns) override {
std::vector<schema::NodePtr> fields;

for (int i = 0; i < num_columns; ++i) {
std::string name = TestColumnName(i);
std::shared_ptr<const LogicalType> logical_type =
GeometryLogicalType::Make(CRS, LogicalType::GeometryEdges::PLANAR,
LogicalType::GeometryEncoding::WKB, METADATA);
fields.push_back(schema::PrimitiveNode::Make(name, repetition, logical_type,
ByteArrayType::type_num));
}
node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
schema_.Init(node_);
}

void GenerateData(int64_t num_values, uint32_t seed = 0) {
values_.resize(num_values);

buffer_.resize(num_values * WKB_POINT_SIZE);
uint8_t* ptr = buffer_.data();
for (int k = 0; k < num_values; k++) {
GenerateWKBPoint(ptr, k, k + 1);
values_[k].len = WKB_POINT_SIZE;
values_[k].ptr = ptr;
ptr += WKB_POINT_SIZE;
}

values_ptr_ = values_.data();
}

void TestWriteAndRead(ParquetVersion::type version,
ParquetDataPageVersion data_page_version) {
this->SetUpSchema(Repetition::REQUIRED, 1);
this->GenerateData(SMALL_SIZE);
size_t num_values = this->values_.size();
auto writer =
this->BuildWriter(num_values, ColumnProperties(), version, data_page_version,
/*enable_checksum*/ false);
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_.data());

writer->Close();
this->ReadColumn();
for (size_t i = 0; i < num_values; i++) {
const ByteArray& value = this->values_out_[i];
double x = 0;
double y = 0;
EXPECT_TRUE(GetWKBPointCoordinate(value, &x, &y));
auto expected_x = static_cast<double>(i);
auto expected_y = static_cast<double>(i + 1);
EXPECT_DOUBLE_EQ(expected_x, x);
EXPECT_DOUBLE_EQ(expected_y, y);
}

std::shared_ptr<Statistics> statistics = metadata_stats();
EXPECT_FALSE(statistics->HasMinMax());
EXPECT_TRUE(statistics->HasGeometryStatistics());
const GeometryStatistics* geometry_statistics = statistics->geometry_statistics();
std::vector<int32_t> geometry_types = geometry_statistics->GetGeometryTypes();
EXPECT_EQ(1, geometry_types.size());
EXPECT_EQ(1, geometry_types[0]);
EXPECT_DOUBLE_EQ(0, geometry_statistics->GetXMin());
EXPECT_DOUBLE_EQ(1, geometry_statistics->GetYMin());
EXPECT_DOUBLE_EQ(99, geometry_statistics->GetXMax());
EXPECT_DOUBLE_EQ(100, geometry_statistics->GetYMax());
EXPECT_FALSE(geometry_statistics->HasZ());
EXPECT_FALSE(geometry_statistics->HasM());

auto coverings = geometry_statistics->GetCoverings();
EXPECT_EQ(1, coverings.size());
EXPECT_EQ("WKB", coverings[0].first);
geometry::WKBGeometryBounder bounder;
const std::string& wkb = coverings[0].second;
geometry::WKBBuffer wkb_buffer(reinterpret_cast<const uint8_t*>(wkb.data()),
wkb.size());
bounder.ReadGeometry(&wkb_buffer);
bounder.Flush();
auto bounds = bounder.Bounds();
EXPECT_DOUBLE_EQ(0, bounds.min[0]);
EXPECT_DOUBLE_EQ(1, bounds.min[1]);
EXPECT_DOUBLE_EQ(99, bounds.max[0]);
EXPECT_DOUBLE_EQ(100, bounds.max[1]);
}

void TestWriteAndReadSpaced(ParquetVersion::type version,
ParquetDataPageVersion data_page_version) {
this->SetUpSchema(Repetition::OPTIONAL, 1);
this->GenerateData(SMALL_SIZE);
size_t num_values = this->values_.size();

std::vector<int16_t> definition_levels(num_values, 1);
std::vector<int16_t> repetition_levels(num_values, 0);
std::vector<size_t> non_null_indices;

// Replace some of the generated data with NULL
for (size_t i = 0; i < num_values; i++) {
if (i % 3 == 0) {
definition_levels[i] = 0;
} else {
non_null_indices.push_back(i);
}
}

// Construct valid bits using definition levels
std::vector<uint8_t> valid_bytes(num_values);
std::transform(definition_levels.begin(), definition_levels.end(),
valid_bytes.begin(),
[&](int64_t level) { return static_cast<uint8_t>(level); });
std::shared_ptr<Buffer> valid_bits;
ASSERT_OK_AND_ASSIGN(valid_bits, ::arrow::internal::BytesToBits(valid_bytes));

auto writer =
this->BuildWriter(num_values, ColumnProperties(), version, data_page_version,
/*enable_checksum*/ false);
writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(),
repetition_levels.data(), valid_bits->data(), 0,
this->values_.data());

writer->Close();
this->ReadColumn();
size_t expected_values_read = non_null_indices.size();
EXPECT_EQ(expected_values_read, values_read_);
for (int64_t i = 0; i < values_read_; i++) {
const ByteArray& value = this->values_out_[i];
double x = 0;
double y = 0;
EXPECT_TRUE(GetWKBPointCoordinate(value, &x, &y));
auto expected_x = static_cast<double>(non_null_indices[i]);
auto expected_y = static_cast<double>(non_null_indices[i] + 1);
EXPECT_DOUBLE_EQ(expected_x, x);
EXPECT_DOUBLE_EQ(expected_y, y);
}

std::shared_ptr<Statistics> statistics = metadata_stats();
EXPECT_FALSE(statistics->HasMinMax());
EXPECT_TRUE(statistics->HasGeometryStatistics());
const GeometryStatistics* geometry_statistics = statistics->geometry_statistics();
std::vector<int32_t> geometry_types = geometry_statistics->GetGeometryTypes();
EXPECT_EQ(1, geometry_types.size());
EXPECT_EQ(1, geometry_types[0]);
EXPECT_DOUBLE_EQ(1, geometry_statistics->GetXMin());
EXPECT_DOUBLE_EQ(2, geometry_statistics->GetYMin());
EXPECT_DOUBLE_EQ(98, geometry_statistics->GetXMax());
EXPECT_DOUBLE_EQ(99, geometry_statistics->GetYMax());
EXPECT_FALSE(geometry_statistics->HasZ());
EXPECT_FALSE(geometry_statistics->HasM());
}
};

const char* TestGeometryValuesWriter::CRS =
R"({"id": {"authority": "OGC", "code": "CRS84"}})";
const char* TestGeometryValuesWriter::METADATA = "test_metadata";

TEST_F(TestGeometryValuesWriter, TestWriteAndReadV1) {
for (auto data_page_version :
{ParquetDataPageVersion::V1, ParquetDataPageVersion::V2}) {
TestWriteAndRead(ParquetVersion::PARQUET_1_0, data_page_version);
}
}

TEST_F(TestGeometryValuesWriter, TestWriteAndReadV2) {
for (auto data_page_version :
{ParquetDataPageVersion::V1, ParquetDataPageVersion::V2}) {
TestWriteAndRead(ParquetVersion::PARQUET_2_4, data_page_version);
}
}

TEST_F(TestGeometryValuesWriter, TestWriteAndReadV1Spaced) {
for (auto data_page_version :
{ParquetDataPageVersion::V1, ParquetDataPageVersion::V2}) {
TestWriteAndReadSpaced(ParquetVersion::PARQUET_1_0, data_page_version);
}
}

TEST_F(TestGeometryValuesWriter, TestWriteAndReadV2Spaced) {
for (auto data_page_version :
{ParquetDataPageVersion::V1, ParquetDataPageVersion::V2}) {
TestWriteAndReadSpaced(ParquetVersion::PARQUET_2_4, data_page_version);
}
}

} // namespace test
} // namespace parquet
Loading
Loading