diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 09636155c5aa0..edd40b5f8b101 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -896,6 +896,21 @@ std::optional getTimestampTimeZone( return std::nullopt; } +std::optional getParquetDataPageVersion( + const config::ConfigBase& config, + const char* configKey) { + if (const auto version = config.get(configKey)) { + if (version == "PARQUET_1_0") { + return "V1"; + } else if (version == "PARQUET_2_0") { + return "V2"; + } else { + VELOX_FAIL("Unsupported parquet datapage version {}", version.value()); + } + } + return std::nullopt; +} + void updateParquetWriterOptions( const std::shared_ptr& hiveConfig, const config::ConfigBase* sessionProperties, @@ -931,6 +946,18 @@ void updateParquetWriterOptions( *hiveConfig->config(), core::QueryConfig::kSessionTimezone); } + parquetWriterOptions->parquetDataPageVersion = + getParquetDataPageVersion( + *sessionProperties, + parquet::WriterOptions::kParquetSessionDataPageVersion) + .has_value() + ? getParquetDataPageVersion( + *sessionProperties, + parquet::WriterOptions::kParquetSessionDataPageVersion) + : getParquetDataPageVersion( + *hiveConfig->config(), + parquet::WriterOptions::kParquetSessionDataPageVersion); + writerOptions = std::move(parquetWriterOptions); } #endif diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index 87a7c6f0421d1..14abe185da1d0 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -22,7 +22,9 @@ #include "velox/connectors/hive/HiveConnector.h" // @manual #include "velox/core/QueryCtx.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual +#include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -74,6 +76,26 @@ class ParquetWriterTest : public ParquetTestBase { opts); }; + facebook::velox::parquet::thrift::PageType::type getDataPageVersion( + const dwio::common::MemorySink* sinkPtr, + const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) { + std::string_view sinkData(sinkPtr->data(), sinkPtr->size()); + auto readFile = std::make_shared(sinkData); + auto file = std::make_shared(std::move(readFile)); + auto inputStream = std::make_unique( + std::move(file), + colChunkPtr.dataPageOffset(), + 150, + *leafPool_, + LogType::TEST); + auto pageReader = std::make_unique( + std::move(inputStream), + *leafPool_, + colChunkPtr.compression(), + colChunkPtr.totalCompressedSize()); + return pageReader->readPageHeader().type; + }; + inline static const std::string kHiveConnectorId = "test-hive"; }; @@ -143,6 +165,43 @@ TEST_F(ParquetWriterTest, compression) { assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; +TEST_F(ParquetWriterTest, datapageVersion) { + auto schema = ROW({"c0"}, {INTEGER()}); + const int64_t kRows = 1; + const auto data = makeRowVector({ + makeFlatVector(kRows, [](auto row) { return 987; }), + }); + + // Set parquet datapage version and write data - then read to ensure the + // property took effect. + const auto testDataPageVersion = + [&](std::optional dataPageVersion) { + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto sinkPtr = sink.get(); + facebook::velox::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetDataPageVersion = dataPageVersion; + + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, schema); + writer->write(data); + writer->close(); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReaderInMemory(*sinkPtr, readerOptions); + auto readDataPageVersion = getDataPageVersion( + sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0)); + return readDataPageVersion; + }; + + ASSERT_EQ(testDataPageVersion("V1"), thrift::PageType::type::DATA_PAGE); + + ASSERT_EQ(testDataPageVersion("V2"), thrift::PageType::type::DATA_PAGE_V2); +}; + DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { SCOPED_TESTVALUE_SET( "facebook::velox::parquet::Writer::write", diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index bce6919a7cc63..8c3884c5e6757 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -147,6 +147,11 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + + if (options.parquetDataPageVersion == "V2") { + properties = properties->data_page_version( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2); + } return properties->build(); } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index ce98acb64fab5..a0c415d638ceb 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -24,6 +24,7 @@ #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -108,6 +109,7 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp time zone for Parquet write through Arrow bridge. std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; + std::optional parquetDataPageVersion = std::nullopt; // Parsing session and hive configs. @@ -117,6 +119,9 @@ struct WriterOptions : public dwio::common::WriterOptions { "hive.parquet.writer.timestamp_unit"; static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = "hive.parquet.writer.timestamp-unit"; + /// Parquet datapage version to be used (V1 or V2). Defaults to V2. + static constexpr const char* kParquetSessionDataPageVersion = + "parquet_writer_version"; }; // Writes Velox vectors into a DataSink using Arrow Parquet writer.