Hive cleanup #18418

Merged (22 commits, Jul 27, 2023)

Commits
85f8620  Move Hudi class name constants to HiveClassNames (electrum, Jul 11, 2023)
0bf374e  Rename REGEX_SERDE_CLASS constant to match class name (electrum, Jul 11, 2023)
09f5f7a  Add test for REGEX_SERDE_CLASS (electrum, Jul 11, 2023)
8c45fb7  Rename AvroPageSource and AvroPageSourceFactory (electrum, Jul 16, 2023)
63aa68a  Add Avro to testing page source factories (electrum, Jul 16, 2023)
2311f17  Update testing Hive file writer factories (electrum, Jul 15, 2023)
2123a62  Use FSDataOutputStream for MemoryAwareFileSystem (electrum, Jul 24, 2023)
f9e3cc3  Handle Kerberos ticket refresh during long write operations (electrum, Jul 24, 2023)
49cd2a7  Convert PartitionUpdateInfo to compact constructor (electrum, Jul 16, 2023)
7764b7d  Inline getPartitionSchema method (electrum, Jul 25, 2023)
e97c3cd  Simplify expression in S3SelectPushdown (electrum, Jul 25, 2023)
66e4b7f  Make getInputFormatName return Optional (electrum, Jul 25, 2023; see the sketch after this list)
3f12ea2  Remove usage of InputFormat from S3SelectPushdown (electrum, Jul 25, 2023)
ef5de69  Cleanup code in HiveCompressionCodecs (electrum, Jul 25, 2023)
40a33f3  Support legacy Parquet input formats in HiveStorageFormat (electrum, Jul 26, 2023)
3f10cee  Remove usage of InputFormat for isSplittable (electrum, Jul 26, 2023)
a209472  Remove support for splits from custom input formats (electrum, Jul 25, 2023)
ea4d29c  Remove symlink usage of getInputFormat (electrum, Jul 26, 2023)
255e2c9  Replace usage of Hadoop SecurityUtil (electrum, Jul 26, 2023)
67aabd8  Remove usage of HadoopAuthentication for Thrift metastore (electrum, Jul 26, 2023)
a7a9f36  Remove usages of Hadoop Path for HiveFileIterator (electrum, Jul 16, 2023)
b45f67c  Remove usages of Hadoop Path for PartitionUpdate (electrum, Jul 16, 2023)
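One commit above, "Make getInputFormatName return Optional", changes a HiveUtil helper whose definition is not part of the diff shown below; the split loader now calls it as getInputFormatName(schema).orElse(null). A minimal sketch of the intended shape, assuming the input format class name is read from the table/partition schema properties; the property key and method body here are illustrative assumptions, not the actual HiveUtil code:

import java.util.Optional;
import java.util.Properties;

final class InputFormatNameSketch
{
    private InputFormatNameSketch() {}

    // Assumed property key for the input format class name in the schema
    // properties; the real constant and helper live in HiveUtil.
    private static final String FILE_INPUT_FORMAT = "file.inputformat";

    // Empty when the schema does not declare an input format, so callers can use
    // orElse(null) or orElseThrow(...) instead of handling nulls directly.
    static Optional<String> getInputFormatName(Properties schema)
    {
        return Optional.ofNullable(schema.getProperty(FILE_INPUT_FORMAT))
                .map(String::trim)
                .filter(name -> !name.isEmpty());
    }
}

With this shape, a caller can compare the result against class-name constants such as SYMLINK_TEXT_INPUT_FORMAT_CLASS without instantiating an InputFormat, which is what the loadPartition change below does.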
Changes from all commits
@@ -20,7 +20,9 @@
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.MemoryAwareFileSystem;
import io.trino.hdfs.authentication.GenericExceptionAction;
import io.trino.memory.context.AggregatedMemoryContext;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

@@ -70,16 +72,23 @@ private OutputStream create(boolean overwrite, AggregatedMemoryContext memoryCon
FileSystem rawFileSystem = getRawFileSystem(fileSystem);
try (TimeStat.BlockTimer ignored = createFileCallStat.time()) {
if (rawFileSystem instanceof MemoryAwareFileSystem memoryAwareFileSystem) {
return environment.doAs(context.getIdentity(), () -> memoryAwareFileSystem.create(file, memoryContext));
return create(() -> memoryAwareFileSystem.create(file, memoryContext));
}
return environment.doAs(context.getIdentity(), () -> fileSystem.create(file, overwrite));
return create(() -> fileSystem.create(file, overwrite));
}
catch (IOException e) {
createFileCallStat.recordException(e);
throw e;
}
}

private OutputStream create(GenericExceptionAction<FSDataOutputStream, IOException> action)
throws IOException
{
FSDataOutputStream out = environment.doAs(context.getIdentity(), action);
return new HdfsOutputStream(out, environment, context);
}

@Override
public Location location()
{
@@ -0,0 +1,66 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.hdfs;

import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;
import java.io.OutputStream;

/**
* Handle Kerberos ticket refresh during long write operations.
*/
class HdfsOutputStream
extends FSDataOutputStream
{
private final HdfsEnvironment environment;
private final ConnectorIdentity identity;

public HdfsOutputStream(FSDataOutputStream out, HdfsEnvironment environment, HdfsContext context)
{
super(out, null, out.getPos());
this.environment = environment;
this.identity = context.getIdentity();
}

@Override
public OutputStream getWrappedStream()
{
// return the originally wrapped stream, not the delegate
return ((FSDataOutputStream) super.getWrappedStream()).getWrappedStream();
}

@Override
public void write(int b)
throws IOException
{
environment.doAs(identity, () -> {
super.write(b);
return null;
});
}

@Override
public void write(byte[] b, int off, int len)
throws IOException
{
environment.doAs(identity, () -> {
super.write(b, off, len);
return null;
});
}
}
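The HdfsOutputStream above re-enters HdfsEnvironment.doAs on every write, so a Kerberos ticket that expires partway through a long write can be refreshed rather than only being valid when the stream was created. A minimal usage sketch, assuming a caller in the same package that already holds an HdfsEnvironment, HdfsContext, and Hadoop FileSystem (the wrapper class and method name are hypothetical):

package io.trino.filesystem.hdfs;

import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStream;

final class HdfsOutputStreamUsageSketch
{
    private HdfsOutputStreamUsageSketch() {}

    // Open the raw stream under doAs for the caller's identity, then wrap it so
    // that each subsequent write also runs under doAs.
    static OutputStream openForWrite(HdfsEnvironment environment, HdfsContext context, FileSystem fileSystem, Path path)
            throws IOException
    {
        FSDataOutputStream raw = environment.doAs(context.getIdentity(), () -> fileSystem.create(path, true));
        return new HdfsOutputStream(raw, environment, context);
    }
}

Note that getWrappedStream is overridden to return the originally wrapped stream rather than the delegate, so callers probing the underlying stream still see the raw HDFS stream.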
@@ -14,13 +14,13 @@
package io.trino.hdfs;

import io.trino.memory.context.AggregatedMemoryContext;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStream;

public interface MemoryAwareFileSystem
{
OutputStream create(Path f, AggregatedMemoryContext memoryContext)
FSDataOutputStream create(Path f, AggregatedMemoryContext memoryContext)
throws IOException;
}
@@ -579,7 +579,7 @@ public FSDataOutputStream create(Path path, FsPermission permission, boolean ove
}

@Override
public OutputStream create(Path path, AggregatedMemoryContext aggregatedMemoryContext)
public FSDataOutputStream create(Path path, AggregatedMemoryContext aggregatedMemoryContext)
throws IOException
{
return new FSDataOutputStream(createOutputStream(path, aggregatedMemoryContext), statistics);
@@ -37,6 +37,7 @@
import io.trino.plugin.hive.fs.TrinoFileStatus;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.s3select.S3SelectPushdown;
import io.trino.plugin.hive.util.AcidTables.AcidState;
@@ -69,7 +70,6 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.ArrayList;
@@ -100,7 +100,6 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.fromProperties;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
@@ -114,9 +113,12 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
import static io.trino.plugin.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static io.trino.plugin.hive.HiveSessionProperties.isForceLocalScheduling;
import static io.trino.plugin.hive.HiveSessionProperties.isValidateBucketing;
import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE;
import static io.trino.plugin.hive.HiveStorageFormat.getHiveStorageFormat;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.FAIL;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.IGNORED;
import static io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryPolicy.RECURSE;
@@ -128,9 +130,11 @@
import static io.trino.plugin.hive.util.AcidTables.readAcidVersionFile;
import static io.trino.plugin.hive.util.HiveClassNames.SYMLINK_TEXT_INPUT_FORMAT_CLASS;
import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.plugin.hive.util.HiveUtil.getFooterCount;
import static io.trino.plugin.hive.util.HiveUtil.getHeaderCount;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormat;
import static io.trino.plugin.hive.util.HiveUtil.getInputFormatName;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.PartitionMatchSupplier.createPartitionMatchSupplier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -418,7 +422,9 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
{
HivePartition hivePartition = partition.getHivePartition();
String partitionName = hivePartition.getPartitionId();
Properties schema = getPartitionSchema(table, partition.getPartition());
Properties schema = partition.getPartition()
.map(value -> getHiveSchema(value, table))
.orElseGet(() -> getHiveSchema(table));
List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition());
TupleDomain<HiveColumnHandle> effectivePredicate = compactEffectivePredicate.transformKeys(HiveColumnHandle.class::cast);

@@ -430,20 +436,23 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)

Path path = new Path(getPartitionLocation(table, partition.getPartition()));
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
InputFormat<?, ?> inputFormat = getInputFormat(configuration, schema, false);
FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);

boolean s3SelectPushdownEnabled = S3SelectPushdown.shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());

// S3 Select pushdown works at the granularity of individual S3 objects for compressed files
// and finer granularity for uncompressed files using scan range feature.
boolean shouldEnableSplits = S3SelectPushdown.isSplittable(s3SelectPushdownEnabled, schema, inputFormat, path.toString());
boolean shouldEnableSplits = S3SelectPushdown.isSplittable(s3SelectPushdownEnabled, schema, path.toString());

// Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1
boolean splittable = shouldEnableSplits && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;

if (inputFormat.getClass().getName().equals(SYMLINK_TEXT_INPUT_FORMAT_CLASS)) {
if (SYMLINK_TEXT_INPUT_FORMAT_CLASS.equals(getInputFormatName(schema).orElse(null))) {
if (tableBucketInfo.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Bucketed table in SymlinkTextInputFormat is not yet supported");
}
InputFormat<?, ?> targetInputFormat = getInputFormat(configuration, schema, true);
HiveStorageFormat targetStorageFormat = getSymlinkStorageFormat(getDeserializerClassName(schema));
InputFormat<?, ?> targetInputFormat = getInputFormat(configuration, schema);
List<Path> targetPaths = hdfsEnvironment.doAs(
hdfsContext.getIdentity(),
() -> getTargetPathsFromSymlink(fs, path));
@@ -453,7 +462,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
.collect(toImmutableSet());
if (optimizeSymlinkListing && parents.size() == 1 && !recursiveDirWalkerEnabled) {
Optional<Iterator<InternalHiveSplit>> manifestFileIterator = buildManifestFileIterator(
targetInputFormat,
targetStorageFormat,
partitionName,
schema,
partitionKeys,
@@ -471,6 +480,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
}
return createHiveSymlinkSplits(
partitionName,
targetStorageFormat,
targetInputFormat,
schema,
partitionKeys,
@@ -481,6 +491,11 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
targetPaths);
}

StorageFormat rawStorageFormat = partition.getPartition()
.map(Partition::getStorage).orElseGet(table::getStorage).getStorageFormat();
HiveStorageFormat storageFormat = getHiveStorageFormat(rawStorageFormat)
.orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "Unsupported storage format: %s %s".formatted(hivePartition, rawStorageFormat)));

Optional<BucketConversion> bucketConversion = Optional.empty();
boolean bucketConversionRequiresWorkerParticipation = false;
if (partition.getPartition().isPresent()) {
@@ -509,7 +524,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(
fs,
partitionName,
inputFormat,
storageFormat,
schema,
partitionKeys,
effectivePredicate,
@@ -522,26 +537,6 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
s3SelectPushdownEnabled,
maxSplitFileSize);

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
if (tableBucketInfo.isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "Trino cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
}

if (isTransactionalTable(table.getParameters())) {
throw new TrinoException(NOT_SUPPORTED, "Hive transactional tables in an input format with UseFileSplitsFromInputFormat annotation are not supported: " + inputFormat.getClass().getSimpleName());
}

JobConf jobConf = toJobConf(configuration);
jobConf.set(FILE_INPUT_FORMAT_INPUT_DIR, StringUtils.escapeString(path.toString()));
// Pass SerDes and Table parameters into input format configuration
fromProperties(schema).forEach(jobConf::set);
InputSplit[] splits = hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> inputFormat.getSplits(jobConf, 0));

return addSplitsToSource(splits, splitFactory);
}

if (isTransactionalTable(table.getParameters())) {
return getTransactionalSplits(Location.of(path.toString()), splittable, bucketConversion, splitFactory);
}
@@ -578,6 +573,7 @@ private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location locat

private ListenableFuture<Void> createHiveSymlinkSplits(
String partitionName,
HiveStorageFormat storageFormat,
InputFormat<?, ?> targetInputFormat,
Properties schema,
List<HivePartitionKey> partitionKeys,
@@ -611,7 +607,7 @@ private ListenableFuture<Void> createHiveSymlinkSplits(
InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(
targetFilesystem,
partitionName,
targetInputFormat,
storageFormat,
schema,
partitionKeys,
effectivePredicate,
@@ -633,7 +629,7 @@

@VisibleForTesting
Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
InputFormat<?, ?> targetInputFormat,
HiveStorageFormat targetStorageFormat,
String partitionName,
Properties schema,
List<HivePartitionKey> partitionKeys,
@@ -672,7 +668,7 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(
targetFilesystem,
partitionName,
targetInputFormat,
targetStorageFormat,
schema,
partitionKeys,
effectivePredicate,
@@ -805,14 +801,6 @@ private ListenableFuture<Void> addSplitsToSource(InputSplit[] targetSplits, Inte
return lastResult;
}

private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
{
return Arrays.stream(inputFormat.getClass().getAnnotations())
.map(Annotation::annotationType)
.map(Class::getSimpleName)
.anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
}

private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(TrinoFileSystem fileSystem, Location location, InternalHiveSplitFactory splitFactory, boolean splittable, Optional<AcidInfo> acidInfo)
{
Iterator<TrinoFileStatus> iterator = new HiveFileIterator(table, location, fileSystem, directoryLister, hdfsNamenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED);
@@ -986,6 +974,18 @@ public static boolean hasAttemptId(String bucketFilename)
return matcher.matches() && matcher.group(2) != null;
}

private static HiveStorageFormat getSymlinkStorageFormat(String serde)
{
// LazySimpleSerDe is used by TEXTFILE and SEQUENCEFILE. Use TEXTFILE per Hive behavior.
if (serde.equals(TEXTFILE.getSerde())) {
return TEXTFILE;
}
return Arrays.stream(HiveStorageFormat.values())
.filter(format -> serde.equals(format.getSerde()))
.findFirst()
.orElseThrow(() -> new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Unknown SerDe for SymlinkTextInputFormat: " + serde));
}

private static List<Path> getTargetPathsFromSymlink(FileSystem fileSystem, Path symlinkDir)
{
try {
@@ -1029,14 +1029,6 @@ private static List<HivePartitionKey> getPartitionKeys(Table table, Optional<Par
return partitionKeys.build();
}

private static Properties getPartitionSchema(Table table, Optional<Partition> partition)
{
if (partition.isEmpty()) {
return getHiveSchema(table);
}
return getHiveSchema(partition.get(), table);
}

public static class BucketSplitInfo
{
private final BucketingVersion bucketingVersion;