Replace HiveIdentity in metadata APIs
HiveIdentity is provided to metadata objects during construction.
The underlying ThriftHiveMetastore uses the identity on a per-call basis.
dain committed Feb 7, 2022
1 parent 2509a59 commit 262c6ec
Showing 64 changed files with 1,062 additions and 1,270 deletions.
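
Across the changed files the pattern is uniform: callers stop constructing a HiveIdentity for each metastore call, because the identity is now fixed when the metastore instance is created. A minimal before/after sketch of the call pattern; the schema and table names ("web", "orders") are illustrative, not from this commit:

    import java.util.Optional;

    import io.trino.plugin.hive.metastore.HiveMetastore;
    import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
    import io.trino.plugin.hive.metastore.Table;
    import io.trino.spi.connector.ConnectorSession;

    class IdentityAtConstruction
    {
        static Optional<Table> lookupTable(HiveMetastoreFactory metastoreFactory, ConnectorSession session)
        {
            // Identity is supplied once, when the metastore instance is created...
            HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity()));
            // ...so individual calls no longer take one.
            // Previously: metastore.getTable(new HiveIdentity(session), "web", "orders")
            return metastore.getTable("web", "orders");
        }
    }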
@@ -15,7 +15,6 @@
 
 import io.trino.Session;
 import io.trino.plugin.hive.TestingHivePlugin;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.Database;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.spi.security.PrincipalType;
@@ -28,7 +27,6 @@
 
 import static io.trino.SystemSessionProperties.SPATIAL_PARTITIONING_TABLE_NAME;
 import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
-import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.testing.TestingSession.testSessionBuilder;
 import static java.lang.String.format;
 
@@ -79,7 +77,6 @@ protected DistributedQueryRunner createQueryRunner()
         HiveMetastore metastore = createTestingFileHiveMetastore(baseDir);
 
         metastore.createDatabase(
-                new HiveIdentity(SESSION),
                 Database.builder()
                         .setDatabaseName("default")
                         .setOwnerName(Optional.of("public"))
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java (105 changes: 48 additions & 57 deletions)

Large diffs are not rendered by default.

@@ -17,7 +17,6 @@
 import io.airlift.json.JsonCodec;
 import io.airlift.units.Duration;
 import io.trino.plugin.base.CatalogName;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
 import io.trino.plugin.hive.metastore.MetastoreConfig;
 import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -193,7 +192,7 @@ public HiveMetadataFactory(
     public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
     {
         HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(
-                memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), new HiveIdentity(identity), perTransactionCacheMaximumSize)); // per-transaction cache
+                memoizeMetastore(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize)); // per-transaction cache
 
         SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
                 hdfsEnvironment,
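
Because the metastore passed to memoizeMetastore above is already bound to a single identity, a per-transaction cache no longer needs identity in its keys; the call arguments alone suffice. A minimal memoization sketch in that spirit, using Guava (Trino's actual memoizeMetastore returns a full caching metastore, not a helper like this):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import java.util.Optional;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Supplier;

    class PerTransactionTableCache<T>
    {
        private final Cache<String, Optional<T>> cache;

        PerTransactionTableCache(long maximumSize)
        {
            // Bounded, like perTransactionCacheMaximumSize above.
            this.cache = CacheBuilder.newBuilder().maximumSize(maximumSize).build();
        }

        Optional<T> getTable(String schema, String table, Supplier<Optional<T>> loader)
        {
            try {
                // (schema, table) is a sufficient key: the delegate loader is
                // already bound to one identity for the whole transaction.
                return cache.get(schema + "." + table, loader::get);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }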

Large diffs are not rendered by default.

@@ -20,7 +20,6 @@
 import io.airlift.event.client.EventClient;
 import io.airlift.json.JsonCodec;
 import io.airlift.units.DataSize;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
 import io.trino.plugin.hive.metastore.HivePageSinkMetadataProvider;
 import io.trino.plugin.hive.metastore.SortingColumn;
@@ -157,8 +156,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
                 session.getQueryId(),
                 new HivePageSinkMetadataProvider(
                         handle.getPageSinkMetadata(),
-                        new HiveMetastoreClosure(memoizeMetastore(metastoreFactory.createMetastore(Optional.of(session.getIdentity())), new HiveIdentity(session), perTransactionMetastoreCacheMaximumSize)),
-                        new HiveIdentity(session)),
+                        new HiveMetastoreClosure(memoizeMetastore(metastoreFactory.createMetastore(Optional.of(session.getIdentity())), perTransactionMetastoreCacheMaximumSize))),
                 typeManager,
                 hdfsEnvironment,
                 pageSorter,
@@ -18,7 +18,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
 import io.trino.plugin.hive.util.HiveBucketing.HiveBucketFilter;
 import io.trino.spi.TrinoException;
@@ -76,7 +75,7 @@ public HivePartitionManager(
         this.domainCompactionThreshold = domainCompactionThreshold;
     }
 
-    public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, ConnectorTableHandle tableHandle, Constraint constraint)
+    public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint constraint)
     {
         HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
         TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary()
@@ -118,7 +117,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore
                     .collect(toImmutableList());
         }
         else {
-            List<String> partitionNames = getFilteredPartitionNames(metastore, identity, tableName, partitionColumns, compactEffectivePredicate);
+            List<String> partitionNames = getFilteredPartitionNames(metastore, tableName, partitionColumns, compactEffectivePredicate);
             partitionsIterable = () -> partitionNames.stream()
                     // Apply extra filters which could not be done by getFilteredPartitionNames
                     .map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionTypes, effectivePredicate, predicate))
@@ -198,10 +197,10 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult
                 handle.getMaxScannedFileSize());
     }
 
-    public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, HiveTableHandle table)
+    public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
     {
         return table.getPartitions().orElseGet(() ->
-                getPartitionsAsList(getPartitions(metastore, identity, table, new Constraint(table.getEnforcedConstraint()))));
+                getPartitionsAsList(getPartitions(metastore, table, new Constraint(table.getEnforcedConstraint()))));
     }
 
     private Optional<HivePartition> parseValuesAndFilterPartition(
@@ -241,14 +240,14 @@ public static boolean partitionMatches(List<HiveColumnHandle> partitionColumns,
         return true;
     }
 
-    private List<String> getFilteredPartitionNames(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<HiveColumnHandle> effectivePredicate)
+    private List<String> getFilteredPartitionNames(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<HiveColumnHandle> effectivePredicate)
     {
         List<String> columnNames = partitionKeys.stream()
                 .map(HiveColumnHandle::getName)
                 .collect(toImmutableList());
         TupleDomain<String> partitionKeysFilter = computePartitionKeyFilter(partitionKeys, effectivePredicate);
         // fetch the partition names
-        return metastore.getPartitionNamesByFilter(identity, tableName.getSchemaName(), tableName.getTableName(), columnNames, partitionKeysFilter)
+        return metastore.getPartitionNamesByFilter(tableName.getSchemaName(), tableName.getTableName(), columnNames, partitionKeysFilter)
                 .orElseThrow(() -> new TableNotFoundException(tableName));
     }
 
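
getFilteredPartitionNames above pushes partition-key constraints to the metastore as a TupleDomain, and the comment in getPartitions notes that predicates the metastore cannot evaluate are applied afterwards, by parsing each returned partition name. A rough sketch of that client-side step with hypothetical helper names (Trino's real logic lives in parseValuesAndFilterPartition and handles escaping, type coercion, and null partitions):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;

    final class PartitionNameFilter
    {
        // Parse a Hive partition name like "ds=2022-01-01/country=US" into an
        // ordered column -> value map (simplified: no unescaping, no types).
        static Map<String, String> parseValues(String partitionName)
        {
            Map<String, String> values = new LinkedHashMap<>();
            for (String part : partitionName.split("/")) {
                int eq = part.indexOf('=');
                values.put(part.substring(0, eq), part.substring(eq + 1));
            }
            return values;
        }

        // Keep only the names whose parsed values pass the residual predicate.
        static List<String> filter(List<String> partitionNames, Predicate<Map<String, String>> predicate)
        {
            List<String> matching = new ArrayList<>();
            for (String name : partitionNames) {
                if (predicate.test(parseValues(name))) {
                    matching.add(name);
                }
            }
            return matching;
        }
    }

For example, filter(List.of("ds=2022-01-01/country=US", "ds=2022-01-01/country=DE"), v -> "US".equals(v.get("country"))) keeps only the first name.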
@@ -21,7 +21,6 @@
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.stats.CounterStat;
 import io.airlift.units.DataSize;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.Column;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -192,7 +191,7 @@ public ConnectorSplitSource getSplits(
 
         // get table metadata
         SemiTransactionalHiveMetastore metastore = transactionManager.get(transaction, session.getIdentity()).getMetastore();
-        Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
+        Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(tableName));
 
         // verify table is not marked as non-readable
@@ -202,7 +201,7 @@
         }
 
         // get partitions
-        List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, new HiveIdentity(session), hiveTable);
+        List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, hiveTable);
 
         // short circuit if we don't have any partitions
         if (partitions.isEmpty()) {
@@ -313,7 +312,6 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata(ConnectorSession session
         Iterable<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
         Iterable<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
             Map<String, Optional<Partition>> batch = metastore.getPartitionsByNames(
-                    new HiveIdentity(session),
                     tableName.getSchemaName(),
                     tableName.getTableName(),
                     Lists.transform(partitionBatch, HivePartition::getPartitionId));
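
partitionExponentially, used above to feed getPartitionsByNames, batches partitions so the first metastore round trip stays small while later ones amortize the per-call overhead. A plausible sketch, assuming the batch size doubles from the minimum up to the maximum (the actual growth policy is not shown in this diff):

    import java.util.ArrayList;
    import java.util.List;

    final class ExponentialBatching
    {
        // Split items into batches sized minBatchSize, 2*minBatchSize,
        // 4*minBatchSize, ..., capped at maxBatchSize.
        static <T> List<List<T>> partitionExponentially(List<T> items, int minBatchSize, int maxBatchSize)
        {
            List<List<T>> batches = new ArrayList<>();
            int position = 0;
            int size = minBatchSize;
            while (position < items.size()) {
                int end = Math.min(position + size, items.size());
                batches.add(items.subList(position, end));
                position = end;
                size = Math.min(size * 2, maxBatchSize);
            }
            return batches;
        }
    }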
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hive;
 
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -81,7 +80,7 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSession
 
         SchemaTableName sourceTableName = PARTITIONS.getSourceTableName(tableName);
         Table sourceTable = metadata.getMetastore()
-                .getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
+                .getTable(sourceTableName.getSchemaName(), sourceTableName.getTableName())
                 .orElse(null);
         if (sourceTable == null || isDeltaLakeTable(sourceTable) || isIcebergTable(sourceTable)) {
             return Optional.empty();
@@ -123,7 +122,7 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSession
                 constraint -> {
                     Constraint targetConstraint = new Constraint(constraint.transformKeys(fieldIdToColumnHandle::get));
                     Iterable<List<Object>> records = () ->
-                            stream(partitionManager.getPartitions(metadata.getMetastore(), new HiveIdentity(session), sourceTableHandle, targetConstraint).getPartitions())
+                            stream(partitionManager.getPartitions(metadata.getMetastore(), sourceTableHandle, targetConstraint).getPartitions())
                                     .map(hivePartition ->
                                             IntStream.range(0, partitionColumns.size())
                                                     .mapToObj(fieldIdToColumnHandle::get)
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSortedMap;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
@@ -59,7 +58,7 @@ public Optional<SystemTable> getSystemTable(HiveMetadata metadata, ConnectorSession
 
         SchemaTableName sourceTableName = PROPERTIES.getSourceTableName(tableName);
         Table table = metadata.getMetastore()
-                .getTable(new HiveIdentity(session), sourceTableName.getSchemaName(), sourceTableName.getTableName())
+                .getTable(sourceTableName.getSchemaName(), sourceTableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(tableName));
 
         if (isDeltaLakeTable(table) || isIcebergTable(table)) {
@@ -20,7 +20,6 @@
 import io.airlift.json.JsonCodecFactory;
 import io.airlift.json.ObjectMapperProvider;
 import io.trino.plugin.base.CatalogName;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.Column;
 import io.trino.plugin.hive.metastore.CoralSemiTransactionalHiveMSCAdapter;
 import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -88,7 +87,7 @@ public static ViewReader createViewReader(
         }
 
         return new HiveViewReader(
-                new CoralSemiTransactionalHiveMSCAdapter(metastore, new HiveIdentity(session), coralTableRedirectionResolver(session, tableRedirectionResolver, metadataProvider)),
+                new CoralSemiTransactionalHiveMSCAdapter(metastore, coralTableRedirectionResolver(session, tableRedirectionResolver, metadataProvider)),
                 typeManager);
     }
 
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hive.authentication;
 
-import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.security.ConnectorIdentity;
 
 import java.util.Objects;
@@ -33,11 +32,6 @@ private HiveIdentity()
         this.username = Optional.empty();
     }
 
-    public HiveIdentity(ConnectorSession session)
-    {
-        this(requireNonNull(session, "session is null").getIdentity());
-    }
-
     public HiveIdentity(ConnectorIdentity identity)
     {
         requireNonNull(identity, "identity is null");
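
With the ConnectorSession convenience constructor removed, any caller that still needs a HiveIdentity (the Thrift layer keeps applying identity per call) would unwrap the session first. A one-line sketch with a hypothetical call-site name:

    import io.trino.plugin.hive.authentication.HiveIdentity;
    import io.trino.spi.connector.ConnectorSession;

    final class IdentityFromSession
    {
        static HiveIdentity identityFor(ConnectorSession session)
        {
            // Previously: new HiveIdentity(session)
            return new HiveIdentity(session.getIdentity());
        }
    }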
@@ -15,7 +15,6 @@
 
 import com.linkedin.coral.hive.hive2rel.HiveMetastoreClient;
 import io.trino.plugin.hive.CoralTableRedirectionResolver;
-import io.trino.plugin.hive.authentication.HiveIdentity;
 import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil;
 import io.trino.spi.connector.SchemaTableName;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -36,16 +35,13 @@ public class CoralSemiTransactionalHiveMSCAdapter
         implements HiveMetastoreClient
 {
     private final SemiTransactionalHiveMetastore delegate;
-    private final HiveIdentity identity;
     private final CoralTableRedirectionResolver tableRedirection;
 
     public CoralSemiTransactionalHiveMSCAdapter(
             SemiTransactionalHiveMetastore coralHiveMetastoreClient,
-            HiveIdentity identity,
             CoralTableRedirectionResolver tableRedirection)
     {
         this.delegate = requireNonNull(coralHiveMetastoreClient, "coralHiveMetastoreClient is null");
-        this.identity = requireNonNull(identity, "identity is null");
         this.tableRedirection = requireNonNull(tableRedirection, "tableRedirection is null");
     }
 
@@ -78,7 +74,7 @@ public org.apache.hadoop.hive.metastore.api.Table getTable(String dbName, String tableName)
             }
         }
 
-        return delegate.getTable(identity, dbName, tableName)
+        return delegate.getTable(dbName, tableName)
                 .map(value -> ThriftMetastoreUtil.toMetastoreApiTable(value, NO_PRIVILEGES))
                 .orElse(null);
     }
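
The adapter now reduces to pure delegation plus type conversion: each Coral lookup becomes a call on the transactional metastore, whose bound identity is implicit, followed by translation to the Hive API type. The generic shape of that pattern, as a self-contained sketch (simplified types, not Coral's interface):

    import java.util.Optional;
    import java.util.function.BiFunction;
    import java.util.function.Function;

    final class DelegatingAdapter<S, T>
    {
        private final BiFunction<String, String, Optional<S>> lookup;
        private final Function<S, T> toApiType;

        DelegatingAdapter(BiFunction<String, String, Optional<S>> lookup, Function<S, T> toApiType)
        {
            this.lookup = lookup;
            this.toApiType = toApiType;
        }

        // Mirrors getTable above: delegate, convert, or null when absent.
        T getTable(String dbName, String tableName)
        {
            return lookup.apply(dbName, tableName)
                    .map(toApiType)
                    .orElse(null);
        }
    }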
