Remove usages of Path from Delta Lake
electrum committed May 15, 2023
1 parent db43ee8 · commit e053284
Showing 3 changed files with 43 additions and 37 deletions.
DeltaLakeMergeSink.java
@@ -42,7 +42,6 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeOperators;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.format.CompressionCodec;
 import org.joda.time.DateTimeZone;
 import org.roaringbitmap.longlong.LongBitmapDataProvider;
@@ -70,6 +69,7 @@
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
 import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
+import static io.trino.plugin.deltalake.DeltaLakeMetadata.relativePath;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getCompressionCodec;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterBlockSize;
 import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetWriterPageSize;
@@ -322,16 +322,16 @@ public CompletableFuture<Collection<Slice>> finish()
     }
 
     // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
-    private List<Slice> rewriteFile(String sourceLocationPath, FileDeletion deletion)
+    private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)
     {
         try {
-            Path sourcePath = new Path(sourceLocationPath);
-            Path rootTablePath = new Path(rootTableLocation.toString());
-            String sourceRelativePath = rootTablePath.toUri().relativize(sourcePath.toUri()).toString();
+            String tablePath = rootTableLocation.toString();
+            Location sourceLocation = Location.of(sourcePath);
+            String sourceRelativePath = relativePath(tablePath, sourcePath);
 
-            Path targetPath = new Path(sourcePath.getParent(), session.getQueryId() + "_" + randomUUID());
-            String targetRelativePath = rootTablePath.toUri().relativize(targetPath.toUri()).toString();
-            FileWriter fileWriter = createParquetFileWriter(Location.of(targetPath.toString()), dataColumns);
+            Location targetLocation = sourceLocation.parentDirectory().appendPath(session.getQueryId() + "_" + randomUUID());
+            String targetRelativePath = relativePath(tablePath, targetLocation.toString());
+            FileWriter fileWriter = createParquetFileWriter(targetLocation, dataColumns);
 
             DeltaLakeWriter writer = new DeltaLakeWriter(
                     fileSystem,
@@ -343,7 +343,7 @@ private List<Slice> rewriteFile(String sourceLocationPath, FileDeletion deletion
                     dataColumns,
                     DATA);
 
-            Optional<DataFileInfo> newFileInfo = rewriteParquetFile(Location.of(sourcePath.toString()), deletion, writer);
+            Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);
 
             DeltaLakeMergeResult result = new DeltaLakeMergeResult(Optional.of(sourceRelativePath), newFileInfo);
             return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
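
The rewrite above replaces Hadoop Path arithmetic with the Trino filesystem Location API: the target file becomes a sibling of the source via parentDirectory().appendPath(). A minimal standalone sketch of those calls, assuming the trino-filesystem artifact is on the classpath and using a hypothetical S3 path:

import io.trino.filesystem.Location;

import java.util.UUID;

public class LocationSketch
{
    public static void main(String[] args)
    {
        // hypothetical source file inside a Delta table directory
        Location source = Location.of("s3://bucket/table/part-00000.parquet");

        // sibling target in the same directory, mirroring rewriteFile above
        Location target = source.parentDirectory().appendPath("query123_" + UUID.randomUUID());

        System.out.println(target); // s3://bucket/table/query123_<uuid>
    }
}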
DeltaLakeMetadata.java
@@ -130,11 +130,12 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.VarcharType;
-import org.apache.hadoop.fs.Path;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Instant;
 import java.util.ArrayDeque;
 import java.util.Collection;
@@ -1345,13 +1346,7 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
     public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
     {
         DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
-        if (!allowWrite(session, table)) {
-            String fileSystem = new Path(table.getLocation()).toUri().getScheme();
-            throw new TrinoException(
-                    NOT_SUPPORTED,
-                    format("Inserts are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
-                            "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
-        }
+        checkWriteAllowed(session, table);
         checkWriteSupported(session, table);
 
         List<DeltaLakeColumnHandle> inputColumns = columns.stream()
@@ -1511,13 +1506,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
         if (isAppendOnly(handle.getMetadataEntry())) {
             throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
         }
-        if (!allowWrite(session, handle)) {
-            String fileSystem = new Path(handle.getLocation()).toUri().getScheme();
-            throw new TrinoException(
-                    NOT_SUPPORTED,
-                    format("Updates are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
-                            "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
-        }
+        checkWriteAllowed(session, handle);
         if (!getColumnInvariants(handle.getMetadataEntry()).isEmpty()) {
             throw new TrinoException(NOT_SUPPORTED, "Updates are not supported for tables with delta invariants");
         }
@@ -1734,13 +1723,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
     {
         DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle();
 
-        if (!allowWrite(session, table)) {
-            String fileSystem = new Path(table.getLocation()).toUri().getScheme();
-            throw new TrinoException(
-                    NOT_SUPPORTED,
-                    format("Optimize is not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
-                            "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
-        }
+        checkWriteAllowed(session, table);
         checkSupportedWriterVersion(session, table);
         ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
         if (columnMappingMode != NONE) {
@@ -1799,7 +1782,7 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
         long writeTimestamp = Instant.now().toEpochMilli();
 
         for (String scannedPath : scannedPaths) {
-            String relativePath = new Path(tableLocation).toUri().relativize(new Path(scannedPath).toUri()).toString();
+            String relativePath = relativePath(tableLocation, scannedPath);
             transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(relativePath, writeTimestamp, false));
         }
 
@@ -1821,6 +1804,17 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
         }
     }
 
+    private void checkWriteAllowed(ConnectorSession session, DeltaLakeTableHandle table)
+    {
+        if (!allowWrite(session, table)) {
+            String fileSystem = Location.of(table.getLocation()).scheme().orElse("unknown");
+            throw new TrinoException(
+                    NOT_SUPPORTED,
+                    format("Writes are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
+                            "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
+        }
+    }
+
     private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle)
     {
         try {
@@ -2197,7 +2191,20 @@ private void setRollback(Runnable action)
 
     private static String toUriFormat(String path)
     {
-        return new Path(path).toUri().toString();
+        verify(!path.startsWith("/") && !path.contains(":/"), "unexpected path: %s", path);
+        try {
+            return new URI(null, null, path, null).toString();
+        }
+        catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid path: " + path, e);
+        }
+    }
+
+    static String relativePath(String basePath, String path)
+    {
+        checkArgument(path.startsWith(basePath + "/") && (path.length() >= (basePath.length() + 2)),
+                "path [%s] must be a subdirectory of basePath [%s]", path, basePath);
+        return toUriFormat(path.substring(basePath.length() + 1));
     }
 
     public void rollback()
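
The new relativePath helper replaces URI-based relativization of Hadoop Paths: it strips the base path plus the "/" separator, then percent-encodes the remainder through toUriFormat. A standalone sketch of that behavior using only the JDK and hypothetical values (the real helper is the DeltaLakeMetadata method added above):

import java.net.URI;
import java.net.URISyntaxException;

public class RelativePathSketch
{
    public static void main(String[] args) throws URISyntaxException
    {
        String basePath = "s3://bucket/warehouse/table";
        String path = "s3://bucket/warehouse/table/part 1.parquet";

        // strip the base path and the "/" separator, as relativePath does
        String relative = path.substring(basePath.length() + 1);

        // percent-encode characters that are illegal in a URI path, as toUriFormat does
        System.out.println(new URI(null, null, relative, null)); // prints part%201.parquet
    }
}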
TransactionLogSynchronizerManager.java
@@ -14,14 +14,13 @@
 package io.trino.plugin.deltalake.transactionlog.writer;
 
 import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
 import io.trino.spi.TrinoException;
-import org.apache.hadoop.fs.Path;
 
 import javax.inject.Inject;
 
 import java.util.Map;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
@@ -43,8 +42,8 @@ public TransactionLogSynchronizerManager(
 
     public TransactionLogSynchronizer getSynchronizer(String tableLocation)
    {
-        String uriScheme = new Path(tableLocation).toUri().getScheme();
-        checkArgument(uriScheme != null, "URI scheme undefined for " + tableLocation);
+        String uriScheme = Location.of(tableLocation).scheme()
+                .orElseThrow(() -> new IllegalArgumentException("URI scheme undefined for " + tableLocation));
         TransactionLogSynchronizer synchronizer = synchronizers.get(uriScheme.toLowerCase(ENGLISH));
         if (synchronizer == null) {
             throw new TrinoException(NOT_SUPPORTED, format("Cannot write to table in %s; %s not supported", tableLocation, uriScheme));
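
Location.of(tableLocation).scheme() returns an Optional<String> rather than a nullable scheme, so the undefined-scheme case is handled with orElseThrow instead of a checkArgument null test. A quick sketch with hypothetical table locations, assuming trino-filesystem on the classpath:

import io.trino.filesystem.Location;

public class SchemeSketch
{
    public static void main(String[] args)
    {
        System.out.println(Location.of("s3://bucket/table").scheme()); // Optional[s3]
        System.out.println(Location.of("/tmp/local-table").scheme()); // Optional.empty
    }
}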
