Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Verify Delta, Iceberg do not call file length unnecessarily #18294

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_EXISTS;
import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH;
import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM;
Expand Down Expand Up @@ -107,6 +109,7 @@ public TrinoInputFile newInputFile(Location location)
int nextId = fileId.incrementAndGet();
return new TrackingInputFile(
delegate.newInputFile(location),
OptionalLong.empty(),
operation -> tracker.track(location, nextId, operation));
}

Expand All @@ -116,6 +119,7 @@ public TrinoInputFile newInputFile(Location location, long length)
int nextId = fileId.incrementAndGet();
return new TrackingInputFile(
delegate.newInputFile(location, length),
OptionalLong.of(length),
operation -> tracker.track(location, nextId, operation));
}

Expand Down Expand Up @@ -175,18 +179,27 @@ private static class TrackingInputFile
implements TrinoInputFile
{
private final TrinoInputFile delegate;
private final OptionalLong length;
private final Consumer<OperationType> tracker;

public TrackingInputFile(TrinoInputFile delegate, Consumer<OperationType> tracker)
// Wraps a TrinoInputFile to record which filesystem operations are performed on it.
// length: the caller-provided file length, if one was supplied at creation time
//         (via newInputFile(location, length)); empty when the length is unknown.
// tracker: callback invoked with the OperationType of each delegated filesystem call.
public TrackingInputFile(TrinoInputFile delegate, OptionalLong length, Consumer<OperationType> tracker)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.length = requireNonNull(length, "length is null");
this.tracker = requireNonNull(tracker, "tracker is null");
}

@Override
public long length()
throws IOException
{
if (length.isPresent()) {
// The length was supplied by the caller at creation time, so a real implementation
// would return it without touching the filesystem. Here we additionally query the
// delegate purely to verify that the provided length matches the actual file size.
long actualLength = delegate.length();
verify(length.getAsLong() == actualLength, "Provided length does not match actual: %s != %s", length.getAsLong(), actualLength);
// Intentionally not tracked: the delegate call above is verification-only and
// would not happen outside this tracking wrapper.
return length.getAsLong();
}
// Length is unknown, so this is a genuine filesystem access — record it.
tracker.accept(INPUT_FILE_GET_LENGTH);
return delegate.length();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.regex.Pattern;

import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH;
import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.plugin.deltalake.TestDeltaLakeFileOperations.FileType.CDF_DATA;
Expand Down Expand Up @@ -112,8 +111,6 @@ public void testReadWholePartition()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 3) // TODO (https://github.com/trinodb/trino/issues/16780) why is last transaction log accessed more times than others?
.addCopies(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_GET_LENGTH), 2)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_GET_LENGTH), 2)
.addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 2)
.build());
Expand Down Expand Up @@ -187,15 +184,9 @@ public void testTableChangesFileSystemAccess()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000007.json", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(CDF_DATA, "key=domain1/", INPUT_FILE_GET_LENGTH), 1)
.addCopies(new FileOperation(CDF_DATA, "key=domain2/", INPUT_FILE_GET_LENGTH), cdfFilesForDomain2)
.addCopies(new FileOperation(CDF_DATA, "key=domain3/", INPUT_FILE_GET_LENGTH), cdfFilesForDomain3)
.addCopies(new FileOperation(CDF_DATA, "key=domain1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(CDF_DATA, "key=domain2/", INPUT_FILE_NEW_STREAM), cdfFilesForDomain2)
.addCopies(new FileOperation(CDF_DATA, "key=domain3/", INPUT_FILE_NEW_STREAM), cdfFilesForDomain3)
.addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_GET_LENGTH), 1)
.addCopies(new FileOperation(DATA, "key=domain2/", INPUT_FILE_GET_LENGTH), 1)
.addCopies(new FileOperation(DATA, "key=domain3/", INPUT_FILE_GET_LENGTH), 1)
.addCopies(new FileOperation(DATA, "key=domain1/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=domain2/", INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(DATA, "key=domain3/", INPUT_FILE_NEW_STREAM), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
Expand Down Expand Up @@ -86,7 +87,7 @@ private InternalHiveConnectorFactory() {}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module)
{
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty());
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty());
}

public static Connector createConnector(
Expand All @@ -95,6 +96,7 @@ public static Connector createConnector(
ConnectorContext context,
Module module,
Optional<HiveMetastore> metastore,
Optional<TrinoFileSystemFactory> fileSystemFactory,
Optional<DirectoryLister> directoryLister)
{
requireNonNull(config, "config is null");
Expand All @@ -119,7 +121,9 @@ public static Connector createConnector(
new HiveMetastoreModule(metastore),
new HiveSecurityModule(),
new HdfsAuthenticationModule(),
new FileSystemModule(),
fileSystemFactory
.map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory))
.orElseGet(FileSystemModule::new),
new HiveProcedureModule(),
new MBeanServerModule(),
binder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return createConnector(catalogName, config, context, module, metastore, directoryLister);
return createConnector(catalogName, config, context, module, metastore, Optional.empty(), directoryLister);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
ClassLoader classLoader = context.duplicatePluginClassLoader();
try {
return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class)
.invoke(null, catalogName, config, context, Optional.empty());
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, Optional.class)
.invoke(null, catalogName, config, context, Optional.empty(), Optional.empty());
}
catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
Expand Down Expand Up @@ -59,7 +61,8 @@ public static Connector createConnector(
String catalogName,
Map<String, String> config,
ConnectorContext context,
Optional<HiveMetastore> metastore)
Optional<HiveMetastore> metastore,
Optional<TrinoFileSystemFactory> fileSystemFactory)
{
ClassLoader classLoader = InternalHudiConnectorFactory.class.getClassLoader();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Expand All @@ -73,7 +76,9 @@ public static Connector createConnector(
new HiveGcsModule(),
new HiveAzureModule(),
new HdfsAuthenticationModule(),
new FileSystemModule(),
fileSystemFactory
.map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory))
.orElseGet(FileSystemModule::new),
new MBeanServerModule(),
binder -> {
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return createConnector(catalogName, config, context, metastore);
return createConnector(catalogName, config, context, metastore, Optional.empty());
}
}
Loading