Commit

Remove usages of Hadoop Path for Hive LocationService

electrum committed May 5, 2023
1 parent 28d3bb4 commit 8bd9f75
Showing 12 changed files with 155 additions and 162 deletions.
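The heart of the change: call sites that modeled locations as Hadoop's org.apache.hadoop.fs.Path now use Trino's io.trino.filesystem.Location. A minimal sketch of the two styles, using only calls visible in this diff (the bucket and partition names are made up for illustration):

    import io.trino.filesystem.Location;
    import org.apache.hadoop.fs.Path;

    // Before: locations modeled as Hadoop paths
    Path oldTarget = new Path("s3://bucket/warehouse/orders");      // parse a location string
    Path oldPartition = new Path(oldTarget, "ds=2023-05-05");       // append a child path

    // After: locations modeled with Trino's filesystem API
    Location target = Location.of("s3://bucket/warehouse/orders");  // parse a location string
    Location partition = target.appendPath("ds=2023-05-05");        // append a child path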
io/trino/plugin/hive/HiveLocationService.java

@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive;
 
+import io.trino.filesystem.Location;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.LocationHandle.WriteMode;
@@ -54,32 +55,32 @@ public HiveLocationService(HdfsEnvironment hdfsEnvironment)
     }
 
     @Override
-    public Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName)
+    public Location forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName)
     {
         HdfsContext context = new HdfsContext(session);
-        Path targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName);
+        Location targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName);
 
         // verify the target directory for table
-        if (pathExists(context, hdfsEnvironment, targetPath)) {
+        if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) {
             throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
         }
         return targetPath;
     }
 
     @Override
-    public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Path> externalLocation)
+    public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional<Location> externalLocation)
     {
         HdfsContext context = new HdfsContext(session);
-        Path targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName));
+        Location targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName));
 
         // verify the target directory for the table
-        if (pathExists(context, hdfsEnvironment, targetPath)) {
+        if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) {
             throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
         }
 
         // TODO detect when existing table's location is a on a different file system than the temporary directory
-        if (shouldUseTemporaryDirectory(session, context, targetPath, externalLocation)) {
-            Path writePath = createTemporaryPath(session, context, hdfsEnvironment, targetPath);
+        if (shouldUseTemporaryDirectory(session, context, new Path(targetPath.toString()), externalLocation.isPresent())) {
+            Location writePath = createTemporaryPath(session, context, hdfsEnvironment, new Path(targetPath.toString()));
             return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
         return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_NEW_DIRECTORY);
@@ -89,10 +90,10 @@ public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetasto
     public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
     {
         HdfsContext context = new HdfsContext(session);
-        Path targetPath = new Path(table.getStorage().getLocation());
+        Location targetPath = Location.of(table.getStorage().getLocation());
 
-        if (shouldUseTemporaryDirectory(session, context, targetPath, Optional.empty()) && !isTransactionalTable(table.getParameters())) {
-            Path writePath = createTemporaryPath(session, context, hdfsEnvironment, targetPath);
+        if (shouldUseTemporaryDirectory(session, context, new Path(targetPath.toString()), false) && !isTransactionalTable(table.getParameters())) {
+            Location writePath = createTemporaryPath(session, context, hdfsEnvironment, new Path(targetPath.toString()));
             return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
         return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
@@ -102,19 +103,19 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
     public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
     {
         // For OPTIMIZE write result files directly to table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute
-        Path targetPath = new Path(table.getStorage().getLocation());
+        Location targetPath = Location.of(table.getStorage().getLocation());
         return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
     }
 
-    private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional<Path> externalLocation)
+    private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, boolean hasExternalLocation)
     {
         return isTemporaryStagingDirectoryEnabled(session)
                 // skip using temporary directory for S3
                 && !isS3FileSystem(context, hdfsEnvironment, path)
                 // skip using temporary directory if destination is encrypted; it's not possible to move a file between encryption zones
                 && !isHdfsEncrypted(context, hdfsEnvironment, path)
                 // Skip using temporary directory if destination is external. Target may be on a different file system.
-                && externalLocation.isEmpty();
+                && !hasExternalLocation;
     }
 
     @Override
@@ -138,27 +139,23 @@ public WriteInfo getPartitionWriteInfo(LocationHandle locationHandle, Optional<P
         if (partition.isPresent()) {
             // existing partition
             WriteMode writeMode = locationHandle.getWriteMode();
-            Path targetPath = new Path(partition.get().getStorage().getLocation());
-            Path writePath = getPartitionWritePath(locationHandle, partitionName, writeMode, targetPath);
+            Location targetPath = Location.of(partition.get().getStorage().getLocation());
+            Location writePath = getPartitionWritePath(locationHandle, partitionName, writeMode, targetPath);
             return new WriteInfo(targetPath, writePath, writeMode);
         }
         // new partition
         return new WriteInfo(
-                new Path(locationHandle.getTargetPath(), partitionName),
-                new Path(locationHandle.getWritePath(), partitionName),
+                locationHandle.getTargetPath().appendPath(partitionName),
+                locationHandle.getWritePath().appendPath(partitionName),
                 locationHandle.getWriteMode());
     }
 
-    private Path getPartitionWritePath(LocationHandle locationHandle, String partitionName, WriteMode writeMode, Path targetPath)
+    private static Location getPartitionWritePath(LocationHandle locationHandle, String partitionName, WriteMode writeMode, Location targetPath)
     {
-        switch (writeMode) {
-            case STAGE_AND_MOVE_TO_TARGET_DIRECTORY:
-                return new Path(locationHandle.getWritePath(), partitionName);
-            case DIRECT_TO_TARGET_EXISTING_DIRECTORY:
-                return targetPath;
-            case DIRECT_TO_TARGET_NEW_DIRECTORY:
-                throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode));
-        }
-        throw new UnsupportedOperationException("Unexpected write mode: " + writeMode);
+        return switch (writeMode) {
+            case STAGE_AND_MOVE_TO_TARGET_DIRECTORY -> locationHandle.getWritePath().appendPath(partitionName);
+            case DIRECT_TO_TARGET_EXISTING_DIRECTORY -> targetPath;
+            case DIRECT_TO_TARGET_NEW_DIRECTORY -> throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode));
+        };
    }
 }
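Note the recurring new Path(targetPath.toString()) conversions above: helpers such as pathExists, createTemporaryPath, and isS3FileSystem still take a Hadoop Path, so the code keeps Location as the primary type and converts back only at those call boundaries. A sketch of that transitional bridge, assuming the same helper signatures as in the diff:

    // Keep Location as the primary representation...
    Location target = Location.of(table.getStorage().getLocation());

    // ...and convert to a Hadoop Path only where a legacy helper still requires it.
    Path hadoopTarget = new Path(target.toString());
    boolean exists = pathExists(context, hdfsEnvironment, hadoopTarget);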
io/trino/plugin/hive/HiveMetadata.java

@@ -984,7 +984,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
                 .collect(toImmutableList());
         checkPartitionTypesSupported(partitionColumns);
 
-        Optional<Path> targetPath;
+        Optional<Location> targetPath;
         boolean external;
         String externalLocation = getExternalLocation(tableMetadata.getProperties());
         if (externalLocation != null) {
@@ -993,8 +993,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
             }
 
             external = true;
-            targetPath = Optional.of(getExternalLocationAsPath(externalLocation));
-            checkExternalPath(new HdfsContext(session), targetPath.get());
+            targetPath = Optional.of(getValidatedExternalLocation(externalLocation));
+            checkExternalPath(new HdfsContext(session), new Path(targetPath.get().toString()));
         }
         else {
             external = false;
@@ -1241,10 +1241,10 @@ private static String validateAvroSchemaLiteral(String avroSchemaLiteral)
         }
     }
 
-    private static Path getExternalLocationAsPath(String location)
+    private static Location getValidatedExternalLocation(String location)
     {
         try {
-            return new Path(location);
+            return Location.of(location);
         }
         catch (IllegalArgumentException e) {
             throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
@@ -1283,7 +1283,7 @@ private static Table buildTableObject(
             List<String> partitionedBy,
             Optional<HiveBucketProperty> bucketProperty,
             Map<String, String> additionalTableParameters,
-            Optional<Path> targetPath,
+            Optional<Location> targetPath,
             boolean external,
             String prestoVersion,
             boolean usingSystemSecurity)
@@ -1574,8 +1574,8 @@ private static List<String> canonicalizePartitionValues(String partitionName, Li
     @Override
     public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
     {
-        Optional<Path> externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties()))
-                .map(HiveMetadata::getExternalLocationAsPath);
+        Optional<Location> externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties()))
+                .map(HiveMetadata::getValidatedExternalLocation);
         if (!createsOfNonManagedTablesEnabled && externalLocation.isPresent()) {
             throw new TrinoException(NOT_SUPPORTED, "Creating non-managed Hive tables is disabled");
         }
@@ -1662,7 +1662,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
                 retryMode != NO_RETRIES);
 
         WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
-        metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), schemaTableName);
+        metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), schemaTableName);
 
         return result;
     }
@@ -1688,7 +1688,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
                 handle.getPartitionedBy(),
                 handle.getBucketProperty(),
                 handle.getAdditionalTableParameters(),
-                Optional.of(writeInfo.getTargetPath()),
+                Optional.of(writeInfo.targetPath()),
                 handle.isExternal(),
                 prestoVersion,
                 accessControlMetadata.isUsingSystemSecurity());
@@ -1733,6 +1733,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
             tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of());
         }
 
+        Optional<Path> writePath = Optional.of(new Path(writeInfo.writePath().toString()));
         if (handle.getPartitionedBy().isEmpty()) {
             List<String> fileNames;
             if (partitionUpdates.isEmpty()) {
@@ -1742,10 +1743,10 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
             else {
                 fileNames = getOnlyElement(partitionUpdates).getFileNames();
             }
-            metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled());
+            metastore.createTable(session, table, principalPrivileges, writePath, Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled());
         }
         else {
-            metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.empty(), false, tableStatistics, false);
+            metastore.createTable(session, table, principalPrivileges, writePath, Optional.empty(), false, tableStatistics, false);
         }
 
         if (!handle.getPartitionedBy().isEmpty()) {
@@ -1968,7 +1969,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
 
         LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table);
         WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
-        metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.getWritePath(), partitionMergeResults, partitions);
+        metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.writePath(), partitionMergeResults, partitions);
     }
 
     @Override
@@ -2039,7 +2040,7 @@ else if (isTransactional) {
 
         WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
         if (getInsertExistingPartitionsBehavior(session) == InsertExistingPartitionsBehavior.OVERWRITE
-                && writeInfo.getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
+                && writeInfo.writeMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
             if (isTransactional) {
                 throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in transactional tables doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
             }
@@ -2049,7 +2050,7 @@ else if (isTransactional) {
                 throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in non auto commit context doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
             }
         }
-        metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), tableName);
+        metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), tableName);
         return result;
     }
 
@@ -2420,7 +2421,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
         HiveTableHandle hiveSourceTableHandle = (HiveTableHandle) sourceTableHandle;
 
         WriteInfo writeInfo = locationService.getQueryWriteInfo(hiveExecuteHandle.getLocationHandle());
-        String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName());
+        String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), hiveExecuteHandle.getSchemaTableName());
 
         return new BeginTableExecuteResult<>(
                 hiveExecuteHandle
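One more detail worth noting: the accessor renames in HiveMetadata (writeInfo.getWritePath() → writeInfo.writePath(), and likewise for writeMode and targetPath) suggest that WriteInfo now exposes record-style accessors over Location. A plausible shape, offered as an assumption since the class itself is not shown in this diff:

    // Assumed (not shown in the diff): WriteInfo as a Java record over Location.
    public record WriteInfo(Location targetPath, Location writePath, WriteMode writeMode) {}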