Skip to content

Commit

Permalink
Optionally plug TrinoFileSystemFactory in Hive, Hudi
Browse files Browse the repository at this point in the history
Like in Delta, Iceberg. Prepare for file operations testing.
  • Loading branch information
findepi committed Jul 14, 2023
1 parent b2c566e commit 60a712e
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
Expand Down Expand Up @@ -86,7 +87,7 @@ private InternalHiveConnectorFactory() {}

public static Connector createConnector(String catalogName, Map<String, String> config, ConnectorContext context, Module module)
{
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty());
return createConnector(catalogName, config, context, module, Optional.empty(), Optional.empty(), Optional.empty());
}

public static Connector createConnector(
Expand All @@ -95,6 +96,7 @@ public static Connector createConnector(
ConnectorContext context,
Module module,
Optional<HiveMetastore> metastore,
Optional<TrinoFileSystemFactory> fileSystemFactory,
Optional<DirectoryLister> directoryLister)
{
requireNonNull(config, "config is null");
Expand All @@ -119,7 +121,9 @@ public static Connector createConnector(
new HiveMetastoreModule(metastore),
new HiveSecurityModule(),
new HdfsAuthenticationModule(),
new FileSystemModule(),
fileSystemFactory
.map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory))
.orElseGet(FileSystemModule::new),
new HiveProcedureModule(),
new MBeanServerModule(),
binder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return createConnector(catalogName, config, context, module, metastore, directoryLister);
return createConnector(catalogName, config, context, module, metastore, Optional.empty(), directoryLister);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
ClassLoader classLoader = context.duplicatePluginClassLoader();
try {
return (Connector) classLoader.loadClass(InternalHudiConnectorFactory.class.getName())
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class)
.invoke(null, catalogName, config, context, Optional.empty());
.getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, Optional.class)
.invoke(null, catalogName, config, context, Optional.empty(), Optional.empty());
}
catch (InvocationTargetException e) {
Throwable targetException = e.getTargetException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.manager.FileSystemModule;
import io.trino.hdfs.HdfsModule;
import io.trino.hdfs.authentication.HdfsAuthenticationModule;
Expand Down Expand Up @@ -59,7 +61,8 @@ public static Connector createConnector(
String catalogName,
Map<String, String> config,
ConnectorContext context,
Optional<HiveMetastore> metastore)
Optional<HiveMetastore> metastore,
Optional<TrinoFileSystemFactory> fileSystemFactory)
{
ClassLoader classLoader = InternalHudiConnectorFactory.class.getClassLoader();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Expand All @@ -73,7 +76,9 @@ public static Connector createConnector(
new HiveGcsModule(),
new HiveAzureModule(),
new HdfsAuthenticationModule(),
new FileSystemModule(),
fileSystemFactory
.map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory))
.orElseGet(FileSystemModule::new),
new MBeanServerModule(),
binder -> {
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return createConnector(catalogName, config, context, metastore);
return createConnector(catalogName, config, context, metastore, Optional.empty());
}
}

0 comments on commit 60a712e

Please sign in to comment.