(fixup) Apply 2nd review comments
ebyhr committed Sep 17, 2019
1 parent 67a9c02 commit 525e514
Showing 41 changed files with 1,053 additions and 1,321 deletions.

@@ -21,7 +21,7 @@
 import io.prestosql.plugin.hive.HiveConfig;
 import io.prestosql.plugin.hive.HiveHdfsConfiguration;
 import io.prestosql.plugin.hive.HivePlugin;
-import io.prestosql.plugin.hive.authentication.HiveContext;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.authentication.NoHdfsAuthentication;
 import io.prestosql.plugin.hive.metastore.Database;
 import io.prestosql.plugin.hive.metastore.file.FileHiveMetastore;
@@ -84,9 +84,8 @@ private static DistributedQueryRunner createQueryRunner()
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hiveConfig, new NoHdfsAuthentication());

         FileHiveMetastore metastore = new FileHiveMetastore(hdfsEnvironment, baseDir.toURI().toString(), "test");
-        HiveContext hiveContext = new HiveContext(SESSION);
         metastore.createDatabase(
-                hiveContext,
+                new HiveIdentity(SESSION),
                 Database.builder()
                         .setDatabaseName("default")
                         .setOwnerName("public")

@@ -19,7 +19,7 @@
 import io.airlift.slice.Slices;
 import io.prestosql.plugin.hive.LocationService.WriteInfo;
 import io.prestosql.plugin.hive.PartitionUpdate.UpdateMode;
-import io.prestosql.plugin.hive.authentication.HiveContext;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.HiveMetastore;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.classloader.ThreadContextClassLoader;
@@ -112,8 +112,7 @@ private void doCreateEmptyPartition(ConnectorSession session, String schema, Str
                 .map(String.class::cast)
                 .collect(toImmutableList());

-        HiveContext context = new HiveContext(session);
-        if (metastore.getPartition(context, schema, table, partitionStringValues).isPresent()) {
+        if (metastore.getPartition(new HiveIdentity(session), schema, table, partitionStringValues).isPresent()) {
             throw new PrestoException(ALREADY_EXISTS, "Partition already exists");
         }
         String partitionName = FileUtils.makePartName(actualPartitionColumnNames, partitionStringValues);
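
The pattern above recurs across the commit: the short-lived HiveContext local disappears, and a HiveIdentity is built from the session directly at the metastore call site. Below is a sketch of the resulting call shape -- a hypothetical helper for illustration, not code from this commit, reusing the types visible in the hunk above (HiveMetastore, HiveIdentity, PrestoException, ALREADY_EXISTS):

    // Hypothetical helper (illustration only).
    private static void ensurePartitionAbsent(
            HiveMetastore metastore,
            ConnectorSession session,
            String schema,
            String table,
            List<String> partitionValues)
    {
        // The identity is derived from the session per call;
        // no context object is kept alive between calls.
        if (metastore.getPartition(new HiveIdentity(session), schema, table, partitionValues).isPresent()) {
            throw new PrestoException(ALREADY_EXISTS, "Partition already exists");
        }
    }
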

Large diffs are not rendered by default.


@@ -20,6 +20,7 @@
 import io.airlift.event.client.EventClient;
 import io.airlift.json.JsonCodec;
 import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.HiveMetastore;
 import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
 import io.prestosql.plugin.hive.metastore.SortingColumn;
@@ -146,7 +147,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
                 handle.getLocationHandle(),
                 locationService,
                 session.getQueryId(),
-                new HivePageSinkMetadataProvider(session, handle.getPageSinkMetadata(), memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize)),
+                new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), memoizeMetastore(metastore, perTransactionMetastoreCacheMaximumSize), new HiveIdentity(session)),
                 typeManager,
                 hdfsEnvironment,
                 pageSorter,

@@ -19,13 +19,12 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import io.airlift.slice.Slice;
-import io.prestosql.plugin.hive.authentication.HiveContext;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
 import io.prestosql.plugin.hive.metastore.Table;
 import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorSession;
 import io.prestosql.spi.connector.ConnectorTableHandle;
 import io.prestosql.spi.connector.Constraint;
 import io.prestosql.spi.connector.SchemaTableName;
@@ -118,9 +117,8 @@ public HivePartitionManager(
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
     }

-    public HivePartitionResult getPartitions(ConnectorSession session, SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint constraint)
+    public HivePartitionResult getPartitions(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, ConnectorTableHandle tableHandle, Constraint constraint)
     {
-        HiveContext context = new HiveContext(session);
         HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
         TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary()
                 .intersect(hiveTableHandle.getEnforcedConstraint());
@@ -133,7 +131,7 @@ public HivePartitionResult getPartitions(ConnectorSession session, SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint constraint)
             return new HivePartitionResult(partitionColumns, ImmutableList.of(), none(), none(), none(), hiveBucketHandle, Optional.empty());
         }

-        Table table = metastore.getTable(context, tableName.getSchemaName(), tableName.getTableName())
+        Table table = metastore.getTable(identity, tableName.getSchemaName(), tableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(tableName));

         Optional<HiveBucketFilter> bucketFilter = getHiveBucketFilter(table, effectivePredicate);
@@ -162,7 +160,7 @@ public HivePartitionResult getPartitions(ConnectorSession session, SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint constraint)
                     .collect(toImmutableList());
         }
         else {
-            List<String> partitionNames = getFilteredPartitionNames(context, metastore, tableName, partitionColumns, effectivePredicate);
+            List<String> partitionNames = getFilteredPartitionNames(identity, metastore, tableName, partitionColumns, effectivePredicate);
             partitionsIterable = () -> partitionNames.stream()
                     // Apply extra filters which could not be done by getFilteredPartitionNames
                     .map(partitionName -> parseValuesAndFilterPartition(tableName, partitionName, partitionColumns, partitionTypes, effectivePredicate, predicate))
@@ -235,10 +233,10 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
                 handle.getAnalyzePartitionValues());
     }

-    public List<HivePartition> getOrLoadPartitions(ConnectorSession session, SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
+    public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, HiveTableHandle table)
     {
         return table.getPartitions().orElseGet(() ->
-                getPartitionsAsList(getPartitions(session, metastore, table, new Constraint(table.getEnforcedConstraint()))));
+                getPartitionsAsList(getPartitions(metastore, identity, table, new Constraint(table.getEnforcedConstraint()))));
     }

     private static TupleDomain<HiveColumnHandle> toCompactTupleDomain(TupleDomain<ColumnHandle> effectivePredicate, int threshold)
@@ -290,7 +288,7 @@ private boolean partitionMatches(List<HiveColumnHandle> partitionColumns, TupleD
         return constraint.test(partition.getKeys());
     }

-    private List<String> getFilteredPartitionNames(HiveContext context, SemiTransactionalHiveMetastore metastore, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<ColumnHandle> effectivePredicate)
+    private List<String> getFilteredPartitionNames(HiveIdentity identity, SemiTransactionalHiveMetastore metastore, SchemaTableName tableName, List<HiveColumnHandle> partitionKeys, TupleDomain<ColumnHandle> effectivePredicate)
     {
         checkArgument(effectivePredicate.getDomains().isPresent());

@@ -353,7 +351,7 @@ else if (type instanceof TinyintType
         }

         // fetch the partition names
-        return metastore.getPartitionNamesByParts(context, tableName.getSchemaName(), tableName.getTableName(), filter)
+        return metastore.getPartitionNamesByParts(identity, tableName.getSchemaName(), tableName.getTableName(), filter)
                 .orElseThrow(() -> new TableNotFoundException(tableName));
     }

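
With these signature changes, HivePartitionManager no longer touches ConnectorSession at all; callers supply the identity explicitly, as the HiveSplitManager diff below also shows. A hedged caller-side sketch -- a hypothetical helper, not code from this commit, assuming the new signatures above:

    // Hypothetical helper (illustration only). Builds the identity once from
    // the session and passes it through, matching the new getOrLoadPartitions
    // signature in this diff.
    private static List<HivePartition> loadPartitions(
            HivePartitionManager partitionManager,
            SemiTransactionalHiveMetastore metastore,
            ConnectorSession session,
            HiveTableHandle table)
    {
        HiveIdentity identity = new HiveIdentity(session);
        return partitionManager.getOrLoadPartitions(metastore, identity, table);
    }
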

@@ -21,7 +21,7 @@
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.stats.CounterStat;
 import io.airlift.units.DataSize;
-import io.prestosql.plugin.hive.authentication.HiveContext;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.Column;
 import io.prestosql.plugin.hive.metastore.Partition;
 import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -176,7 +176,7 @@ public ConnectorSplitSource getSplits(

         // get table metadata
         SemiTransactionalHiveMetastore metastore = metastoreProvider.apply((HiveTransactionHandle) transaction);
-        Table table = metastore.getTable(new HiveContext(session), tableName.getSchemaName(), tableName.getTableName())
+        Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(tableName));

         // verify table is not marked as non-readable
@@ -186,7 +186,7 @@
         }

         // get partitions
-        List<HivePartition> partitions = partitionManager.getOrLoadPartitions(session, metastore, hiveTable);
+        List<HivePartition> partitions = partitionManager.getOrLoadPartitions(metastore, new HiveIdentity(session), hiveTable);

         // short circuit if we don't have any partitions
         if (partitions.isEmpty()) {
@@ -279,7 +279,7 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata(ConnectorSession se
         Iterable<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
         Iterable<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
             Map<String, Optional<Partition>> batch = metastore.getPartitionsByNames(
-                    new HiveContext(session),
+                    new HiveIdentity(session),
                     tableName.getSchemaName(),
                     tableName.getTableName(),
                     Lists.transform(partitionBatch, HivePartition::getPartitionId));

@@ -16,7 +16,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import io.prestosql.plugin.hive.authentication.HiveContext;
+import io.prestosql.plugin.hive.authentication.HiveIdentity;
 import io.prestosql.plugin.hive.metastore.Column;
 import io.prestosql.plugin.hive.metastore.Partition;
 import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
@@ -106,11 +106,11 @@ private void doSyncPartitionMetadata(ConnectorSession session, String schemaName
     {
         SyncMode syncMode = toSyncMode(mode);
         HdfsContext hdfsContext = new HdfsContext(session, schemaName, tableName);
-        HiveContext hiveContext = new HiveContext(session);
+        HiveIdentity identity = new HiveIdentity(session);
         SemiTransactionalHiveMetastore metastore = ((HiveMetadata) hiveMetadataFactory.get()).getMetastore();
         SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);

-        Table table = metastore.getTable(hiveContext, schemaName, tableName)
+        Table table = metastore.getTable(identity, schemaName, tableName)
                 .orElseThrow(() -> new TableNotFoundException(schemaTableName));
         if (table.getPartitionColumns().isEmpty()) {
             throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName);
@@ -122,7 +122,7 @@

         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
-            List<String> partitionsInMetastore = metastore.getPartitionNames(hiveContext, schemaName, tableName)
+            List<String> partitionsInMetastore = metastore.getPartitionNames(identity, schemaName, tableName)
                     .orElseThrow(() -> new TableNotFoundException(schemaTableName));
             List<String> partitionsInFileSystem = listDirectory(fileSystem, fileSystem.getFileStatus(tableLocation), table.getPartitionColumns(), table.getPartitionColumns().size()).stream()
                     .map(fileStatus -> fileStatus.getPath().toUri())

@@ -21,20 +21,19 @@
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;

-public final class HiveContext
+public final class HiveIdentity
 {
     private final String username;

-    public HiveContext(ConnectorIdentity identity)
+    public HiveIdentity(ConnectorSession session)
     {
-        requireNonNull(identity, "identity is null");
-        this.username = requireNonNull(identity.getUser(), "identity.getUser() is null");
+        this(requireNonNull(session, "session is null").getIdentity());
     }

-    public HiveContext(ConnectorSession session)
+    public HiveIdentity(ConnectorIdentity identity)
     {
-        requireNonNull(session, "session is null");
-        this.username = requireNonNull(session.getIdentity().getUser(), "session.getIdentity().getUser() is null");
+        requireNonNull(identity, "identity is null");
+        this.username = requireNonNull(identity.getUser(), "identity.getUser() is null");
     }

     public String getUsername()
@@ -60,7 +59,7 @@ public boolean equals(Object o)
             return false;
         }

-        HiveContext other = (HiveContext) o;
+        HiveIdentity other = (HiveIdentity) o;
         return Objects.equals(username, other.username);
     }

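
For reference, here is a self-contained sketch of how the renamed class behaves after this commit: the session-based constructor now delegates to the identity-based one, removing the duplicated requireNonNull/getUser logic, and equality reduces to the username. The two mini interfaces are simplified stand-ins for the Presto SPI types, and the hashCode body is an assumption (it sits outside the rendered hunks; it is included here only to keep the equals contract intact):

    import java.util.Objects;

    import static java.util.Objects.requireNonNull;

    // Simplified stand-ins for the Presto SPI types (illustration only).
    interface ConnectorIdentity
    {
        String getUser();
    }

    interface ConnectorSession
    {
        ConnectorIdentity getIdentity();
    }

    // Mirrors HiveIdentity as it reads after this commit.
    final class HiveIdentity
    {
        private final String username;

        HiveIdentity(ConnectorSession session)
        {
            // Delegates to the identity-based constructor, as in the diff above.
            this(requireNonNull(session, "session is null").getIdentity());
        }

        HiveIdentity(ConnectorIdentity identity)
        {
            requireNonNull(identity, "identity is null");
            this.username = requireNonNull(identity.getUser(), "identity.getUser() is null");
        }

        String getUsername()
        {
            return username;
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            HiveIdentity other = (HiveIdentity) o;
            return Objects.equals(username, other.username);
        }

        @Override
        public int hashCode()
        {
            // Assumption: username-based hash to match equals; the real body
            // is not shown in the rendered diff.
            return Objects.hash(username);
        }
    }

Because two HiveIdentity instances compare equal whenever the usernames match, identity-keyed structures (such as the memoized metastore passed alongside it in the HivePageSinkProvider hunk) can treat all sessions of the same user alike.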
