From 28d3bb4c83a99f4eecaa0f1d696d2baef789e18e Mon Sep 17 00:00:00 2001 From: David Phillips Date: Sun, 23 Apr 2023 00:00:56 -0700 Subject: [PATCH] Remove usages of Hadoop Path for HivePageSourceFactory --- .../hive/GenericHiveRecordCursorProvider.java | 4 +++- .../HiveBucketValidationRecordCursor.java | 6 +++--- .../io/trino/plugin/hive/HivePageSource.java | 6 +++--- .../plugin/hive/HivePageSourceFactory.java | 4 ++-- .../plugin/hive/HivePageSourceProvider.java | 16 ++++++++-------- .../plugin/hive/HiveRecordCursorProvider.java | 4 ++-- .../plugin/hive/line/LinePageSource.java | 5 +++-- .../hive/line/LinePageSourceFactory.java | 12 +++++------- .../hive/orc/OrcDeleteDeltaPageSource.java | 12 ++++++------ .../plugin/hive/orc/OrcPageSourceFactory.java | 19 +++++++++---------- .../plugin/hive/orc/OriginalFilesUtils.java | 9 ++++----- .../parquet/ParquetPageSourceFactory.java | 6 ++---- .../hive/rcfile/RcFilePageSourceFactory.java | 10 ++++------ .../S3SelectRecordCursorProvider.java | 4 +++- .../io/trino/plugin/hive/util/HiveUtil.java | 4 ++-- .../plugin/hive/TestHiveFileFormats.java | 9 +++++---- .../hive/TestOrcPageSourceMemoryTracking.java | 8 +++++--- .../plugin/hive/TestOriginalFilesUtils.java | 13 ++++++------- .../hive/benchmark/AbstractFileFormat.java | 6 +++--- .../hive/orc/TestOrcPageSourceFactory.java | 3 +-- .../plugin/hive/orc/TestOrcPredicates.java | 5 +++-- .../plugin/hive/parquet/TestOnlyNulls.java | 4 ++-- .../hive/parquet/TestTimestampMicros.java | 4 ++-- .../TestS3SelectRecordCursorProvider.java | 4 ++-- 24 files changed, 88 insertions(+), 89 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java index 488ceb9d243f..4cbf60fe4990 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/GenericHiveRecordCursorProvider.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import io.airlift.units.DataSize; +import io.trino.filesystem.Location; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.util.HiveUtil; import io.trino.spi.TrinoException; @@ -64,7 +65,7 @@ public GenericHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment, DataSize public Optional createRecordCursor( Configuration configuration, ConnectorSession session, - Path path, + Location location, long start, long length, long fileSize, @@ -77,6 +78,7 @@ public Optional createRecordCursor( configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes); // make sure the FileSystem is created with the proper Configuration object + Path path = new Path(location.toString()); try { this.hdfsEnvironment.getFileSystem(session.getIdentity(), path, configuration); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveBucketValidationRecordCursor.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveBucketValidationRecordCursor.java index 8dca3520d1eb..cb4a07b238a7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveBucketValidationRecordCursor.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveBucketValidationRecordCursor.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.VerifyException; import io.airlift.slice.Slice; +import io.trino.filesystem.Location; import io.trino.plugin.hive.type.TypeInfo; import 
io.trino.plugin.hive.util.ForwardingRecordCursor; import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion; @@ -24,7 +25,6 @@ import io.trino.spi.connector.RecordCursor; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; -import org.apache.hadoop.fs.Path; import java.util.List; @@ -39,7 +39,7 @@ public class HiveBucketValidationRecordCursor extends ForwardingRecordCursor { private final RecordCursor delegate; - private final Path path; + private final Location path; private final int[] bucketColumnIndices; private final List> javaTypeList; private final List typeInfoList; @@ -52,7 +52,7 @@ public class HiveBucketValidationRecordCursor private int validationCounter; public HiveBucketValidationRecordCursor( - Path path, + Location path, int[] bucketColumnIndices, List bucketColumnTypes, BucketingVersion bucketingVersion, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java index 2e0a42fc2ee2..0300c6d9b2a8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive; import com.google.common.collect.ImmutableList; +import io.trino.filesystem.Location; import io.trino.plugin.hive.HivePageSourceProvider.BucketAdaptation; import io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping; import io.trino.plugin.hive.coercions.CharCoercer; @@ -55,7 +56,6 @@ import io.trino.spi.type.TypeManager; import io.trino.spi.type.VarcharType; import it.unimi.dsi.fastutil.ints.IntArrayList; -import org.apache.hadoop.fs.Path; import javax.annotation.Nullable; @@ -604,7 +604,7 @@ public static class BucketValidator // validate every ~100 rows but using a prime number public static final int VALIDATION_STRIDE = 97; - private final Path path; + private final Location path; private final int[] bucketColumnIndices; private final List bucketColumnTypes; private final BucketingVersion bucketingVersion; @@ -612,7 +612,7 @@ public static class BucketValidator private final int expectedBucket; public BucketValidator( - Path path, + Location path, int[] bucketColumnIndices, List bucketColumnTypes, BucketingVersion bucketingVersion, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java index 768893c7b80e..8fc8ec8feb6e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceFactory.java @@ -13,11 +13,11 @@ */ package io.trino.plugin.hive; +import io.trino.filesystem.Location; import io.trino.plugin.hive.acid.AcidTransaction; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import java.util.List; import java.util.Optional; @@ -29,7 +29,7 @@ public interface HivePageSourceFactory Optional createPageSource( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java index 32db00886bcc..7d3a7c676f1b 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSourceProvider.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HivePageSource.BucketValidator; @@ -127,8 +128,7 @@ public ConnectorPageSource createPageSource( .map(HiveColumnHandle.class::cast) .collect(toList()); - Path path = new Path(hiveSplit.getPath()); - boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(path.toString()).matches(); + boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(hiveSplit.getPath()).matches(); List columnMappings = ColumnMapping.buildColumnMappings( hiveSplit.getPartitionName(), @@ -136,7 +136,7 @@ public ConnectorPageSource createPageSource( hiveColumns, hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()), hiveSplit.getTableToPartitionMapping(), - path, + hiveSplit.getPath(), hiveSplit.getTableBucketNumber(), hiveSplit.getEstimatedFileSize(), hiveSplit.getFileModifiedTime()); @@ -147,7 +147,7 @@ public ConnectorPageSource createPageSource( return new EmptyPageSource(); } - Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), path); + Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(hiveSplit.getPath())); TupleDomain simplifiedDynamicFilter = dynamicFilter .getCurrentPredicate() @@ -157,7 +157,7 @@ public ConnectorPageSource createPageSource( cursorProviders, configuration, session, - path, + Location.of(hiveSplit.getPath()), hiveSplit.getTableBucketNumber(), hiveSplit.getStart(), hiveSplit.getLength(), @@ -185,7 +185,7 @@ public static Optional createHivePageSource( Set cursorProviders, Configuration configuration, ConnectorSession session, - Path path, + Location path, OptionalInt tableBucketNumber, long start, long length, @@ -453,7 +453,7 @@ public static List buildColumnMappings( List columns, List requiredInterimColumns, TableToPartitionMapping tableToPartitionMapping, - Path path, + String path, OptionalInt bucketNumber, long estimatedFileSize, long fileModifiedTime) @@ -667,7 +667,7 @@ public int getBucketToKeep() } } - private static Optional createBucketValidator(Path path, Optional bucketValidation, OptionalInt bucketNumber, List columnMappings) + private static Optional createBucketValidator(Location path, Optional bucketValidation, OptionalInt bucketNumber, List columnMappings) { return bucketValidation.flatMap(validation -> { Map baseHiveColumnToBlockIndex = columnMappings.stream() diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java index ca924cb27e48..aad75d221610 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveRecordCursorProvider.java @@ -13,12 +13,12 @@ */ package io.trino.plugin.hive; +import io.trino.filesystem.Location; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.RecordCursor; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.Path; import java.util.List; import java.util.Optional; @@ -31,7 +31,7 @@ public interface HiveRecordCursorProvider Optional createRecordCursor( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long fileSize, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSource.java index f4afff0b37a1..5b3eb79d5f55 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSource.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive.line; +import io.trino.filesystem.Location; import io.trino.hive.formats.line.LineBuffer; import io.trino.hive.formats.line.LineDeserializer; import io.trino.hive.formats.line.LineReader; @@ -38,12 +39,12 @@ public class LinePageSource private final LineReader lineReader; private final LineDeserializer deserializer; private final LineBuffer lineBuffer; - private final String filePath; + private final Location filePath; private PageBuilder pageBuilder; private long completedPositions; - public LinePageSource(LineReader lineReader, LineDeserializer deserializer, LineBuffer lineBuffer, String filePath) + public LinePageSource(LineReader lineReader, LineDeserializer deserializer, LineBuffer lineBuffer, Location filePath) { this.lineReader = requireNonNull(lineReader, "lineReader is null"); this.deserializer = requireNonNull(deserializer, "deserializer is null"); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java index 3b335c5441ee..f59bd348a4ac 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/line/LinePageSourceFactory.java @@ -40,7 +40,6 @@ import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import java.io.InputStream; import java.util.List; @@ -92,7 +91,7 @@ protected LinePageSourceFactory( public Optional createPageSource( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, @@ -142,9 +141,8 @@ public Optional createPageSource( } // buffer file if small - Location location = Location.of(path.toString()); TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session.getIdentity()); - TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(location)); + TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(path)); try { length = min(inputFile.length() - start, length); if (!inputFile.exists()) { @@ -153,7 +151,7 @@ public Optional createPageSource( if (estimatedFileSize < SMALL_FILE_SIZE.toBytes()) { try (InputStream inputStream = inputFile.newStream()) { byte[] data = inputStream.readAllBytes(); - inputFile = new MemoryInputFile(location, Slices.wrappedBuffer(data)); + inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } } @@ -171,7 +169,7 @@ public Optional createPageSource( try { LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount); - LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, 
lineReaderFactory.createLineBuffer(), path.toString()); + LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, lineReaderFactory.createLineBuffer(), path); return Optional.of(new ReaderPageSource(pageSource, readerProjections)); } catch (TrinoException e) { @@ -182,7 +180,7 @@ public Optional createPageSource( } } - private static String splitError(Throwable t, Path path, long start, long length) + private static String splitError(Throwable t, Location path, long start, long length) { return format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, t.getMessage()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java index a4f80feae9aa..d44084457184 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive.orc; import com.google.common.collect.ImmutableList; +import io.trino.filesystem.Location; import io.trino.filesystem.TrinoInputFile; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.orc.NameBasedFieldMapper; @@ -29,7 +30,6 @@ import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPageSource; -import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.UncheckedIOException; @@ -74,10 +74,10 @@ public static Optional createOrcDeleteDeltaPageSource( FileFormatDataSourceStats stats) { OrcDataSource orcDataSource; - String path = inputFile.location().toString(); + Location path = inputFile.location(); try { orcDataSource = new HdfsOrcDataSource( - new OrcDataSourceId(path), + new OrcDataSourceId(path.toString()), inputFile.length(), options, inputFile, @@ -112,7 +112,7 @@ public static Optional createOrcDeleteDeltaPageSource( } private OrcDeleteDeltaPageSource( - String path, + Location path, long fileSize, OrcReader reader, OrcDataSource orcDataSource, @@ -122,7 +122,7 @@ private OrcDeleteDeltaPageSource( this.stats = requireNonNull(stats, "stats is null"); this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null"); - verifyAcidSchema(reader, new Path(path)); + verifyAcidSchema(reader, path); Map acidColumns = uniqueIndex( reader.getRootColumn().getNestedColumns(), orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH)); @@ -211,7 +211,7 @@ public long getMemoryUsage() return memoryContext.getBytes(); } - private static String openError(Throwable t, String path) + private static String openError(Throwable t, Location path) { return format("Error opening Hive delete delta file %s: %s", path, t.getMessage()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index 155c0201081b..4e7a29a73103 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -53,7 +53,6 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.joda.time.DateTimeZone; import javax.inject.Inject; @@ -177,7 +176,7 @@ public static Properties stripUnnecessaryProperties(Properties schema) 
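
The LinePageSourceFactory and RcFilePageSourceFactory hunks above show the shape most factories take after this change: the Location parameter is handed straight to TrinoFileSystem instead of first being rebuilt from a Hadoop Path. A minimal sketch of that read path, not part of the patch itself; the helper name and the threshold parameter are illustrative stand-ins for the factory's own fields and its SMALL_FILE_SIZE constant:

    // Illustrative sketch, not part of this patch: how a factory opens its split once it
    // receives an io.trino.filesystem.Location directly (no org.apache.hadoop.fs.Path round trip).
    private static TrinoInputFile openSplitFile(
            TrinoFileSystem fileSystem,
            FileFormatDataSourceStats stats,
            Location path,
            long estimatedFileSize,
            DataSize smallFileThreshold)
            throws IOException
    {
        TrinoInputFile inputFile = new MonitoredInputFile(stats, fileSystem.newInputFile(path));
        if (estimatedFileSize < smallFileThreshold.toBytes()) {
            // Small files are buffered in memory; the same Location identifies the in-memory copy.
            try (InputStream inputStream = inputFile.newStream()) {
                inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(inputStream.readAllBytes()));
            }
        }
        return inputFile;
    }
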
public Optional createPageSource( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, @@ -234,7 +233,7 @@ public Optional createPageSource( private ConnectorPageSource createOrcPageSource( ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, @@ -261,7 +260,7 @@ private ConnectorPageSource createOrcPageSource( boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty(); try { TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity()); - TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toString())); + TrinoInputFile inputFile = fileSystem.newInputFile(path); orcDataSource = new HdfsOrcDataSource( new OrcDataSourceId(path.toString()), estimatedFileSize, @@ -407,7 +406,7 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { Optional deletedRows = acidInfo.map(info -> new OrcDeletedRows( - path.getName(), + path.fileName(), new OrcDeleteDeltaPageSourceFactory(options, stats), session.getIdentity(), fileSystemFactory, @@ -463,7 +462,7 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { } } - private static void validateOrcAcidVersion(Path path, OrcReader reader) + private static void validateOrcAcidVersion(Location path, OrcReader reader) { // Trino cannot read ORC ACID tables with version < 2 (written by Hive older than 3.0) // See https://github.com/trinodb/trino/issues/2790#issuecomment-591901728 for more context @@ -542,12 +541,12 @@ private static boolean hasOriginalFiles(AcidInfo acidInfo) return !acidInfo.getOriginalFiles().isEmpty(); } - private static String splitError(Throwable t, Path path, long start, long length) + private static String splitError(Throwable t, Location path, long start, long length) { return format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, t.getMessage()); } - private static void verifyFileHasColumnNames(List columns, Path path) + private static void verifyFileHasColumnNames(List columns, Location path) { if (!columns.isEmpty() && columns.stream().map(OrcColumn::getColumnName).allMatch(physicalColumnName -> DEFAULT_HIVE_COLUMN_NAME_PATTERN.matcher(physicalColumnName).matches())) { throw new TrinoException( @@ -556,7 +555,7 @@ private static void verifyFileHasColumnNames(List columns, Path path) } } - static void verifyAcidSchema(OrcReader orcReader, Path path) + static void verifyAcidSchema(OrcReader orcReader, Location path) { OrcColumn rootColumn = orcReader.getRootColumn(); List nestedColumns = rootColumn.getNestedColumns(); @@ -579,7 +578,7 @@ static void verifyAcidSchema(OrcReader orcReader, Path path) verifyAcidColumn(orcReader, 5, AcidSchema.ACID_COLUMN_ROW_STRUCT, STRUCT, path); } - private static void verifyAcidColumn(OrcReader orcReader, int columnIndex, String columnName, OrcTypeKind columnType, Path path) + private static void verifyAcidColumn(OrcReader orcReader, int columnIndex, String columnName, OrcTypeKind columnType, Location path) { OrcColumn column = orcReader.getRootColumn().getNestedColumns().get(columnIndex); if (!column.getColumnName().toLowerCase(ENGLISH).equals(columnName.toLowerCase(ENGLISH))) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java index 5c2516b5ecac..e2942718c056 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java @@ -23,7 +23,6 @@ import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.spi.TrinoException; import io.trino.spi.security.ConnectorIdentity; -import org.apache.hadoop.fs.Path; import java.util.Collection; @@ -46,7 +45,7 @@ private OriginalFilesUtils() {} */ public static long getPrecedingRowCount( Collection originalFileInfos, - Path splitPath, + Location splitPath, TrinoFileSystemFactory fileSystemFactory, ConnectorIdentity identity, OrcReaderOptions options, @@ -54,10 +53,10 @@ public static long getPrecedingRowCount( { long rowCount = 0; for (OriginalFileInfo originalFileInfo : originalFileInfos) { - Path path = new Path(splitPath.getParent() + "/" + originalFileInfo.getName()); - if (path.compareTo(splitPath) < 0) { + if (originalFileInfo.getName().compareTo(splitPath.fileName()) < 0) { + Location path = splitPath.parentDirectory().appendPath(originalFileInfo.getName()); TrinoInputFile inputFile = fileSystemFactory.create(identity) - .newInputFile(Location.of(path.toString()), originalFileInfo.getFileSize()); + .newInputFile(path, originalFileInfo.getFileSize()); rowCount += getRowsInFile(inputFile, options, stats); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index bcd9947bf8d8..39d3474909b8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -47,7 +47,6 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -164,7 +163,7 @@ public static Properties stripUnnecessaryProperties(Properties schema) public Optional createPageSource( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, @@ -182,9 +181,8 @@ public Optional createPageSource( checkArgument(acidInfo.isEmpty(), "Acid is not supported"); - Location location = Location.of(path.toString()); TrinoFileSystem fileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = fileSystem.newInputFile(location, estimatedFileSize); + TrinoInputFile inputFile = fileSystem.newInputFile(path, estimatedFileSize); return Optional.of(createPageSource( inputFile, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java index e08721a28c09..cb73475640e1 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/rcfile/RcFilePageSourceFactory.java @@ -47,7 +47,6 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.joda.time.DateTimeZone; import javax.inject.Inject; @@ -106,7 +105,7 @@ public static Properties stripUnnecessaryProperties(Properties schema) 
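
The OriginalFilesUtils hunk above replaces Path string concatenation and Path.compareTo with the Location navigation methods, comparing bare file names instead of full paths. A hedged sketch of how those calls compose, not part of the patch; the file names below are invented for the example:

    // Illustrative sketch, not part of this patch: Location navigation as used by the
    // rewritten getPrecedingRowCount logic.
    Location splitPath = Location.of("hdfs://warehouse/table/000002_0_copy_2");

    String fileName = splitPath.fileName();                  // "000002_0_copy_2"
    Location sibling = splitPath.parentDirectory()
            .appendPath("000001_0");                         // hdfs://warehouse/table/000001_0

    // Only original files whose names sort before the split's own file name are read,
    // which for siblings of the same directory matches the old full-path comparison.
    boolean precedes = "000001_0".compareTo(splitPath.fileName()) < 0;   // true
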
public Optional createPageSource( Configuration configuration, ConnectorSession session, - Path path, + Location path, long start, long length, long estimatedFileSize, @@ -141,9 +140,8 @@ else if (deserializerClassName.equals(COLUMNAR_SERDE_CLASS)) { .collect(toImmutableList()); } - Location location = Location.of(path.toString()); TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session.getIdentity()); - TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(location)); + TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(path)); try { length = min(inputFile.length() - start, length); if (!inputFile.exists()) { @@ -152,7 +150,7 @@ else if (deserializerClassName.equals(COLUMNAR_SERDE_CLASS)) { if (estimatedFileSize < BUFFER_SIZE.toBytes()) { try (InputStream inputStream = inputFile.newStream()) { byte[] data = inputStream.readAllBytes(); - inputFile = new MemoryInputFile(location, Slices.wrappedBuffer(data)); + inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data)); } } } @@ -197,7 +195,7 @@ else if (deserializerClassName.equals(COLUMNAR_SERDE_CLASS)) { } } - private static String splitError(Throwable t, Path path, long start, long length) + private static String splitError(Throwable t, Location path, long start, long length) { return format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, t.getMessage()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java index 355b56385290..22663f055d3e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectRecordCursorProvider.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveRecordCursorProvider; @@ -66,7 +67,7 @@ public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, TrinoS3Clie public Optional createRecordCursor( Configuration configuration, ConnectorSession session, - Path path, + Location location, long start, long length, long fileSize, @@ -80,6 +81,7 @@ public Optional createRecordCursor( return Optional.empty(); } + Path path = new Path(location.toString()); try { this.hdfsEnvironment.getFileSystem(session.getIdentity(), path, configuration); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java index 59ae442ba775..2bb6a42b739f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java @@ -931,7 +931,7 @@ public static List toPartitionValues(String partitionName) public static NullableValue getPrefilledColumnValue( HiveColumnHandle columnHandle, HivePartitionKey partitionKey, - Path path, + String path, OptionalInt bucketNumber, long fileSize, long fileModifiedTime, @@ -942,7 +942,7 @@ public static NullableValue getPrefilledColumnValue( columnValue = partitionKey.getValue(); } else if (isPathColumnHandle(columnHandle)) { - columnValue = path.toString(); + columnValue = path; } else if 
(isBucketColumnHandle(columnHandle)) { columnValue = String.valueOf(bucketNumber.getAsInt()); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index 7ab5258d69f6..d5539a746257 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import io.airlift.compress.lzo.LzoCodec; import io.airlift.compress.lzo.LzopCodec; +import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hive.formats.compression.CompressionKind; @@ -1005,7 +1006,7 @@ private ConnectorPageSource createPageSourceFromCursorProvider( columnHandles, ImmutableList.of(), TableToPartitionMapping.empty(), - split.getPath(), + split.getPath().toString(), OptionalInt.empty(), fileSize, Instant.now().toEpochMilli()); @@ -1015,7 +1016,7 @@ private ConnectorPageSource createPageSourceFromCursorProvider( ImmutableSet.of(cursorProvider), configuration, session, - split.getPath(), + Location.of(split.getPath().toString()), OptionalInt.empty(), split.getStart(), split.getLength(), @@ -1083,7 +1084,7 @@ private void testPageSourceFactory( columnHandles, ImmutableList.of(), TableToPartitionMapping.empty(), - split.getPath(), + split.getPath().toString(), OptionalInt.empty(), fileSize, Instant.now().toEpochMilli()); @@ -1093,7 +1094,7 @@ private void testPageSourceFactory( ImmutableSet.of(), newEmptyConfiguration(), session, - split.getPath(), + Location.of(split.getPath().toString()), OptionalInt.empty(), split.getStart(), split.getLength(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java index 6a2873a39a33..f6cfe43d2d96 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -18,6 +18,7 @@ import io.airlift.slice.Slice; import io.airlift.stats.Distribution; import io.airlift.units.DataSize; +import io.trino.filesystem.Location; import io.trino.hive.orc.NullMemoryManager; import io.trino.hive.orc.impl.WriterImpl; import io.trino.metadata.FunctionManager; @@ -560,17 +561,17 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec columns, ImmutableList.of(), TableToPartitionMapping.empty(), - fileSplit.getPath(), + fileSplit.getPath().toString(), OptionalInt.empty(), fileSplit.getLength(), Instant.now().toEpochMilli()); - return HivePageSourceProvider.createHivePageSource( + ConnectorPageSource connectorPageSource = HivePageSourceProvider.createHivePageSource( ImmutableSet.of(orcPageSourceFactory), ImmutableSet.of(), newEmptyConfiguration(), session, - fileSplit.getPath(), + Location.of(fileSplit.getPath().toString()), OptionalInt.empty(), fileSplit.getStart(), fileSplit.getLength(), @@ -586,6 +587,7 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec false, NO_ACID_TRANSACTION, columnMappings).orElseThrow(); + return connectorPageSource; } public SourceOperator newTableScanOperator(DriverContext driverContext) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java index 278711998c1d..14365c0873e8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java @@ -13,17 +13,16 @@ */ package io.trino.plugin.hive; -import com.google.common.io.Resources; +import io.trino.filesystem.Location; import io.trino.orc.OrcReaderOptions; import io.trino.plugin.hive.orc.OriginalFilesUtils; -import org.apache.hadoop.fs.Path; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.File; import java.util.ArrayList; import java.util.List; +import static com.google.common.io.Resources.getResource; import static io.trino.plugin.hive.AcidInfo.OriginalFileInfo; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.testing.TestingConnectorSession.SESSION; @@ -31,13 +30,13 @@ public class TestOriginalFilesUtils { - private String tablePath; + private Location tablePath; @BeforeClass public void setup() throws Exception { - tablePath = new File(Resources.getResource(("dummy_id_data_orc")).toURI()).getPath(); + tablePath = Location.of(getResource("dummy_id_data_orc").toString()); } @Test @@ -48,7 +47,7 @@ public void testGetPrecedingRowCountSingleFile() long rowCountResult = OriginalFilesUtils.getPrecedingRowCount( originalFileInfoList, - new Path(tablePath + "/000001_0"), + tablePath.appendPath("000001_0"), HDFS_FILE_SYSTEM_FACTORY, SESSION.getIdentity(), new OrcReaderOptions(), @@ -67,7 +66,7 @@ public void testGetPrecedingRowCount() long rowCountResult = OriginalFilesUtils.getPrecedingRowCount( originalFileInfos, - new Path(tablePath + "/000002_0_copy_2"), + tablePath.appendPath("000002_0_copy_2"), HDFS_FILE_SYSTEM_FACTORY, SESSION.getIdentity(), new OrcReaderOptions(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java index dfec148ef1d7..6946d646ab73 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/AbstractFileFormat.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.GenericHiveRecordCursorProvider; import io.trino.plugin.hive.HiveColumnHandle; @@ -40,7 +41,6 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; import io.trino.sql.planner.TestingConnectorTransactionHandle; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import java.io.File; @@ -186,7 +186,7 @@ static ConnectorPageSource createPageSource( Optional recordCursorWithProjections = cursorProvider.createRecordCursor( conf, session, - new Path(targetFile.getAbsolutePath()), + Location.of(targetFile.getAbsolutePath()), 0, targetFile.length(), targetFile.length(), @@ -218,7 +218,7 @@ static ConnectorPageSource createPageSource( .createPageSource( conf, session, - new Path(targetFile.getAbsolutePath()), + Location.of(targetFile.getAbsolutePath()), 0, targetFile.length(), targetFile.length(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java index d8c987df2231..915d29920f83 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java @@ -31,7 +31,6 @@ import io.trino.tpch.Nation; import io.trino.tpch.NationColumn; import io.trino.tpch.NationGenerator; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.assertj.core.api.Assertions; import org.testng.annotations.Test; @@ -234,7 +233,7 @@ private static List readFile(Map columns, Optiona Optional pageSourceWithProjections = PAGE_SOURCE_FACTORY.createPageSource( new JobConf(newEmptyConfiguration()), SESSION, - new Path(filePath), + Location.of(filePath), 0, fileSize, fileSize, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java index ee9c89db5d3d..383bf51e08c7 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.filesystem.Location; import io.trino.orc.OrcReaderOptions; import io.trino.orc.OrcWriterOptions; import io.trino.plugin.hive.AbstractTestHiveFileFormats; @@ -211,7 +212,7 @@ private ConnectorPageSource createPageSource( columnHandles, ImmutableList.of(), TableToPartitionMapping.empty(), - split.getPath(), + split.getPath().toString(), OptionalInt.empty(), split.getLength(), Instant.now().toEpochMilli()); @@ -221,7 +222,7 @@ private ConnectorPageSource createPageSource( ImmutableSet.of(), newEmptyConfiguration(), session, - split.getPath(), + Location.of(split.getPath().toString()), OptionalInt.empty(), split.getStart(), split.getLength(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java index e5c3a19cbe67..68df9a8fe3b6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestOnlyNulls.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive.parquet; import com.google.common.io.Resources; +import io.trino.filesystem.Location; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.HivePageSourceFactory; @@ -27,7 +28,6 @@ import io.trino.spi.type.Type; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; -import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import java.io.File; @@ -90,7 +90,7 @@ private static ConnectorPageSource createPageSource(File parquetFile, HiveColumn return pageSourceFactory.createPageSource( newEmptyConfiguration(), getHiveSession(new HiveConfig()), - new Path(parquetFile.toURI()), + Location.of(parquetFile.getPath()), 0, parquetFile.length(), parquetFile.length(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java index f41f3ed4b7f8..8086631127d5 100644 --- 
a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java @@ -14,6 +14,7 @@ package io.trino.plugin.hive.parquet; import com.google.common.io.Resources; +import io.trino.filesystem.Location; import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.HivePageSourceFactory; import io.trino.plugin.hive.HiveStorageFormat; @@ -28,7 +29,6 @@ import io.trino.spi.type.Type; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; -import org.apache.hadoop.fs.Path; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -116,7 +116,7 @@ private ConnectorPageSource createPageSource(ConnectorSession session, File parq ReaderPageSource pageSourceWithProjections = pageSourceFactory.createPageSource( newEmptyConfiguration(), session, - new Path(parquetFile.toURI()), + Location.of(parquetFile.getPath()), 0, parquetFile.length(), parquetFile.length(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java index b676e405297c..fb5e17500d0f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectRecordCursorProvider.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.Location; import io.trino.hadoop.ConfigurationInstantiator; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveConfig; @@ -24,7 +25,6 @@ import io.trino.spi.predicate.Range; import io.trino.spi.predicate.SortedRangeSet; import io.trino.spi.predicate.TupleDomain; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.testng.annotations.Test; @@ -120,7 +120,7 @@ private static Optional getRecordCursor(Tuple return s3SelectRecordCursorProvider.createRecordCursor( ConfigurationInstantiator.newEmptyConfiguration(), SESSION, - new Path("s3://fakeBucket/fakeObject.gz"), + Location.of("s3://fakeBucket/fakeObject.gz"), 0, 10, 10,
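
Two conversions recur across the call sites and tests touched by this patch: wrapping the split path or a local test file in a Location, and building a Hadoop Path from that Location only where an HDFS API still requires one (GenericHiveRecordCursorProvider, S3SelectRecordCursorProvider, and the HdfsEnvironment call in HivePageSourceProvider). A hedged summary sketch, not part of the patch, assuming the enclosing provider's hiveSplit, hdfsEnvironment, and session fields; the local file name is illustrative:

    // Illustrative sketch, not part of this patch: the two recurring conversions.
    Location splitLocation = Location.of(hiveSplit.getPath());                     // split path is now a plain String
    Location testLocation = Location.of(new File("/tmp/test.parquet").getPath());  // test-side, as in TestOnlyNulls

    // A Hadoop Path is created lazily, solely for the legacy HDFS calls that remain.
    Path hadoopPath = new Path(splitLocation.toString());
    Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), hadoopPath);
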