From 27acf8bf6e16d4c53e1ed59d5ef46ac7db0306ea Mon Sep 17 00:00:00 2001 From: Pradeep Gollakota Date: Wed, 11 Sep 2024 03:45:08 -0700 Subject: [PATCH] GH-32538: [C++][Parquet] Add JSON canonical extension type (#13901) Arrow now provides a canonical extension type for JSON data. This extension is backed by utf8(). Parquet will recognize this extension and appropriately propagate the LogicalType to the storage format. * GitHub Issue: #32538 Lead-authored-by: Rok Mihevc Co-authored-by: Pradeep Gollakota Co-authored-by: Antoine Pitrou Co-authored-by: mwish Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/array/validate.cc | 20 +++- cpp/src/arrow/extension/CMakeLists.txt | 2 +- .../extension/fixed_shape_tensor_test.cc | 6 +- cpp/src/arrow/extension/json.cc | 61 ++++++++++++ cpp/src/arrow/extension/json.h | 56 +++++++++++ cpp/src/arrow/extension/json_test.cc | 83 ++++++++++++++++ cpp/src/arrow/extension/uuid_test.cc | 4 +- cpp/src/arrow/extension_type.cc | 4 +- cpp/src/arrow/extension_type_test.cc | 6 +- cpp/src/arrow/ipc/test_common.cc | 17 ++-- cpp/src/arrow/ipc/test_common.h | 4 +- cpp/src/arrow/testing/gtest_util.cc | 1 + .../parquet/arrow/arrow_reader_writer_test.cc | 61 +++++++++++- cpp/src/parquet/arrow/arrow_schema_test.cc | 94 ++++++++++++++++++- cpp/src/parquet/arrow/schema.cc | 46 ++++++--- cpp/src/parquet/arrow/schema_internal.cc | 24 +++-- cpp/src/parquet/arrow/schema_internal.h | 8 +- cpp/src/parquet/properties.h | 16 +++- docs/source/status.rst | 2 +- 20 files changed, 460 insertions(+), 56 deletions(-) create mode 100644 cpp/src/arrow/extension/json.cc create mode 100644 cpp/src/arrow/extension/json.h create mode 100644 cpp/src/arrow/extension/json_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 01ac813f4713b..e77a02d0c0800 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -376,6 +376,7 @@ set(ARROW_SRCS 
     device_allocation_type_set.cc
     extension_type.cc
     extension/bool8.cc
+    extension/json.cc
     extension/uuid.cc
     pretty_print.cc
     record_batch.cc
diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc
index 0d940d3bc869e..69f1646054f4c 100644
--- a/cpp/src/arrow/array/validate.cc
+++ b/cpp/src/arrow/array/validate.cc
@@ -985,10 +985,22 @@ Status ValidateArrayFull(const Array& array) { return ValidateArrayFull(*array.d
 
 ARROW_EXPORT
 Status ValidateUTF8(const ArrayData& data) {
-  DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW ||
-         data.type->id() == Type::LARGE_STRING);
-  UTF8DataValidator validator{data};
-  return VisitTypeInline(*data.type, &validator);
+  const auto& storage_type =
+      (data.type->id() == Type::EXTENSION)
+          ? checked_cast<const ExtensionType&>(*data.type).storage_type()
+          : data.type;
+  DCHECK(storage_type->id() == Type::STRING || storage_type->id() == Type::STRING_VIEW ||
+         storage_type->id() == Type::LARGE_STRING);
+
+  if (data.type->id() == Type::EXTENSION) {
+    ArrayData ext_data(data);
+    ext_data.type = storage_type;
+    UTF8DataValidator validator{ext_data};
+    return VisitTypeInline(*storage_type, &validator);
+  } else {
+    UTF8DataValidator validator{data};
+    return VisitTypeInline(*storage_type, &validator);
+  }
 }
 
 ARROW_EXPORT
diff --git a/cpp/src/arrow/extension/CMakeLists.txt b/cpp/src/arrow/extension/CMakeLists.txt
index 065ea3f1ddb16..4ab6a35b52e4f 100644
--- a/cpp/src/arrow/extension/CMakeLists.txt
+++ b/cpp/src/arrow/extension/CMakeLists.txt
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-set(CANONICAL_EXTENSION_TESTS bool8_test.cc uuid_test.cc) +set(CANONICAL_EXTENSION_TESTS bool8_test.cc json_test.cc uuid_test.cc) if(ARROW_JSON) list(APPEND CANONICAL_EXTENSION_TESTS fixed_shape_tensor_test.cc opaque_test.cc) diff --git a/cpp/src/arrow/extension/fixed_shape_tensor_test.cc b/cpp/src/arrow/extension/fixed_shape_tensor_test.cc index 842a78e1a4f7a..51aea4b25fdda 100644 --- a/cpp/src/arrow/extension/fixed_shape_tensor_test.cc +++ b/cpp/src/arrow/extension/fixed_shape_tensor_test.cc @@ -205,7 +205,7 @@ TEST_F(TestExtensionType, RoundtripBatch) { std::shared_ptr read_batch; auto ext_field = field(/*name=*/"f0", /*type=*/ext_type_); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); // Pass extension metadata and storage array, expect getting back extension array @@ -216,7 +216,7 @@ TEST_F(TestExtensionType, RoundtripBatch) { ext_field = field(/*name=*/"f0", /*type=*/element_type_, /*nullable=*/true, /*metadata=*/ext_metadata); auto batch2 = RecordBatch::Make(schema({ext_field}), fsla_arr->length(), {fsla_arr}); - RoundtripBatch(batch2, &read_batch2); + ASSERT_OK(RoundtripBatch(batch2, &read_batch2)); CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true); } @@ -469,7 +469,7 @@ TEST_F(TestExtensionType, RoundtripBatchFromTensor) { auto ext_field = field("f0", ext_type_, true, ext_metadata); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); } diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc new file mode 100644 index 0000000000000..d793233c2b573 --- /dev/null +++ b/cpp/src/arrow/extension/json.cc @@ -0,0 +1,61 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/extension/json.h"
+
+#include <memory>
+
+#include "arrow/extension_type.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::extension {
+
+bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const {
+  return other.extension_name() == this->extension_name();
+}
+
+Result<std::shared_ptr<DataType>> JsonExtensionType::Deserialize(
+    std::shared_ptr<DataType> storage_type, const std::string& serialized) const {
+  if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW &&
+      storage_type->id() != Type::LARGE_STRING) {
+    return Status::Invalid("Invalid storage type for JsonExtensionType: ",
+                           storage_type->ToString());
+  }
+  return std::make_shared<JsonExtensionType>(storage_type);
+}
+
+std::string JsonExtensionType::Serialize() const { return ""; }
+
+std::shared_ptr<Array> JsonExtensionType::MakeArray(
+    std::shared_ptr<ArrayData> data) const {
+  DCHECK_EQ(data->type->id(), Type::EXTENSION);
+  DCHECK_EQ("arrow.json",
+            internal::checked_cast<const ExtensionType&>(*data->type).extension_name());
+  return std::make_shared<ExtensionArray>(data);
+}
+
+std::shared_ptr<DataType> json(const std::shared_ptr<DataType> storage_type) {
+  ARROW_CHECK(storage_type->id() == Type::STRING ||
+              storage_type->id() == Type::STRING_VIEW ||
+              storage_type->id() == Type::LARGE_STRING);
+  return std::make_shared<JsonExtensionType>(storage_type);
+}
+
+}  // namespace arrow::extension
diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h
new file mode 100644
index 0000000000000..4793ab2bc9b36
--- /dev/null
+++ b/cpp/src/arrow/extension/json.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/extension_type.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow::extension {
+
+/// \brief Concrete type class for variable-size JSON data, utf8-encoded.
+class ARROW_EXPORT JsonExtensionType : public ExtensionType { + public: + explicit JsonExtensionType(const std::shared_ptr& storage_type) + : ExtensionType(storage_type), storage_type_(storage_type) {} + + std::string extension_name() const override { return "arrow.json"; } + + bool ExtensionEquals(const ExtensionType& other) const override; + + Result> Deserialize( + std::shared_ptr storage_type, + const std::string& serialized_data) const override; + + std::string Serialize() const override; + + std::shared_ptr MakeArray(std::shared_ptr data) const override; + + private: + std::shared_ptr storage_type_; +}; + +/// \brief Return a JsonExtensionType instance. +ARROW_EXPORT std::shared_ptr json( + std::shared_ptr storage_type = utf8()); + +} // namespace arrow::extension diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc new file mode 100644 index 0000000000000..143e4f9ceeac7 --- /dev/null +++ b/cpp/src/arrow/extension/json_test.cc @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/extension/json.h" + +#include "arrow/array/validate.h" +#include "arrow/ipc/test_common.h" +#include "arrow/record_batch.h" +#include "arrow/testing/gtest_util.h" +#include "parquet/exception.h" + +namespace arrow { + +using arrow::ipc::test::RoundtripBatch; +using extension::json; + +class TestJsonExtensionType : public ::testing::Test {}; + +std::shared_ptr ExampleJson(const std::shared_ptr& storage_type) { + std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"); + return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); +} + +TEST_F(TestJsonExtensionType, JsonRoundtrip) { + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + std::shared_ptr ext_arr = ExampleJson(storage_type); + auto batch = + RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); + + std::shared_ptr read_batch; + ASSERT_OK(RoundtripBatch(batch, &read_batch)); + ASSERT_OK(read_batch->ValidateFull()); + CompareBatch(*batch, *read_batch, /*compare_metadata*/ true); + + auto read_ext_arr = read_batch->column(0); + ASSERT_OK(internal::ValidateUTF8(*read_ext_arr)); + ASSERT_OK(read_ext_arr->ValidateFull()); + } +} + +TEST_F(TestJsonExtensionType, InvalidUTF8) { + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + auto json_type = json(storage_type); + auto invalid_input = ArrayFromJSON(storage_type, "[\"Ⱥa\xFFⱭ\", \"Ɽ\xe1\xbdⱤaA\"]"); + auto ext_arr = ExtensionType::WrapArray(json_type, invalid_input); + + ASSERT_RAISES_WITH_MESSAGE(Invalid, + "Invalid: Invalid UTF8 sequence at string index 0", + ext_arr->ValidateFull()); + ASSERT_RAISES_WITH_MESSAGE(Invalid, + "Invalid: Invalid UTF8 sequence at string index 0", + arrow::internal::ValidateUTF8(*ext_arr)); + + auto batch = RecordBatch::Make(schema({field("f0", json_type)}), 2, {ext_arr}); + 
std::shared_ptr read_batch; + ASSERT_OK(RoundtripBatch(batch, &read_batch)); + } +} + +} // namespace arrow diff --git a/cpp/src/arrow/extension/uuid_test.cc b/cpp/src/arrow/extension/uuid_test.cc index 3bbb6eeb4aef1..1c1ffb6eb8e15 100644 --- a/cpp/src/arrow/extension/uuid_test.cc +++ b/cpp/src/arrow/extension/uuid_test.cc @@ -54,7 +54,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) { std::shared_ptr read_batch; auto ext_field = field(/*name=*/"f0", /*type=*/ext_type); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); // Pass extension metadata and storage array, expect getting back extension array @@ -65,7 +65,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) { ext_field = field(/*name=*/"f0", /*type=*/exact_ext_type->storage_type(), /*nullable=*/true, /*metadata=*/ext_metadata); auto batch2 = RecordBatch::Make(schema({ext_field}), arr->length(), {arr}); - RoundtripBatch(batch2, &read_batch2); + ASSERT_OK(RoundtripBatch(batch2, &read_batch2)); CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true); } diff --git a/cpp/src/arrow/extension_type.cc b/cpp/src/arrow/extension_type.cc index d0135e905a0c3..7ad39eab23f8d 100644 --- a/cpp/src/arrow/extension_type.cc +++ b/cpp/src/arrow/extension_type.cc @@ -32,6 +32,7 @@ # include "arrow/extension/fixed_shape_tensor.h" # include "arrow/extension/opaque.h" #endif +#include "arrow/extension/json.h" #include "arrow/extension/uuid.h" #include "arrow/status.h" #include "arrow/type.h" @@ -148,7 +149,8 @@ static void CreateGlobalRegistry() { // Register canonical extension types g_registry = std::make_shared(); - std::vector> ext_types{extension::bool8(), extension::uuid()}; + std::vector> ext_types{extension::bool8(), extension::json(), + extension::uuid()}; #ifdef ARROW_JSON ext_types.push_back(extension::fixed_shape_tensor(int64(), {})); 
diff --git a/cpp/src/arrow/extension_type_test.cc b/cpp/src/arrow/extension_type_test.cc index f49ffc5cba553..029d833b98cd8 100644 --- a/cpp/src/arrow/extension_type_test.cc +++ b/cpp/src/arrow/extension_type_test.cc @@ -219,14 +219,14 @@ TEST_F(TestExtensionType, IpcRoundtrip) { auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr}); std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); // Wrap type in a ListArray and ensure it also makes it auto offsets_arr = ArrayFromJSON(int32(), "[0, 0, 2, 4]"); ASSERT_OK_AND_ASSIGN(auto list_arr, ListArray::FromArrays(*offsets_arr, *ext_arr)); batch = RecordBatch::Make(schema({field("f0", list(uuid()))}), 3, {list_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); } @@ -289,7 +289,7 @@ TEST_F(TestExtensionType, ParametricTypes) { 4, {p1, p2, p3, p4}); std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); } diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index fb4f6bd8eadcf..e354e2f89b3b3 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -1236,18 +1236,19 @@ Status MakeRandomTensor(const std::shared_ptr& type, return Tensor::Make(type, buf, shape, strides).Value(out); } -void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out) { - ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create()); - ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), - out_stream.get())); +Status RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out) { + ARROW_ASSIGN_OR_RAISE(auto out_stream, io::BufferOutputStream::Create()); + 
RETURN_NOT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), + out_stream.get())); - ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish()); + ARROW_ASSIGN_OR_RAISE(auto complete_ipc_stream, out_stream->Finish()); io::BufferReader reader(complete_ipc_stream); std::shared_ptr batch_reader; - ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); - ASSERT_OK(batch_reader->ReadNext(out)); + ARROW_ASSIGN_OR_RAISE(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); + RETURN_NOT_OK(batch_reader->ReadNext(out)); + return Status::OK(); } } // namespace test diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index 9b7e7f13e3a8e..189de288795c0 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -184,8 +184,8 @@ Status MakeRandomTensor(const std::shared_ptr& type, const std::vector& shape, bool row_major_p, std::shared_ptr* out, uint32_t seed = 0); -ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out); +ARROW_TESTING_EXPORT Status RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out); } // namespace test } // namespace ipc diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index c4a7f363c71bc..07d15826f2c8f 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -49,6 +49,7 @@ #include "arrow/buffer.h" #include "arrow/compute/api_vector.h" #include "arrow/datum.h" +#include "arrow/extension/json.h" #include "arrow/io/memory.h" #include "arrow/ipc/json_simple.h" #include "arrow/ipc/reader.h" diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 724e6c44f2ed0..5d990a5c6bd4a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -37,6 +37,7 @@ #include "arrow/array/builder_primitive.h" 
#include "arrow/chunked_array.h" #include "arrow/compute/api.h" +#include "arrow/extension/json.h" #include "arrow/io/api.h" #include "arrow/record_batch.h" #include "arrow/scalar.h" @@ -618,10 +619,15 @@ class ParquetIOTestBase : public ::testing::Test { return ParquetFileWriter::Open(sink_, schema); } - void ReaderFromSink(std::unique_ptr* out) { + void ReaderFromSink( + std::unique_ptr* out, + const ArrowReaderProperties& properties = default_arrow_reader_properties()) { ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish()); - ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), out)); + FileReaderBuilder builder; + ASSERT_OK_NO_THROW(builder.Open(std::make_shared(buffer))); + ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool()) + ->properties(properties) + ->Build(out)); } void ReadSingleColumnFile(std::unique_ptr file_reader, @@ -670,6 +676,7 @@ class ParquetIOTestBase : public ::testing::Test { void RoundTripSingleColumn( const std::shared_ptr& values, const std::shared_ptr& expected, const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_properties, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties(), bool nullable = true) { std::shared_ptr table = MakeSimpleTable(values, nullable); this->ResetSink(); @@ -679,7 +686,7 @@ class ParquetIOTestBase : public ::testing::Test { std::shared_ptr
out; std::unique_ptr reader; - ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); + ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader, reader_properties)); const bool expect_metadata = arrow_properties->store_schema(); ASSERT_NO_FATAL_FAILURE( this->ReadTableFromFile(std::move(reader), expect_metadata, &out)); @@ -1428,6 +1435,52 @@ TEST_F(TestLargeStringParquetIO, Basics) { this->RoundTripSingleColumn(large_array, large_array, arrow_properties); } +using TestJsonParquetIO = TestParquetIO<::arrow::extension::JsonExtensionType>; + +TEST_F(TestJsonParquetIO, JsonExtension) { + const char* json = R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"; + + const auto json_type = ::arrow::extension::json(); + const auto string_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); + const auto json_array = ::arrow::ExtensionType::WrapArray(json_type, string_array); + + const auto json_large_type = ::arrow::extension::json(::arrow::large_utf8()); + const auto large_string_array = ::arrow::ArrayFromJSON(::arrow::large_utf8(), json); + const auto json_large_array = + ::arrow::ExtensionType::WrapArray(json_large_type, large_string_array); + + // When the original Arrow schema isn't stored and Arrow extensions are disabled, + // LogicalType::JSON is read as utf8. + this->RoundTripSingleColumn(json_array, string_array, + default_arrow_writer_properties()); + this->RoundTripSingleColumn(json_large_array, string_array, + default_arrow_writer_properties()); + + // When the original Arrow schema isn't stored and Arrow extensions are enabled, + // LogicalType::JSON is read as JsonExtensionType with utf8 storage. 
+ ::parquet::ArrowReaderProperties reader_properties; + reader_properties.set_arrow_extensions_enabled(true); + this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), + reader_properties); + this->RoundTripSingleColumn(json_large_array, json_array, + default_arrow_writer_properties(), reader_properties); + + // When the original Arrow schema is stored, the stored Arrow type is respected. + const auto writer_properties = + ::parquet::ArrowWriterProperties::Builder().store_schema()->build(); + this->RoundTripSingleColumn(json_array, json_array, writer_properties); + this->RoundTripSingleColumn(json_large_array, json_large_array, writer_properties); +} + using TestNullParquetIO = TestParquetIO<::arrow::NullType>; TEST_F(TestNullParquetIO, NullColumn) { diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 9f60cd31d3541..31ead461aa6e2 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -31,8 +31,11 @@ #include "parquet/thrift_internal.h" #include "arrow/array.h" +#include "arrow/extension/json.h" +#include "arrow/ipc/writer.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" +#include "arrow/util/base64.h" #include "arrow/util/key_value_metadata.h" using arrow::Field; @@ -76,17 +79,17 @@ class TestConvertParquetSchema : public ::testing::Test { auto result_field = result_schema_->field(i); auto expected_field = expected_schema->field(i); EXPECT_TRUE(result_field->Equals(expected_field, check_metadata)) - << "Field " << i << "\n result: " << result_field->ToString() - << "\n expected: " << expected_field->ToString(); + << "Field " << i << "\n result: " << result_field->ToString(check_metadata) + << "\n expected: " << expected_field->ToString(check_metadata); } } ::arrow::Status ConvertSchema( const std::vector& nodes, - const std::shared_ptr& key_value_metadata = nullptr) { + const std::shared_ptr& key_value_metadata = 
nullptr, + ArrowReaderProperties props = ArrowReaderProperties()) { NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); descr_.Init(schema); - ArrowReaderProperties props; return FromParquetSchema(&descr_, props, key_value_metadata, &result_schema_); } @@ -230,7 +233,7 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { ::arrow::uint64()}, {"int(64, true)", LogicalType::Int(64, true), ParquetType::INT64, -1, ::arrow::int64()}, - {"json", LogicalType::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"json", LogicalType::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::utf8()}, {"bson", LogicalType::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, {"interval", LogicalType::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12, ::arrow::fixed_size_binary(12)}, @@ -724,6 +727,87 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +Status ArrowSchemaToParquetMetadata(std::shared_ptr<::arrow::Schema>& arrow_schema, + std::shared_ptr& metadata) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr serialized, + ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); + std::string schema_as_string = serialized->ToString(); + std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); + metadata = ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); + return Status::OK(); +} + +TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { + std::vector parquet_fields; + parquet_fields.push_back(PrimitiveNode::Make( + "json_1", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::JSON)); + parquet_fields.push_back(PrimitiveNode::Make( + "json_2", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::JSON)); + + { + // Parquet file does not contain Arrow schema. + // By default, both fields should be treated as utf8() fields in Arrow. 
+ auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", UTF8, true), ::arrow::field("json_2", UTF8, true)}); + std::shared_ptr metadata{}; + ASSERT_OK(ConvertSchema(parquet_fields, metadata)); + CheckFlatSchema(arrow_schema); + } + + { + // Parquet file does not contain Arrow schema. + // If Arrow extensions are enabled, both fields should be treated as json() extension + // fields. + ArrowReaderProperties props; + props.set_arrow_extensions_enabled(true); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true), + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); + std::shared_ptr metadata{}; + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); + CheckFlatSchema(arrow_schema); + } + + { + // Parquet file contains Arrow schema. + // Both json_1 and json_2 should be returned as a json() field + // even though extensions are not enabled. + ArrowReaderProperties props; + props.set_arrow_extensions_enabled(false); + std::shared_ptr field_metadata = + ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); + + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); + CheckFlatSchema(arrow_schema, true /* check_metadata */); + } + + { + // Parquet file contains Arrow schema. Extensions are enabled. 
+ // Both json_1 and json_2 should be returned as a json() field + ArrowReaderProperties props; + props.set_arrow_extensions_enabled(true); + std::shared_ptr field_metadata = + ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); + + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); + CheckFlatSchema(arrow_schema, true /* check_metadata */); + } +} + class TestConvertArrowSchema : public ::testing::Test { public: virtual void SetUp() {} diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index ec3890a41f442..1623d80dcb0e4 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -21,6 +21,7 @@ #include #include +#include "arrow/extension/json.h" #include "arrow/extension_type.h" #include "arrow/io/memory.h" #include "arrow/ipc/api.h" @@ -427,6 +428,13 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } case ArrowTypeId::EXTENSION: { auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type()); + // Built-in JSON extension is handled differently. + if (ext_type->extension_name() == std::string("arrow.json")) { + // Set physical and logical types and instantiate primitive node. 
+ type = ParquetType::BYTE_ARRAY; + logical_type = LogicalType::JSON(); + break; + } std::shared_ptr<::arrow::Field> storage_field = ::arrow::field( name, ext_type->storage_type(), field->nullable(), field->metadata()); return FieldToNode(name, storage_field, properties, arrow_properties, out); @@ -438,7 +446,7 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } default: { - // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR + // TODO: DENSE_UNION, SPARE_UNION, DECIMAL_TEXT, VARCHAR return Status::NotImplemented( "Unhandled type for Arrow to Parquet schema conversion: ", field->type()->ToString()); @@ -476,9 +484,8 @@ bool IsDictionaryReadSupported(const ArrowType& type) { ::arrow::Result> GetTypeForNode( int column_index, const schema::PrimitiveNode& primitive_node, SchemaTreeContext* ctx) { - ASSIGN_OR_RAISE( - std::shared_ptr storage_type, - GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); + ASSIGN_OR_RAISE(std::shared_ptr storage_type, + GetArrowType(primitive_node, ctx->properties)); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); @@ -984,18 +991,35 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer bool modified = false; auto& origin_type = origin_field.type(); + const auto& inferred_type = inferred->field->type(); if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); - auto origin_storage_field = origin_field.WithType(ex_type.storage_type()); + if (inferred_type->id() != ::arrow::Type::EXTENSION && + ex_type.extension_name() == std::string("arrow.json") && + (inferred_type->id() == ::arrow::Type::STRING || + inferred_type->id() == ::arrow::Type::LARGE_STRING || + inferred_type->id() == ::arrow::Type::STRING_VIEW)) { + // Schema mismatch. + // + // Arrow extensions are DISABLED in Parquet. 
+ // origin_type is ::arrow::extension::json() + // inferred_type is ::arrow::utf8() + // + // Origin type is restored as Arrow should be considered the source of truth. + inferred->field = inferred->field->WithType(origin_type); + RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred)); + } else { + auto origin_storage_field = origin_field.WithType(ex_type.storage_type()); - // Apply metadata recursively to storage type - RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred)); + // Apply metadata recursively to storage type + RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred)); - // Restore extension type, if the storage type is the same as inferred - // from the Parquet type - if (ex_type.storage_type()->Equals(*inferred->field->type())) { - inferred->field = inferred->field->WithType(origin_type); + // Restore extension type, if the storage type is the same as inferred + // from the Parquet type + if (ex_type.storage_type()->Equals(*inferred->field->type())) { + inferred->field = inferred->field->WithType(origin_type); + } } modified = true; } else { diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index a8e2a95b9b97d..261a00940654d 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -17,8 +17,11 @@ #include "parquet/arrow/schema_internal.h" +#include "arrow/extension/json.h" #include "arrow/type.h" +#include "parquet/properties.h" + using ArrowType = ::arrow::DataType; using ArrowTypeId = ::arrow::Type; using ParquetType = parquet::Type; @@ -107,7 +110,8 @@ Result> MakeArrowTimestamp(const LogicalType& logical } } -Result> FromByteArray(const LogicalType& logical_type) { +Result> FromByteArray( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::STRING: return ::arrow::utf8(); @@ -115,9 +119,15 @@ Result> 
FromByteArray(const LogicalType& logical_type return MakeArrowDecimal(logical_type); case LogicalType::Type::NONE: case LogicalType::Type::ENUM: - case LogicalType::Type::JSON: case LogicalType::Type::BSON: return ::arrow::binary(); + case LogicalType::Type::JSON: + if (reader_properties.get_arrow_extensions_enabled()) { + return ::arrow::extension::json(::arrow::utf8()); + } + // When the original Arrow schema isn't stored and Arrow extensions are disabled, + // LogicalType::JSON is read as utf8(). + return ::arrow::utf8(); default: return Status::NotImplemented("Unhandled logical logical_type ", logical_type.ToString(), " for binary array"); @@ -180,7 +190,7 @@ Result> FromInt64(const LogicalType& logical_type) { Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { + const ArrowReaderProperties& reader_properties) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -193,13 +203,13 @@ Result> GetArrowType( case ParquetType::INT64: return FromInt64(logical_type); case ParquetType::INT96: - return ::arrow::timestamp(int96_arrow_time_unit); + return ::arrow::timestamp(reader_properties.coerce_int96_timestamp_unit()); case ParquetType::FLOAT: return ::arrow::float32(); case ParquetType::DOUBLE: return ::arrow::float64(); case ParquetType::BYTE_ARRAY: - return FromByteArray(logical_type); + return FromByteArray(logical_type, reader_properties); case ParquetType::FIXED_LEN_BYTE_ARRAY: return FromFLBA(logical_type, type_length); default: { @@ -212,9 +222,9 @@ Result> GetArrowType( Result> GetArrowType( const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { + const ArrowReaderProperties& reader_properties) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), - primitive.type_length(), int96_arrow_time_unit); + primitive.type_length(), reader_properties); } } // namespace 
parquet::arrow diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index f56ba0958ae2d..58828f85ab8e3 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -18,6 +18,7 @@ #pragma once #include "arrow/result.h" +#include "arrow/type_fwd.h" #include "parquet/schema.h" namespace arrow { @@ -28,7 +29,8 @@ namespace parquet::arrow { using ::arrow::Result; -Result> FromByteArray(const LogicalType& logical_type); +Result> FromByteArray(const LogicalType& logical_type, + bool use_known_arrow_extensions); Result> FromFLBA(const LogicalType& logical_type, int32_t physical_length); Result> FromInt32(const LogicalType& logical_type); @@ -36,10 +38,10 @@ Result> FromInt64(const LogicalType& logical_ Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, - ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); + const ArrowReaderProperties& reader_properties); Result> GetArrowType( const schema::PrimitiveNode& primitive, - ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); + const ArrowReaderProperties& reader_properties); } // namespace parquet::arrow diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 4d3acb491e390..7f2e371df66d7 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -870,7 +870,8 @@ class PARQUET_EXPORT ArrowReaderProperties { batch_size_(kArrowDefaultBatchSize), pre_buffer_(true), cache_options_(::arrow::io::CacheOptions::LazyDefaults()), - coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {} + coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO), + arrow_extensions_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// @@ -941,6 +942,18 @@ class PARQUET_EXPORT ArrowReaderProperties { return coerce_int96_timestamp_unit_; } + /// Enable Parquet-supported Arrow extension types. 
+ /// + /// When enabled, Parquet logical types will be mapped to their corresponding Arrow + /// extension types at read time, if such exist. Currently only arrow::extension::json() + /// extension type is supported. Columns whose LogicalType is JSON will be interpreted + /// as arrow::extension::json(), with storage type inferred from the serialized Arrow + /// schema if present, or `utf8` by default. + void set_arrow_extensions_enabled(bool extensions_enabled) { + arrow_extensions_enabled_ = extensions_enabled; + } + bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -949,6 +962,7 @@ class PARQUET_EXPORT ArrowReaderProperties { ::arrow::io::IOContext io_context_; ::arrow::io::CacheOptions cache_options_; ::arrow::TimeUnit::type coerce_int96_timestamp_unit_; + bool arrow_extensions_enabled_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties diff --git a/docs/source/status.rst b/docs/source/status.rst index b685d4bbf8add..98374164d7ae0 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -119,7 +119,7 @@ Data Types +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | Variable shape tensor | | | | | | | | | +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ -| JSON | | | ✓ | | | | | | +| JSON | ✓ | | ✓ | | | | | | +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | UUID | ✓ | | ✓ | | | | | | +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+