Add debug support for offline tables and partitions
wenleix committed Sep 5, 2019
1 parent 3379b4f commit ecdbbb3
Showing 5 changed files with 151 additions and 21 deletions.
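
In short, the commit introduces a hidden Hive session property, offline_data_debug_mode_enabled (default false). When a session enables it, the connector skips the protect-mode and readability checks that normally reject tables and partitions marked offline or carrying the OBJECT_NOT_READABLE parameter, letting operators inspect such data while debugging. The guard is applied at each place the checks run: table handle resolution (getTableHandle), partition listing (HivePartitionManager), and split generation (getPartitionMetadata). A condensed sketch of the pattern, using names from the diff (the framing is illustrative, not a verbatim excerpt):

    if (!isOfflineDataDebugModeEnabled(session)) {
        // pre-existing checks, now bypassed in debug mode: verifyOnline throws
        // if the object's protect mode marks it offline, and OBJECT_NOT_READABLE
        // yields a HiveNotReadableException
        verifyOnline(tableName, Optional.of(partName), getProtectMode(partition), partition.getParameters());
        String notReadable = partition.getParameters().get(OBJECT_NOT_READABLE);
        if (!isNullOrEmpty(notReadable)) {
            throw new HiveNotReadableException(tableName, Optional.of(partName), notReadable);
        }
    }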
File 1 of 5
@@ -151,6 +151,7 @@
 import static com.facebook.presto.hive.HiveSessionProperties.getVirtualBucketCount;
 import static com.facebook.presto.hive.HiveSessionProperties.isBucketExecutionEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
+import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
 import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
 import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
@@ -335,7 +336,10 @@ public HiveTableHandle getTableHandle(ConnectorSession session, SchemaTableName
             throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, format("Unexpected table %s present in Hive metastore", tableName));
         }

-        verifyOnline(tableName, Optional.empty(), getProtectMode(table.get()), table.get().getParameters());
+        if (!isOfflineDataDebugModeEnabled(session)) {
+            verifyOnline(tableName, Optional.empty(), getProtectMode(table.get()), table.get().getParameters());
+        }

         return new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
     }

@@ -420,7 +424,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
             Predicate<Map<ColumnHandle, NullableValue>> targetPredicate = convertToPredicate(targetTupleDomain);
             Constraint<ColumnHandle> targetConstraint = new Constraint<>(targetTupleDomain, targetPredicate);
             Iterable<List<Object>> records = () ->
-                    stream(partitionManager.getPartitionsIterator(metastore, sourceTableHandle, targetConstraint))
+                    stream(partitionManager.getPartitionsIterator(metastore, sourceTableHandle, targetConstraint, session))
                             .map(hivePartition ->
                                     IntStream.range(0, partitionColumns.size())
                                             .mapToObj(fieldIdToColumnHandle::get)
File 2 of 5
@@ -66,6 +66,7 @@
 import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_PARTITION_LIMIT;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxBucketsForGroupedExecution;
+import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.shouldIgnoreTableBucketing;
 import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
 import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
@@ -121,13 +122,13 @@ public HivePartitionManager(
         this.domainCompactionThreshold = domainCompactionThreshold;
     }

-    public Iterable<HivePartition> getPartitionsIterator(SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint)
+    public Iterable<HivePartition> getPartitionsIterator(SemiTransactionalHiveMetastore metastore, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, ConnectorSession session)
     {
         HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
         TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary();

         SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
-        Table table = getTable(metastore, tableName);
+        Table table = getTable(metastore, tableName, isOfflineDataDebugModeEnabled(session));

         List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table);

@@ -149,11 +150,11 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
         TupleDomain<ColumnHandle> effectivePredicate = constraint.getSummary();

         SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
-        Table table = getTable(metastore, tableName);
+        Table table = getTable(metastore, tableName, isOfflineDataDebugModeEnabled(session));

         List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table);

-        List<HivePartition> partitions = getPartitionsAsList(getPartitionsIterator(metastore, tableHandle, constraint).iterator());
+        List<HivePartition> partitions = getPartitionsAsList(getPartitionsIterator(metastore, tableHandle, constraint, session).iterator());

         // never ignore table bucketing for temporary tables, as those are created explicitly by the engine with the requested bucketing
         boolean shouldIgnoreTableBucketing = !table.getTableType().equals(TEMPORARY_TABLE) && shouldIgnoreTableBucketing(session);
@@ -228,7 +229,7 @@ public HivePartitionResult getPartitions(SemiTransactionalHiveMetastor
         HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
         SchemaTableName tableName = hiveTableHandle.getSchemaTableName();

-        Table table = getTable(metastore, tableName);
+        Table table = getTable(metastore, tableName, isOfflineDataDebugModeEnabled(session));

         List<HiveColumnHandle> partitionColumns = getPartitionKeyColumnHandles(table);
         List<Type> partitionColumnTypes = partitionColumns.stream()
@@ -270,14 +271,18 @@ private Optional<HivePartition> parseValuesAndFilterPartition(
         return Optional.of(partition);
     }

-    private Table getTable(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName)
+    private Table getTable(SemiTransactionalHiveMetastore metastore, SchemaTableName tableName, boolean offlineDataDebugModeEnabled)
     {
         Optional<Table> target = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
         if (!target.isPresent()) {
             throw new TableNotFoundException(tableName);
         }
         Table table = target.get();
-        verifyOnline(tableName, Optional.empty(), getProtectMode(table), table.getParameters());
+
+        if (!offlineDataDebugModeEnabled) {
+            verifyOnline(tableName, Optional.empty(), getProtectMode(table), table.getParameters());
+        }
+
         return table;
     }

File 3 of 5
@@ -86,6 +86,7 @@ public final class HiveSessionProperties
     public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
     public static final String VIRTUAL_BUCKET_COUNT = "virtual_bucket_count";
     public static final String MAX_BUCKETS_FOR_GROUPED_EXECUTION = "max_buckets_for_grouped_execution";
+    public static final String OFFLINE_DATA_DEBUG_MODE_ENABLED = "offline_data_debug_mode_enabled";

     private final List<PropertyMetadata<?>> sessionProperties;

@@ -368,7 +369,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         MAX_BUCKETS_FOR_GROUPED_EXECUTION,
                         "maximum total buckets to allow using grouped execution",
                         hiveClientConfig.getMaxBucketsForGroupedExecution(),
-                        false));
+                        false),
+                booleanProperty(
+                        OFFLINE_DATA_DEBUG_MODE_ENABLED,
+                        "allow reading from tables or partitions that are marked as offline or not readable",
+                        false,
+                        true));
     }

     public List<PropertyMetadata<?>> getSessionProperties()
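
The final argument to booleanProperty marks the property as hidden: it defaults to false and, while it can still be set per session, a hidden property is not advertised alongside the regular session properties, the usual treatment for debug-only switches.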
@@ -622,6 +628,11 @@ public static int getVirtualBucketCount(ConnectorSession session)
         return virtualBucketCount;
     }

+    public static boolean isOfflineDataDebugModeEnabled(ConnectorSession session)
+    {
+        return session.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class);
+    }
+
     public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
     {
         return new PropertyMetadata<>(
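
With the accessor in place, callers enable the flag per query. Assuming the Hive connector is mounted as a catalog named hive, that would look like SET SESSION hive.offline_data_debug_mode_enabled = true; the catalog name is deployment-specific.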
File 4 of 5
@@ -59,6 +59,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
 import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
+import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getProtectMode;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.makePartName;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
@@ -170,10 +171,12 @@ public ConnectorSplitSource getSplits(
         Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(tableName));

-        // verify table is not marked as non-readable
-        String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
-        if (!isNullOrEmpty(tableNotReadable)) {
-            throw new HiveNotReadableException(tableName, Optional.empty(), tableNotReadable);
+        if (!isOfflineDataDebugModeEnabled(session)) {
+            // verify table is not marked as non-readable
+            String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
+            if (!isNullOrEmpty(tableNotReadable)) {
+                throw new HiveNotReadableException(tableName, Optional.empty(), tableNotReadable);
+            }
         }

         // get partitions
@@ -209,7 +212,7 @@ public ConnectorSplitSource getSplits(
         if (bucketHandle.isPresent() && !bucketHandle.get().isVirtuallyBucketed()) {
             hiveBucketProperty = bucketHandle.map(HiveBucketHandle::toTableBucketProperty);
         }
-        Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(metastore, table, tableName, partitions, hiveBucketProperty);
+        Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(metastore, table, tableName, partitions, hiveBucketProperty, session);

         HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
                 table,
@@ -287,7 +290,13 @@ public CounterStat getHighMemorySplitSource()
         return highMemorySplitSourceCounter;
     }

-    private Iterable<HivePartitionMetadata> getPartitionMetadata(SemiTransactionalHiveMetastore metastore, Table table, SchemaTableName tableName, List<HivePartition> hivePartitions, Optional<HiveBucketProperty> bucketProperty)
+    private Iterable<HivePartitionMetadata> getPartitionMetadata(
+            SemiTransactionalHiveMetastore metastore,
+            Table table,
+            SchemaTableName tableName,
+            List<HivePartition> hivePartitions,
+            Optional<HiveBucketProperty> bucketProperty,
+            ConnectorSession session)
     {
         if (hivePartitions.isEmpty()) {
             return ImmutableList.of();
@@ -326,13 +335,15 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata(SemiTransactionalHi
             }
             String partName = makePartName(table.getPartitionColumns(), partition.getValues());

-            // verify partition is online
-            verifyOnline(tableName, Optional.of(partName), getProtectMode(partition), partition.getParameters());
+            if (!isOfflineDataDebugModeEnabled(session)) {
+                // verify partition is online
+                verifyOnline(tableName, Optional.of(partName), getProtectMode(partition), partition.getParameters());

-            // verify partition is not marked as non-readable
-            String partitionNotReadable = partition.getParameters().get(OBJECT_NOT_READABLE);
-            if (!isNullOrEmpty(partitionNotReadable)) {
-                throw new HiveNotReadableException(tableName, Optional.of(partName), partitionNotReadable);
+                // verify partition is not marked as non-readable
+                String partitionNotReadable = partition.getParameters().get(OBJECT_NOT_READABLE);
+                if (!isNullOrEmpty(partitionNotReadable)) {
+                    throw new HiveNotReadableException(tableName, Optional.of(partName), partitionNotReadable);
+                }
             }

             // Verify that the partition schema matches the table schema.
File 5 of 5
@@ -81,6 +81,7 @@
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.facebook.presto.spi.predicate.ValueSet;
+import com.facebook.presto.spi.security.ConnectorIdentity;
 import com.facebook.presto.spi.security.PrestoPrincipal;
 import com.facebook.presto.spi.statistics.ColumnStatistics;
 import com.facebook.presto.spi.statistics.TableStatistics;
@@ -93,6 +94,7 @@
 import com.facebook.presto.spi.type.SqlTimestamp;
 import com.facebook.presto.spi.type.SqlVarbinary;
 import com.facebook.presto.spi.type.StandardTypes;
+import com.facebook.presto.spi.type.TimeZoneKey;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.gen.JoinCompiler;
@@ -128,6 +130,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalDouble;
@@ -160,6 +163,7 @@
 import static com.facebook.presto.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
 import static com.facebook.presto.hive.HiveMetadata.PRESTO_VERSION_NAME;
 import static com.facebook.presto.hive.HiveMetadata.convertToPredicate;
+import static com.facebook.presto.hive.HiveSessionProperties.OFFLINE_DATA_DEBUG_MODE_ENABLED;
 import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
 import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
 import static com.facebook.presto.hive.HiveStorageFormat.JSON;
@@ -841,6 +845,71 @@ protected ConnectorSession newSession()
         return new TestingConnectorSession(new HiveSessionProperties(getHiveClientConfig(), new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
     }

+    protected ConnectorSession newSession(Map<String, Object> extraProperties)
+    {
+        ConnectorSession session = newSession();
+        return new ConnectorSession() {
+            @Override
+            public String getQueryId()
+            {
+                return session.getQueryId();
+            }
+
+            @Override
+            public Optional<String> getSource()
+            {
+                return session.getSource();
+            }
+
+            @Override
+            public ConnectorIdentity getIdentity()
+            {
+                return session.getIdentity();
+            }
+
+            @Override
+            public TimeZoneKey getTimeZoneKey()
+            {
+                return session.getTimeZoneKey();
+            }
+
+            @Override
+            public Locale getLocale()
+            {
+                return session.getLocale();
+            }
+
+            @Override
+            public Optional<String> getTraceToken()
+            {
+                return session.getTraceToken();
+            }
+
+            @Override
+            public long getStartTime()
+            {
+                return session.getStartTime();
+            }
+
+            @Override
+            public boolean isLegacyTimestamp()
+            {
+                return session.isLegacyTimestamp();
+            }
+
+            @Override
+            public <T> T getProperty(String name, Class<T> type)
+            {
+                Object value = extraProperties.get(name);
+                if (value != null) {
+                    return type.cast(value);
+                }
+
+                return session.getProperty(name, type);
+            }
+        };
+    }
+
     protected Transaction newTransaction()
     {
         return new HiveTransaction(transactionManager, metadataFactory.get());
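
The new newSession(Map) overload wraps the default test session in an anonymous ConnectorSession whose getProperty consults extraProperties before delegating, so a test can flip a single property without rebuilding the session; every other method simply forwards to the wrapped session. A hypothetical use, mirroring the tests below:

    // session that reports the debug flag as set; identity, time zone,
    // locale, etc. are all delegated to the default test session
    ConnectorSession debugSession = newSession(ImmutableMap.of(OFFLINE_DATA_DEBUG_MODE_ENABLED, true));
    assertTrue(debugSession.getProperty(OFFLINE_DATA_DEBUG_MODE_ENABLED, Boolean.class));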
@@ -1473,6 +1542,22 @@ public void testGetPartitionSplitsTableOfflinePartition()
                 assertEquals(e.getPartition(), "ds=2012-12-30");
             }
         }
+
+        try (Transaction transaction = newTransaction()) {
+            ConnectorMetadata metadata = transaction.getMetadata();
+            ConnectorSession session = newSession(ImmutableMap.of(OFFLINE_DATA_DEBUG_MODE_ENABLED, true));
+
+            ConnectorTableHandle tableHandle = getTableHandle(metadata, tableOfflinePartition);
+            assertNotNull(tableHandle);
+
+            ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
+            assertNotNull(dsColumn);
+
+            Domain domain = Domain.singleValue(createUnboundedVarcharType(), utf8Slice("2012-12-30"));
+            TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(ImmutableMap.of(dsColumn, domain));
+            List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, new Constraint<>(tupleDomain), Optional.empty());
+            getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, getOnlyElement(tableLayoutResults).getTableLayout().getHandle(), SPLIT_SCHEDULING_CONTEXT));
+        }
     }
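
The added block repeats the layout and split enumeration with offline_data_debug_mode_enabled set to true; the sequence that previously surfaced the offline partition as an error (the catch block above asserts the offending partition is ds=2012-12-30) is now expected to complete without throwing.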

@Test
Expand All @@ -1499,6 +1584,20 @@ public void testGetPartitionSplitsTableNotReadablePartition()
assertEquals(e.getPartition(), Optional.empty());
}
}

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession(ImmutableMap.of(OFFLINE_DATA_DEBUG_MODE_ENABLED, true));

ConnectorTableHandle tableHandle = getTableHandle(metadata, tableNotReadable);
assertNotNull(tableHandle);

ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
assertNotNull(dsColumn);

List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, Constraint.alwaysTrue(), Optional.empty());
getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, getOnlyElement(tableLayoutResults).getTableLayout().getHandle(), SPLIT_SCHEDULING_CONTEXT));
}
}
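
Likewise for the not-readable table: with the debug property set, getTableLayouts and getSplits are expected to succeed rather than raise HiveNotReadableException.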

@Test