From 9383c3e36603c281a8800466ff236c42658e9c4c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 30 Aug 2024 13:43:27 -0500 Subject: [PATCH] fix: Use parquet converted type only if logical type not set (#5997) If a column in a parquet file has both logical type and converted type set, we should prioritize logical type over converted type. --- extensions/iceberg/s3/build.gradle | 24 +++++------ .../iceberg/util/IcebergLocalStackTest.java | 11 ++--- .../iceberg/util/IcebergMinIOTest.java | 11 ++--- .../iceberg/util/IcebergToolsTest.java | 40 +++++++++++-------- .../parquet/base/ParquetFileReader.java | 28 +++++-------- 5 files changed, 52 insertions(+), 62 deletions(-) diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle index bde1e84bc7f..c580989fcb2 100644 --- a/extensions/iceberg/s3/build.gradle +++ b/extensions/iceberg/s3/build.gradle @@ -26,6 +26,10 @@ dependencies { runtimeOnly libs.awssdk.sts runtimeOnly libs.awssdk.glue + testImplementation libs.junit4 + + testImplementation project(':engine-test-utils') + testImplementation libs.testcontainers testImplementation libs.testcontainers.junit.jupiter testImplementation libs.testcontainers.localstack @@ -39,20 +43,10 @@ dependencies { testRuntimeOnly libs.slf4j.simple } -test { - useJUnitPlatform { - excludeTags("testcontainers") - } -} +TestTools.addEngineOutOfBandTest(project) -tasks.register('testOutOfBand', Test) { - useJUnitPlatform { - includeTags("testcontainers") - } +testOutOfBand.dependsOn Docker.registryTask(project, 'localstack') +testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack') - dependsOn Docker.registryTask(project, 'localstack') - systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack') - - dependsOn Docker.registryTask(project, 'minio') - systemProperty 'testcontainers.minio.image', Docker.localImageName('minio') -} +testOutOfBand.dependsOn Docker.registryTask(project, 'minio') +testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio') \ No newline at end of file diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java index 578e358985e..de15eceaa04 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java @@ -3,24 +3,21 @@ // package io.deephaven.iceberg.util; - import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; +import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; -@Tag("testcontainers") public class IcebergLocalStackTest extends IcebergToolsTest { - @BeforeAll - static void initContainer() { + @BeforeClass + public static void initContainer() { // ensure container is started so container startup time isn't associated with a specific test SingletonContainers.LocalStack.init(); } @Override - public Builder s3Instructions(Builder builder) { + public Builder s3Instructions(final Builder builder) { return SingletonContainers.LocalStack.s3Instructions(builder); } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java index 804d2d01746..0e789a64df2 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java @@ -3,20 +3,17 @@ // package io.deephaven.iceberg.util; - import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers; import io.deephaven.stats.util.OSUtil; import org.junit.jupiter.api.Assumptions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; +import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; -@Tag("testcontainers") public class IcebergMinIOTest extends IcebergToolsTest { - @BeforeAll - static void initContainer() { + @BeforeClass + public static void initContainer() { // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()"); // ensure container is started so container startup time isn't associated with a specific test @@ -24,7 +21,7 @@ static void initContainer() { } @Override - public Builder s3Instructions(Builder builder) { + public Builder s3Instructions(final Builder builder) { return SingletonContainers.MinIO.s3Instructions(builder); } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index 44a17942cdf..2218e9e3556 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -9,17 +9,21 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog; import io.deephaven.iceberg.TestCatalog.IcebergTestFileIO; +import io.deephaven.test.types.OutOfBandTest; import org.apache.iceberg.Snapshot; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @@ -47,6 +51,7 @@ import static io.deephaven.iceberg.util.IcebergCatalogAdapter.SNAPSHOT_DEFINITION; import static io.deephaven.iceberg.util.IcebergCatalogAdapter.TABLES_DEFINITION; +@Category(OutOfBandTest.class) public abstract class IcebergToolsTest { private static final TableDefinition SALES_SINGLE_DEFINITION = TableDefinition.of( @@ -110,8 +115,11 @@ public abstract class IcebergToolsTest { private Catalog resourceCatalog; private FileIO resourceFileIO; - @BeforeEach - void setUp() throws ExecutionException, InterruptedException { + @Rule + public final EngineCleanup framework = new EngineCleanup(); + + @Before + public void setUp() throws ExecutionException, InterruptedException { bucket = "warehouse"; asyncClient = s3AsyncClient(); asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(); @@ -129,6 +137,16 @@ void setUp() throws ExecutionException, InterruptedException { .build(); } + @After + public void tearDown() throws ExecutionException, InterruptedException { + for (String key : keys) { + asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get(); + } + keys.clear(); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(); + asyncClient.close(); + } + private void uploadParquetFiles(final File root, final String prefixToRemove) throws ExecutionException, InterruptedException, TimeoutException { for (final File file : root.listFiles()) { @@ -175,16 +193,6 @@ private void uploadSalesRenamed() throws ExecutionException, InterruptedExceptio warehousePath); } - @AfterEach - public void tearDown() throws ExecutionException, InterruptedException { - for (String key : keys) { - asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get(); - } - keys.clear(); - asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(); - asyncClient.close(); - } - @Test public void testListNamespaces() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO); @@ -805,7 +813,7 @@ public void testOpenAllTypesTable() throws ExecutionException, InterruptedExcept final TableIdentifier tableId = TableIdentifier.of(ns, "all_types"); // Verify we retrieved all the rows. - final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions); + final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions).select(); Assert.eq(table.size(), "table.size()", 10, "10 rows in the table"); Assert.equals(table.getDefinition(), "table.getDefinition()", ALL_TYPES_DEF); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 22df95783a0..da085646a41 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -297,20 +297,13 @@ private static void buildChildren(Types.GroupBuilder builder, IteratorReference for conversions + */ + private static LogicalTypeAnnotation getLogicalTypeFromConvertedType( final ConvertedType convertedType, final SchemaElement schemaElement) throws ParquetFileReaderException { switch (convertedType) { @@ -429,17 +428,12 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation( case DATE: return LogicalTypeAnnotation.dateType(); case TIME_MILLIS: - // isAdjustedToUTC parameter is ignored while reading Parquet TIME type, so disregard it here return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); case TIME_MICROS: return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS); case TIMESTAMP_MILLIS: - // TIMESTAMP_MILLIS is always adjusted to UTC - // ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); case TIMESTAMP_MICROS: - // TIMESTAMP_MICROS is always adjusted to UTC - // ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS); case INTERVAL: return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();