Remove usages of Hadoop Path for PartitionUpdate
electrum committed Jul 27, 2023
1 parent 0e2e237 commit 02f1a63
Showing 8 changed files with 46 additions and 34 deletions.
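
For context, this commit replaces `org.apache.hadoop.fs.Path` with `io.trino.filesystem.Location` in `PartitionUpdate` and its callers, converting back to a Hadoop `Path` only at call sites that still use the Hadoop filesystem APIs. Below is a minimal sketch of that bridging pattern, using only calls visible in the diff (`Location.of`, `fileName()`, `appendPath()`, `toString()`); the class name and the example URI are illustrative, not part of the commit.

    import io.trino.filesystem.Location;
    import org.apache.hadoop.fs.Path;

    public class LocationBridgeSketch
    {
        public static void main(String[] args)
        {
            // PartitionUpdate now stores Location rather than Hadoop Path
            Location target = Location.of("hdfs://nn:8020/warehouse/t/pk2=insert2");

            // Location.fileName() replaces Path.getName() (see the test change below)
            System.out.println(target.fileName()); // pk2=insert2

            // Location.appendPath(name) replaces new Path(parent, name)
            Location file = target.appendPath("file1");
            System.out.println(file); // hdfs://nn:8020/warehouse/t/pk2=insert2/file1

            // Call sites still on the Hadoop APIs convert at the boundary
            Path hadoopPath = new Path(target.toString());
            System.out.println(hadoopPath.getName()); // pk2=insert2
        }
    }
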
@@ -2157,7 +2157,15 @@ private Table finishChangingTable(AcidOperation acidOperation, String changeDesc
             metastore.dropTable(session, handle.getSchemaName(), handle.getTableName());
 
             // create the table with the new location
-            metastore.createTable(session, table, principalPrivileges, Optional.of(partitionUpdate.getWritePath()), Optional.of(partitionUpdate.getFileNames()), false, partitionStatistics, handle.isRetriesEnabled());
+            metastore.createTable(
+                    session,
+                    table,
+                    principalPrivileges,
+                    Optional.of(new Path(partitionUpdate.getWritePath().toString())),
+                    Optional.of(partitionUpdate.getFileNames()),
+                    false,
+                    partitionStatistics,
+                    handle.isRetriesEnabled());
         }
         else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == APPEND) {
             // insert into unpartitioned table
@@ -2243,10 +2251,11 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
         return table;
     }
 
-    private void removeNonCurrentQueryFiles(ConnectorSession session, Path partitionPath)
+    private void removeNonCurrentQueryFiles(ConnectorSession session, Location partitionLocation)
     {
         String queryId = session.getQueryId();
         try {
+            Path partitionPath = new Path(partitionLocation.toString());
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), partitionPath);
             RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(partitionPath, false);
             while (iterator.hasNext()) {
@@ -2259,7 +2268,7 @@ private void removeNonCurrentQueryFiles(ConnectorSession session, Path partition
         catch (Exception ex) {
             throw new TrinoException(
                     HIVE_FILESYSTEM_ERROR,
-                    format("Failed to delete partition %s files during overwrite", partitionPath),
+                    format("Failed to delete partition %s files during overwrite", partitionLocation),
                     ex);
         }
     }
@@ -17,8 +17,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimaps;
+import io.trino.filesystem.Location;
 import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
 import java.util.List;
@@ -33,8 +33,8 @@ public class PartitionUpdate
 {
     private final String name;
     private final UpdateMode updateMode;
-    private final Path writePath;
-    private final Path targetPath;
+    private final Location writePath;
+    private final Location targetPath;
     private final List<String> fileNames;
     private final long rowCount;
     private final long inMemoryDataSizeInBytes;
@@ -54,8 +54,8 @@ public PartitionUpdate(
         this(
                 name,
                 updateMode,
-                new Path(requireNonNull(writePath, "writePath is null")),
-                new Path(requireNonNull(targetPath, "targetPath is null")),
+                Location.of(requireNonNull(writePath, "writePath is null")),
+                Location.of(requireNonNull(targetPath, "targetPath is null")),
                 fileNames,
                 rowCount,
                 inMemoryDataSizeInBytes,
@@ -65,8 +65,8 @@
     public PartitionUpdate(
             String name,
             UpdateMode updateMode,
-            Path writePath,
-            Path targetPath,
+            Location writePath,
+            Location targetPath,
             List<String> fileNames,
             long rowCount,
             long inMemoryDataSizeInBytes,
@@ -101,12 +101,12 @@ public UpdateMode getUpdateMode()
         return updateMode;
     }
 
-    public Path getWritePath()
+    public Location getWritePath()
     {
         return writePath;
     }
 
-    public Path getTargetPath()
+    public Location getTargetPath()
     {
         return targetPath;
     }
@@ -679,7 +679,7 @@ public synchronized void finishChangingExistingTable(
             ConnectorSession session,
             String databaseName,
             String tableName,
-            Path currentLocation,
+            Location currentLocation,
             List<String> fileNames,
             PartitionStatistics statisticsUpdate,
             boolean cleanExtraOutputFilesOnCommit)
@@ -704,7 +704,7 @@
                 new TableAndMore(
                         table,
                         Optional.empty(),
-                        Optional.of(currentLocation),
+                        Optional.of(new Path(currentLocation.toString())),
                         Optional.of(fileNames),
                         false,
                         merge(currentStatistics, statisticsUpdate),
@@ -967,7 +967,7 @@ public synchronized void addPartition(
             String databaseName,
             String tableName,
             Partition partition,
-            Path currentLocation,
+            Location currentLocation,
             Optional<List<String>> files,
             PartitionStatistics statistics,
             boolean cleanExtraOutputFilesOnCommit)
@@ -1919,7 +1919,7 @@ private void prepareAlterPartition(HdfsContext hdfsContext, String queryId, Part
             }
         }
 
-        Path currentPath = partitionAndMore.getCurrentLocation();
+        Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString());
         Path targetPath = new Path(targetLocation);
         if (!targetPath.equals(currentPath)) {
             renameDirectory(
@@ -1953,7 +1953,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Tabl
         }
         Path tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true"));
         List<String> files = tableAndMore.getFileNames().orElseThrow(() -> new IllegalArgumentException("fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true"));
-        SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files));
+        SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, Location.of(tableLocation.toString()), ImmutableSet.copyOf(files));
     }
 
     private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName)
@@ -1988,7 +1988,7 @@ private void prepareAddPartition(HdfsContext hdfsContext, String queryId, Partit
 
         Partition partition = partitionAndMore.getPartition();
         String targetLocation = partition.getStorage().getLocation();
-        Path currentPath = partitionAndMore.getCurrentLocation();
+        Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString());
         Path targetPath = new Path(targetLocation);
 
         cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore);
@@ -2028,7 +2028,7 @@ private void prepareInsertExistingPartition(HdfsContext hdfsContext, String quer
         Partition partition = partitionAndMore.getPartition();
         partitionsToInvalidate.add(partition);
         Path targetPath = new Path(partition.getStorage().getLocation());
-        Path currentPath = partitionAndMore.getCurrentLocation();
+        Path currentPath = new Path(partitionAndMore.getCurrentLocation().toString());
         cleanUpTasksForAbort.add(new DirectoryCleanUpTask(hdfsContext, targetPath, false));
 
         if (!targetPath.equals(currentPath)) {
@@ -2950,13 +2950,13 @@ public String toString()
     private static class PartitionAndMore
     {
         private final Partition partition;
-        private final Path currentLocation;
+        private final Location currentLocation;
         private final Optional<List<String>> fileNames;
         private final PartitionStatistics statistics;
         private final PartitionStatistics statisticsUpdate;
         private final boolean cleanExtraOutputFilesOnCommit;
 
-        public PartitionAndMore(Partition partition, Path currentLocation, Optional<List<String>> fileNames, PartitionStatistics statistics, PartitionStatistics statisticsUpdate, boolean cleanExtraOutputFilesOnCommit)
+        public PartitionAndMore(Partition partition, Location currentLocation, Optional<List<String>> fileNames, PartitionStatistics statistics, PartitionStatistics statisticsUpdate, boolean cleanExtraOutputFilesOnCommit)
         {
             this.partition = requireNonNull(partition, "partition is null");
             this.currentLocation = requireNonNull(currentLocation, "currentLocation is null");
@@ -2971,7 +2971,7 @@ public Partition getPartition()
             return partition;
         }
 
-        public Path getCurrentLocation()
+        public Location getCurrentLocation()
         {
             return currentLocation;
         }
@@ -3644,8 +3644,9 @@ public void commitTransaction(long transactionId)
         delegate.commitTransaction(transactionId);
     }
 
-    public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Path path, Set<String> filesToKeep)
+    public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Location location, Set<String> filesToKeep)
     {
+        Path path = new Path(location.toString());
         List<String> filesToDelete = new LinkedList<>();
         try {
             log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
@@ -3703,7 +3704,7 @@ public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsCo
         }
     }
 
-    public record PartitionUpdateInfo(List<String> partitionValues, Path currentLocation, List<String> fileNames, PartitionStatistics statisticsUpdate)
+    public record PartitionUpdateInfo(List<String> partitionValues, Location currentLocation, List<String> fileNames, PartitionStatistics statisticsUpdate)
     {
         public PartitionUpdateInfo
         {
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import io.trino.filesystem.Location;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.HiveConfig;
@@ -149,7 +150,7 @@ private void doRegisterPartition(ConnectorSession session, ConnectorAccessContro
                 table.getDatabaseName(),
                 table.getTableName(),
                 buildPartitionObject(session, table, partitionValues, partitionLocation),
-                partitionLocation,
+                Location.of(partitionLocation.toString()),
                 Optional.empty(), // no need for failed attempts cleanup
                 PartitionStatistics.empty(),
                 false);
@@ -19,6 +19,7 @@
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import io.trino.filesystem.Location;
 import io.trino.hdfs.HdfsContext;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.PartitionStatistics;
@@ -240,7 +241,7 @@ private static void addPartitions(
                 table.getDatabaseName(),
                 table.getTableName(),
                 buildPartitionObject(session, table, name),
-                new Path(table.getStorage().getLocation(), name),
+                Location.of(table.getStorage().getLocation()).appendPath(name),
                 Optional.empty(), // no need for failed attempts cleanup
                 PartitionStatistics.empty(),
                 false);
@@ -6296,8 +6296,8 @@ public void triggerConflict(ConnectorSession session, SchemaTableName tableName,
                 throws IOException
         {
             for (PartitionUpdate partitionUpdate : partitionUpdates) {
-                if ("pk2=insert2".equals(partitionUpdate.getTargetPath().getName())) {
-                    path = new Path(partitionUpdate.getTargetPath(), partitionUpdate.getFileNames().get(0));
+                if ("pk2=insert2".equals(partitionUpdate.getTargetPath().fileName())) {
+                    path = new Path(partitionUpdate.getTargetPath().toString(), partitionUpdate.getFileNames().get(0));
                     break;
                 }
             }
@@ -15,8 +15,8 @@
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.json.JsonCodec;
+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.PartitionUpdate.UpdateMode;
-import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
 import static io.airlift.json.JsonCodec.jsonCodec;
@@ -43,8 +43,8 @@ public void testRoundTrip()
 
         assertEquals(actual.getName(), "test");
         assertEquals(actual.getUpdateMode(), UpdateMode.APPEND);
-        assertEquals(actual.getWritePath(), new Path("/writePath"));
-        assertEquals(actual.getTargetPath(), new Path("/targetPath"));
+        assertEquals(actual.getWritePath(), Location.of("/writePath"));
+        assertEquals(actual.getTargetPath(), Location.of("/targetPath"));
         assertEquals(actual.getFileNames(), ImmutableList.of("file1", "file3"));
         assertEquals(actual.getRowCount(), 123);
         assertEquals(actual.getInMemoryDataSizeInBytes(), 456);
@@ -15,13 +15,13 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.HiveBucketProperty;
 import io.trino.plugin.hive.HiveMetastoreClosure;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.PartitionStatistics;
 import io.trino.plugin.hive.acid.AcidTransaction;
 import io.trino.plugin.hive.fs.FileSystemDirectoryLister;
-import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
 import java.util.List;
@@ -53,7 +53,7 @@ public class TestSemiTransactionalHiveMetastore
             Optional.of("comment"));
     private static final Storage TABLE_STORAGE = new Storage(
             StorageFormat.create("serde", "input", "output"),
-            Optional.of("location"),
+            Optional.of("/test"),
             Optional.of(new HiveBucketProperty(ImmutableList.of("column"), BUCKETING_V1, 10, ImmutableList.of(new SortingColumn("column", SortingColumn.Order.ASCENDING)))),
             true,
             ImmutableMap.of("param", "value2"));
@@ -109,7 +109,7 @@ public void testParallelUpdateStatisticsOperations()
         IntStream.range(0, tablesToUpdate).forEach(i -> semiTransactionalHiveMetastore.finishChangingExistingTable(INSERT, SESSION,
                 "database",
                 "table_" + i,
-                new Path("location"),
+                Location.of(TABLE_STORAGE.getLocation()),
                 ImmutableList.of(),
                 PartitionStatistics.empty(),
                 false));
