From 56bf62040e7ce5de1f348ace0bc1f041a5ad04e2 Mon Sep 17 00:00:00 2001 From: Rob Marrowstone <99764876+rmarrowstone@users.noreply.github.com> Date: Wed, 18 Dec 2024 17:12:06 -0800 Subject: [PATCH] Minor Fixups This commit fixes up a few things I noticed when previewing the PR to Trino. * Column/Field name casing should be preserved when writing * Some missing operational/metrics calls in IonPageSource * Throw clearer Exception for errors in IonFileWriter * Move some tests from TestHiveFileFormats to IonPageSourceSmokeTest * Add test for Timestamp Encoding --- .../hive/formats/ion/IonEncoderFactory.java | 8 +-- .../trino/hive/formats/ion/TestIonFormat.java | 21 ++++++ .../trino/plugin/hive/ion/IonFileWriter.java | 5 +- .../plugin/hive/ion/IonFileWriterFactory.java | 2 - .../trino/plugin/hive/ion/IonPageSource.java | 10 ++- .../plugin/hive/TestHiveFileFormats.java | 68 ++----------------- .../hive/ion/IonPageSourceSmokeTest.java | 10 +++ 7 files changed, 50 insertions(+), 74 deletions(-) diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java index bcb8e469399d..0a75f2ca3504 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java @@ -51,7 +51,6 @@ import java.time.ZoneId; import java.util.Date; import java.util.List; -import java.util.Locale; import java.util.Optional; import java.util.function.IntFunction; @@ -64,7 +63,7 @@ private IonEncoderFactory() {} public static IonEncoder buildEncoder(List columns) { return RowEncoder.forFields(columns.stream() - .map(c -> new RowType.Field(Optional.of(c.name().toLowerCase(Locale.ROOT)), c.type())) + .map(c -> new RowType.Field(Optional.of(c.name()), c.type())) .toList()); } @@ -89,8 +88,7 @@ private static BlockEncoder encoderForType(Type type) case DecimalType t -> decimalEncoder(t); case DateType _ -> dateEncoder; case TimestampType t -> timestampEncoder(t); - case MapType t -> new MapEncoder(t, t.getKeyType(), - encoderForType(t.getValueType())); + case MapType t -> new MapEncoder(t, t.getKeyType(), encoderForType(t.getValueType())); case RowType t -> RowEncoder.forFields(t.getFields()); case ArrayType t -> new ArrayEncoder(wrapEncoder(encoderForType(t.getElementType()))); default -> throw new IllegalArgumentException(String.format("Unsupported type: %s", type)); @@ -119,7 +117,7 @@ private static RowEncoder forFields(List fields) ImmutableList.Builder fieldEncodersBuilder = ImmutableList.builder(); for (RowType.Field field : fields) { - fieldNamesBuilder.add(field.getName().get().toLowerCase(Locale.ROOT)); + fieldNamesBuilder.add(field.getName().get()); fieldEncodersBuilder.add(wrapEncoder(encoderForType(field.getType()))); } diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/ion/TestIonFormat.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/ion/TestIonFormat.java index e946c3ee23f1..bb9b3539e566 100644 --- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/ion/TestIonFormat.java +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/ion/TestIonFormat.java @@ -435,6 +435,27 @@ public void testEncode() assertIonEquivalence(TEST_COLUMNS, page, ionText); } + @Test + public void testEncodeTimestamp() + throws IOException + { + List timestampColumn = List.of(new Column("my_ts", TimestampType.TIMESTAMP_NANOS, 0)); + Page page = toPage(timestampColumn, List.of( + toSqlTimestamp(TimestampType.TIMESTAMP_NANOS, LocalDateTime.of(2024, 11, 23, 1, 23, 45, 666777888)))); + assertIonEquivalence(timestampColumn, page, "{ my_ts: 2024-11-23T01:23:45.666777888Z }"); + } + + @Test + public void testEncodeMixedCaseColumn() + throws IOException + { + List casedColumn = List.of( + new Column("TheAnswer", INTEGER, 0)); + + Page page = toPage(casedColumn, List.of(42)); + assertIonEquivalence(casedColumn, page, "{ TheAnswer: 42 }"); + } + @Test public void testEncodeWithNullField() throws IOException diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java index 89d0bd408762..693c41238138 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java @@ -33,6 +33,7 @@ import java.util.function.LongSupplier; import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; public class IonFileWriter implements FileWriter @@ -106,7 +107,7 @@ public void rollback() writer.close(); } catch (IOException e) { - throw new RuntimeException(e); + throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e); } } @@ -123,7 +124,7 @@ public void appendRows(Page page) pageEncoder.encode(writer, page); } catch (IOException e) { - throw new RuntimeException(e); + throw new TrinoException(HIVE_WRITER_DATA_ERROR, e); } } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java index 5a4f82354aaa..83acc5162327 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java @@ -85,8 +85,6 @@ public Optional createFileWriter( Closeable rollbackAction = () -> fileSystem.deleteFile(location); - // we take the column names from the schema, not what was input - // this is what the LineWriterFactory does, I don't understand why List fileColumnNames = getColumnNames(schema); List fileColumnTypes = getColumnTypes(schema).stream() .map(hiveType -> getType(hiveType, typeManager, getTimestampPrecision(session))) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java index d20a5cc887ff..5fbc9549429a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive.ion; +import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonReader; import com.amazon.ion.IonType; import io.trino.hive.formats.ion.IonDecoder; @@ -24,9 +25,13 @@ import java.util.OptionalLong; import java.util.function.LongSupplier; +import static io.airlift.slice.SizeOf.instanceSize; + public class IonPageSource implements ConnectorPageSource { + private static final int INSTANCE_SIZE = instanceSize(IonPageSource.class); + private final IonReader ionReader; private final PageBuilder pageBuilder; private final IonDecoder decoder; @@ -86,7 +91,10 @@ public Page getNextPage() @Override public long getMemoryUsage() { - return 4096; + // we don't have the ability to ask an IonReader how many bytes it has buffered + // it will buffer as much as is needed for each top-level-value. + int assumedIonBufferSize = IonBufferConfiguration.DEFAULT.getInitialBufferSize() * 4; + return INSTANCE_SIZE + assumedIonBufferSize + pageBuilder.getRetainedSizeInBytes(); } @Override diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index 235c725beb30..6c8a918ce360 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -139,7 +139,6 @@ import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION; import static io.trino.plugin.hive.HiveStorageFormat.AVRO; @@ -157,8 +156,6 @@ import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; import static io.trino.plugin.hive.HiveTestUtils.mapType; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; -import static io.trino.plugin.hive.ion.IonWriterOptions.ION_ENCODING_PROPERTY; -import static io.trino.plugin.hive.ion.IonWriterOptions.TEXT_ENCODING; import static io.trino.plugin.hive.util.HiveTypeTranslator.toHiveType; import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS; import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES; @@ -234,7 +231,6 @@ public final class TestHiveFileFormats private static final FileFormatDataSourceStats STATS = new FileFormatDataSourceStats(); private static final ConnectorSession PARQUET_SESSION = getHiveSession(createParquetHiveConfig(false)); private static final ConnectorSession PARQUET_SESSION_USE_NAME = getHiveSession(createParquetHiveConfig(true)); - private static final String ERROR_ENCODING = "error_encoding"; @DataProvider(name = "rowCount") public static Object[][] rowCountProvider() @@ -377,7 +373,8 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding) throws Exception { List testColumns = TEST_COLUMNS.stream() - // todo: add support for maps to trino impl + // even though maps with text keys work with the native trino impl + // there is an error when testing against the hive serde .filter(tc -> !(tc.type instanceof MapType)) .collect(toList()); @@ -394,54 +391,6 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding) .isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig)); } - @Test(dataProvider = "validRowAndFileSizePadding") - public void testIonWithTextEncoding(int rowCount, long fileSizePadding) - throws Exception - { - List testColumns = TEST_COLUMNS.stream() - // todo: add support for maps to trino impl - .filter(tc -> !(tc.type instanceof MapType)) - .collect(toList()); - - HiveConfig hiveConfig = new HiveConfig(); - // enable Ion native trino integration for testing while the implementation is in progress - // TODO: In future this flag should change to `true` as default and then the following statement can be removed. - hiveConfig.setIonNativeTrinoEnabled(true); - - assertThatFileFormat(ION) - .withColumns(testColumns) - .withRowsCount(rowCount) - .withFileSizePadding(fileSizePadding) - .withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, TEXT_ENCODING)) - .withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER)) - .isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig)); - } - - @Test(dataProvider = "validRowAndFileSizePadding") - public void testInvalidIonEncoding(int rowCount, long fileSizePadding) - throws Exception - { - List testColumns = TEST_COLUMNS.stream() - // todo: add support for maps to trino impl - .filter(tc -> !(tc.type instanceof MapType)) - .collect(toList()); - - HiveConfig hiveConfig = new HiveConfig(); - // enable Ion native trino integration for testing while the implementation is in progress - // TODO: In future this flag should change to `true` as default and then the following statement can be removed. - hiveConfig.setIonNativeTrinoEnabled(true); - - assertTrinoExceptionThrownBy(() -> assertThatFileFormat(ION) - .withColumns(testColumns) - .withRowsCount(rowCount) - .withFileSizePadding(fileSizePadding) - .withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, ERROR_ENCODING)) - .withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER)) - .isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig))) - .hasErrorCode(HIVE_WRITER_OPEN_ERROR) - .hasMessage("Error creating Ion Output"); - } - @Test(dataProvider = "validRowAndFileSizePadding") public void testRcTextPageSource(int rowCount, long fileSizePadding) throws Exception @@ -1275,7 +1224,6 @@ private static class FileFormatAssertion private boolean skipGenericWrite; private HiveFileWriterFactory fileWriterFactory; private long fileSizePadding; - private Map customTableProperties = ImmutableMap.of(); private final TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); @@ -1333,12 +1281,6 @@ public FileFormatAssertion withRowsCount(int rowsCount) return this; } - public FileFormatAssertion withTableProperties(Map tableProperties) - { - this.customTableProperties = requireNonNull(tableProperties, "customTableProperties is null"); - return this; - } - public FileFormatAssertion withSession(ConnectorSession session) { this.session = requireNonNull(session, "session is null"); @@ -1397,7 +1339,7 @@ private void assertRead(HivePageSourceFactory pageSourceFactory) if (fileWriterFactory == null) { continue; } - createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory, customTableProperties); + createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory); } else { if (skipGenericWrite) { @@ -1427,8 +1369,7 @@ private static void createTestFileTrino( List testColumns, ConnectorSession session, int numRows, - HiveFileWriterFactory fileWriterFactory, - Map customTableProperties) + HiveFileWriterFactory fileWriterFactory) { // filter out partition keys, which are not written to the file testColumns = testColumns.stream() @@ -1453,7 +1394,6 @@ private static void createTestFileTrino( Map tableProperties = ImmutableMap.builder() .put(LIST_COLUMNS, testColumns.stream().map(TestColumn::name).collect(Collectors.joining(","))) .put(LIST_COLUMN_TYPES, testColumns.stream().map(TestColumn::type).map(HiveTypeTranslator::toHiveType).map(HiveType::toString).collect(Collectors.joining(","))) - .putAll(customTableProperties) .buildOrThrow(); Optional fileWriter = fileWriterFactory.createFileWriter( diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java index 834f64fd81cb..58ba7e743e13 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java @@ -251,6 +251,16 @@ public void testBinaryEncoding() assertEncoding(tableColumns, BINARY_ENCODING); } + @Test + public void testBadEncodingName() + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS) + .withEncoding("unknown_encoding_name"); + + Assertions.assertThrows(TrinoException.class, fixture::getFileWriter); + } + private void assertEncoding(List tableColumns, String encoding) throws IOException