Remove usages of Hadoop Path for HivePageSourceFactory
electrum committed May 5, 2023
1 parent 3632c37 commit 28d3bb4
Showing 24 changed files with 88 additions and 89 deletions.
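
The diff below swaps Hadoop's org.apache.hadoop.fs.Path for Trino's io.trino.filesystem.Location in the page source and record cursor SPIs; where Hadoop-specific code such as HdfsEnvironment still needs a Path, one is rebuilt from the Location's string form. A minimal sketch of that round-trip, assuming only the Location and Path classes that appear in the diff (the example class name and file path are hypothetical):

import io.trino.filesystem.Location;
import org.apache.hadoop.fs.Path;

public class LocationExample
{
    public static void main(String[] args)
    {
        // Split paths are carried as plain strings and wrapped for the new Trino file system API
        String splitPath = "s3://example-bucket/warehouse/orders/part=1/000000_0"; // hypothetical path
        Location location = Location.of(splitPath);

        // Hadoop-based components (e.g. HdfsEnvironment.getFileSystem) still expect a Hadoop Path,
        // so the Location is converted back through its string representation
        Path hadoopPath = new Path(location.toString());

        System.out.println(location);
        System.out.println(hadoopPath);
    }
}

As the HivePageSourceProvider hunks show, the split path stays a String in the split itself (buildColumnMappings now takes a String), and Location.of is applied only at the boundary where the page source or record cursor is created.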

@@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.spi.TrinoException;
@@ -64,7 +65,7 @@ public GenericHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment, DataSize
public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
Location location,
long start,
long length,
long fileSize,
@@ -77,6 +78,7 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes);

// make sure the FileSystem is created with the proper Configuration object
Path path = new Path(location.toString());
try {
this.hdfsEnvironment.getFileSystem(session.getIdentity(), path, configuration);
}

@@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.VerifyException;
import io.airlift.slice.Slice;
import io.trino.filesystem.Location;
import io.trino.plugin.hive.type.TypeInfo;
import io.trino.plugin.hive.util.ForwardingRecordCursor;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -24,7 +25,6 @@
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.Path;

import java.util.List;

@@ -39,7 +39,7 @@ public class HiveBucketValidationRecordCursor
extends ForwardingRecordCursor
{
private final RecordCursor delegate;
private final Path path;
private final Location path;
private final int[] bucketColumnIndices;
private final List<Class<?>> javaTypeList;
private final List<TypeInfo> typeInfoList;
@@ -52,7 +52,7 @@ public class HiveBucketValidationRecordCursor
private int validationCounter;

public HiveBucketValidationRecordCursor(
Path path,
Location path,
int[] bucketColumnIndices,
List<HiveType> bucketColumnTypes,
BucketingVersion bucketingVersion,

@@ -14,6 +14,7 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.Location;
import io.trino.plugin.hive.HivePageSourceProvider.BucketAdaptation;
import io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping;
import io.trino.plugin.hive.coercions.CharCoercer;
@@ -55,7 +56,6 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarcharType;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.hadoop.fs.Path;

import javax.annotation.Nullable;

@@ -604,15 +604,15 @@ public static class BucketValidator
// validate every ~100 rows but using a prime number
public static final int VALIDATION_STRIDE = 97;

private final Path path;
private final Location path;
private final int[] bucketColumnIndices;
private final List<TypeInfo> bucketColumnTypes;
private final BucketingVersion bucketingVersion;
private final int bucketCount;
private final int expectedBucket;

public BucketValidator(
Path path,
Location path,
int[] bucketColumnIndices,
List<TypeInfo> bucketColumnTypes,
BucketingVersion bucketingVersion,

@@ -13,11 +13,11 @@
*/
package io.trino.plugin.hive;

import io.trino.filesystem.Location;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.List;
import java.util.Optional;
@@ -29,7 +29,7 @@ public interface HivePageSourceFactory
Optional<ReaderPageSource> createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
Location path,
long start,
long length,
long estimatedFileSize,

@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.Location;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HivePageSource.BucketValidator;
@@ -127,16 +128,15 @@ public ConnectorPageSource createPageSource(
.map(HiveColumnHandle.class::cast)
.collect(toList());

Path path = new Path(hiveSplit.getPath());
boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(path.toString()).matches();
boolean originalFile = ORIGINAL_FILE_PATH_MATCHER.matcher(hiveSplit.getPath()).matches();

List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
hiveSplit.getPartitionName(),
hiveSplit.getPartitionKeys(),
hiveColumns,
hiveSplit.getBucketConversion().map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
hiveSplit.getTableToPartitionMapping(),
path,
hiveSplit.getPath(),
hiveSplit.getTableBucketNumber(),
hiveSplit.getEstimatedFileSize(),
hiveSplit.getFileModifiedTime());
@@ -147,7 +147,7 @@ public ConnectorPageSource createPageSource(
return new EmptyPageSource();
}

Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), path);
Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(hiveSplit.getPath()));

TupleDomain<HiveColumnHandle> simplifiedDynamicFilter = dynamicFilter
.getCurrentPredicate()
@@ -157,7 +157,7 @@ public ConnectorPageSource createPageSource(
cursorProviders,
configuration,
session,
path,
Location.of(hiveSplit.getPath()),
hiveSplit.getTableBucketNumber(),
hiveSplit.getStart(),
hiveSplit.getLength(),
@@ -185,7 +185,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Set<HiveRecordCursorProvider> cursorProviders,
Configuration configuration,
ConnectorSession session,
Path path,
Location path,
OptionalInt tableBucketNumber,
long start,
long length,
@@ -453,7 +453,7 @@ public static List<ColumnMapping> buildColumnMappings(
List<HiveColumnHandle> columns,
List<HiveColumnHandle> requiredInterimColumns,
TableToPartitionMapping tableToPartitionMapping,
Path path,
String path,
OptionalInt bucketNumber,
long estimatedFileSize,
long fileModifiedTime)
@@ -667,7 +667,7 @@ public int getBucketToKeep()
}
}

private static Optional<BucketValidator> createBucketValidator(Path path, Optional<BucketValidation> bucketValidation, OptionalInt bucketNumber, List<ColumnMapping> columnMappings)
private static Optional<BucketValidator> createBucketValidator(Location path, Optional<BucketValidation> bucketValidation, OptionalInt bucketNumber, List<ColumnMapping> columnMappings)
{
return bucketValidation.flatMap(validation -> {
Map<Integer, ColumnMapping> baseHiveColumnToBlockIndex = columnMappings.stream()

@@ -13,12 +13,12 @@
*/
package io.trino.plugin.hive;

import io.trino.filesystem.Location;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.List;
import java.util.Optional;
@@ -31,7 +31,7 @@ public interface HiveRecordCursorProvider
Optional<ReaderRecordCursorWithProjections> createRecordCursor(
Configuration configuration,
ConnectorSession session,
Path path,
Location path,
long start,
long length,
long fileSize,

@@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.line;

import io.trino.filesystem.Location;
import io.trino.hive.formats.line.LineBuffer;
import io.trino.hive.formats.line.LineDeserializer;
import io.trino.hive.formats.line.LineReader;
@@ -38,12 +39,12 @@ public class LinePageSource
private final LineReader lineReader;
private final LineDeserializer deserializer;
private final LineBuffer lineBuffer;
private final String filePath;
private final Location filePath;

private PageBuilder pageBuilder;
private long completedPositions;

public LinePageSource(LineReader lineReader, LineDeserializer deserializer, LineBuffer lineBuffer, String filePath)
public LinePageSource(LineReader lineReader, LineDeserializer deserializer, LineBuffer lineBuffer, Location filePath)
{
this.lineReader = requireNonNull(lineReader, "lineReader is null");
this.deserializer = requireNonNull(deserializer, "deserializer is null");

@@ -40,7 +40,6 @@
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.InputStream;
import java.util.List;
@@ -92,7 +91,7 @@ protected LinePageSourceFactory(
public Optional<ReaderPageSource> createPageSource(
Configuration configuration,
ConnectorSession session,
Path path,
Location path,
long start,
long length,
long estimatedFileSize,
@@ -142,9 +141,8 @@ public Optional<ReaderPageSource> createPageSource(
}

// buffer file if small
Location location = Location.of(path.toString());
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session.getIdentity());
TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(location));
TrinoInputFile inputFile = new MonitoredInputFile(stats, trinoFileSystem.newInputFile(path));
try {
length = min(inputFile.length() - start, length);
if (!inputFile.exists()) {
@@ -153,7 +151,7 @@
if (estimatedFileSize < SMALL_FILE_SIZE.toBytes()) {
try (InputStream inputStream = inputFile.newStream()) {
byte[] data = inputStream.readAllBytes();
inputFile = new MemoryInputFile(location, Slices.wrappedBuffer(data));
inputFile = new MemoryInputFile(path, Slices.wrappedBuffer(data));
}
}
}
@@ -171,7 +169,7 @@

try {
LineReader lineReader = lineReaderFactory.createLineReader(inputFile, start, length, headerCount, footerCount);
LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, lineReaderFactory.createLineBuffer(), path.toString());
LinePageSource pageSource = new LinePageSource(lineReader, lineDeserializer, lineReaderFactory.createLineBuffer(), path);
return Optional.of(new ReaderPageSource(pageSource, readerProjections));
}
catch (TrinoException e) {
@@ -182,7 +180,7 @@
}
}

private static String splitError(Throwable t, Path path, long start, long length)
private static String splitError(Throwable t, Location path, long start, long length)
{
return format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, t.getMessage());
}

@@ -14,6 +14,7 @@
package io.trino.plugin.hive.orc;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInputFile;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.orc.NameBasedFieldMapper;
@@ -29,7 +30,6 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSource;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -74,10 +74,10 @@ public static Optional<ConnectorPageSource> createOrcDeleteDeltaPageSource(
FileFormatDataSourceStats stats)
{
OrcDataSource orcDataSource;
String path = inputFile.location().toString();
Location path = inputFile.location();
try {
orcDataSource = new HdfsOrcDataSource(
new OrcDataSourceId(path),
new OrcDataSourceId(path.toString()),
inputFile.length(),
options,
inputFile,
@@ -112,7 +112,7 @@ public static Optional<ConnectorPageSource> createOrcDeleteDeltaPageSource(
}

private OrcDeleteDeltaPageSource(
String path,
Location path,
long fileSize,
OrcReader reader,
OrcDataSource orcDataSource,
@@ -122,7 +122,7 @@ private OrcDeleteDeltaPageSource(
this.stats = requireNonNull(stats, "stats is null");
this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");

verifyAcidSchema(reader, new Path(path));
verifyAcidSchema(reader, path);
Map<String, OrcColumn> acidColumns = uniqueIndex(
reader.getRootColumn().getNestedColumns(),
orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
@@ -211,7 +211,7 @@ public long getMemoryUsage()
return memoryContext.getBytes();
}

private static String openError(Throwable t, String path)
private static String openError(Throwable t, Location path)
{
return format("Error opening Hive delete delta file %s: %s", path, t.getMessage());
}
