Skip to content

Commit

Permalink
Track Glue API calls that were untracked
Browse files Browse the repository at this point in the history
  • Loading branch information
homar authored and losipiuk committed Feb 22, 2022
1 parent 707e231 commit 1f7a537
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public static <Request, Result> Stream<Result> getPaginatedResults(
Function<Request, Result> submission,
Request request,
BiConsumer<Request, String> setNextToken,
Function<Result, String> extractNextToken)
Function<Result, String> extractNextToken,
GlueMetastoreApiStats stats)
{
requireNonNull(submission, "submission is null");
requireNonNull(request, "request is null");
Expand All @@ -57,7 +58,7 @@ protected Result computeNext()
}

setNextToken.accept(request, nextToken);
Result result = submission.apply(request);
Result result = stats.call(() -> submission.apply(request));
firstRequest = false;
nextToken = extractNextToken.apply(result);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package io.trino.plugin.hive.metastore.glue;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.glue.AWSGlueAsync;
Expand Down Expand Up @@ -291,18 +293,17 @@ public Optional<Database> getDatabase(String databaseName)
public List<String> getAllDatabases()
{
try {
return stats.getGetDatabases().call(() -> {
List<String> databaseNames = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest().withCatalogId(catalogId),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken)
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.map(com.amazonaws.services.glue.model.Database::getName)
.collect(toImmutableList());
return databaseNames;
});
List<String> databaseNames = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest().withCatalogId(catalogId),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken,
stats.getGetDatabases())
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.map(com.amazonaws.services.glue.model.Database::getName)
.collect(toImmutableList());
return databaseNames;
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand Down Expand Up @@ -370,10 +371,10 @@ public void updateTableStatistics(String databaseName, String tableName, AcidTra
final Map<String, String> statisticsParameters = updateStatisticsParameters(table.getParameters(), updatedStatistics.getBasicStatistics());
tableInput.setParameters(statisticsParameters);
table = Table.builder(table).setParameters(statisticsParameters).build();
glueClient.updateTable(new UpdateTableRequest()
stats.getUpdateTable().call(() -> glueClient.updateTable(new UpdateTableRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableInput(tableInput));
.withTableInput(tableInput)));
columnStatisticsProvider.updateTableColumnStatistics(table, updatedStatistics.getColumnStatistics());
}
catch (EntityNotFoundException e) {
Expand Down Expand Up @@ -426,11 +427,13 @@ private void updatePartitionStatisticsBatch(Table table, Map<String, Function<Pa
List<Future<BatchUpdatePartitionResult>> partitionUpdateRequestsFutures = new ArrayList<>();
partitionUpdateRequestsPartitioned.forEach(partitionUpdateRequestsPartition -> {
// Update basic statistics
long startTimestamp = System.currentTimeMillis();
partitionUpdateRequestsFutures.add(glueClient.batchUpdatePartitionAsync(new BatchUpdatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withEntries(partitionUpdateRequestsPartition)));
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withEntries(partitionUpdateRequestsPartition),
new StatsRecordingAsyncHandler(stats.getBatchUpdatePartition(), startTimestamp)));
});

try {
Expand All @@ -448,21 +451,20 @@ private void updatePartitionStatisticsBatch(Table table, Map<String, Function<Pa
public List<String> getAllTables(String databaseName)
{
try {
return stats.getGetTables().call(() -> {
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken)
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(tableFilter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return tableNames;
});
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(tableFilter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return tableNames;
}
catch (EntityNotFoundException e) {
// database does not exist
Expand All @@ -484,21 +486,20 @@ public synchronized List<String> getTablesWithParameter(String databaseName, Str
public List<String> getAllViews(String databaseName)
{
try {
return stats.getGetAllViews().call(() -> {
List<String> views = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken)
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(table -> VIRTUAL_VIEW.name().equals(table.getTableType()))
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return views;
});
List<String> views = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetAllViews())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
.filter(table -> VIRTUAL_VIEW.name().equals(table.getTableType()))
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
return views;
}
catch (EntityNotFoundException e) {
// database does not exist
Expand Down Expand Up @@ -836,27 +837,25 @@ private List<Partition> getPartitions(Table table, String expression)
private List<Partition> getPartitions(Table table, String expression, @Nullable Segment segment)
{
try {
return stats.getGetPartitions().call(() -> {
// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);

List<Partition> partitions = getPaginatedResults(
glueClient::getPartitions,
new GetPartitionsRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withExpression(expression)
.withSegment(segment)
.withMaxResults(AWS_GLUE_GET_PARTITIONS_MAX_RESULTS),
GetPartitionsRequest::setNextToken,
GetPartitionsResult::getNextToken)
.map(GetPartitionsResult::getPartitions)
.flatMap(List::stream)
.map(converter)
.collect(toImmutableList());
return partitions;
});
// Reuse immutable field instances opportunistically between partitions
GluePartitionConverter converter = new GluePartitionConverter(table);
List<Partition> partitions = getPaginatedResults(
glueClient::getPartitions,
new GetPartitionsRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withExpression(expression)
.withSegment(segment)
.withMaxResults(AWS_GLUE_GET_PARTITIONS_MAX_RESULTS),
GetPartitionsRequest::setNextToken,
GetPartitionsResult::getNextToken,
stats.getGetPartitions())
.map(GetPartitionsResult::getPartitions)
.flatMap(List::stream)
.map(converter)
.collect(toImmutableList());
return partitions;
}
catch (AmazonServiceException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
Expand Down Expand Up @@ -920,11 +919,13 @@ private List<Partition> batchGetPartition(Table table, List<String> partitionNam
while (!pendingPartitions.isEmpty()) {
List<Future<BatchGetPartitionResult>> batchGetPartitionFutures = new ArrayList<>();
for (List<PartitionValueList> partitions : Lists.partition(pendingPartitions, BATCH_GET_PARTITION_MAX_PAGE_SIZE)) {
long startTimestamp = System.currentTimeMillis();
batchGetPartitionFutures.add(glueClient.batchGetPartitionAsync(new BatchGetPartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(table.getDatabaseName())
.withTableName(table.getTableName())
.withPartitionsToGet(partitions)));
.withPartitionsToGet(partitions),
new StatsRecordingAsyncHandler(stats.getGetPartitions(), startTimestamp)));
}
pendingPartitions.clear();

Expand Down Expand Up @@ -965,11 +966,14 @@ public void addPartitions(String databaseName, String tableName, List<PartitionW

for (List<PartitionWithStatistics> partitionBatch : Lists.partition(partitions, BATCH_CREATE_PARTITION_MAX_PAGE_SIZE)) {
List<PartitionInput> partitionInputs = mappedCopy(partitionBatch, partition -> GlueInputConverter.convertPartition(partition));
futures.add(glueClient.batchCreatePartitionAsync(new BatchCreatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableName(tableName)
.withPartitionInputList(partitionInputs)));
long startTime = System.currentTimeMillis();
futures.add(glueClient.batchCreatePartitionAsync(
new BatchCreatePartitionRequest()
.withCatalogId(catalogId)
.withDatabaseName(databaseName)
.withTableName(tableName)
.withPartitionInputList(partitionInputs),
new StatsRecordingAsyncHandler(stats.getBatchCreatePartition(), startTime)));
}

for (Future<BatchCreatePartitionResult> future : futures) {
Expand Down Expand Up @@ -1126,4 +1130,29 @@ public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String ta
{
return ImmutableSet.of();
}

static class StatsRecordingAsyncHandler<Request extends AmazonWebServiceRequest, Result>
implements AsyncHandler<Request, Result>
{
private final GlueMetastoreApiStats stats;
private final long startTimeInMillis;

public StatsRecordingAsyncHandler(GlueMetastoreApiStats stats, long startTimeInMillis)
{
this.stats = requireNonNull(stats, "stats is null");
this.startTimeInMillis = startTimeInMillis;
}

@Override
public void onError(Exception e)
{
stats.recordCall(System.currentTimeMillis() - startTimeInMillis, true);
}

@Override
public void onSuccess(AmazonWebServiceRequest request, Object o)
{
stats.recordCall(System.currentTimeMillis() - startTimeInMillis, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ public CounterStat getTotalFailures()
return totalFailures;
}

public void recordCall(long executionTimeInMillis, boolean failure)
{
time.add(executionTimeInMillis, MILLISECONDS);
if (failure) {
totalFailures.update(1);
}
}

public interface ThrowingCallable<V, E extends Exception>
{
V call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class GlueMetastoreStats
private final GlueMetastoreApiStats createPartitions = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats deletePartition = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats updatePartition = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats batchUpdatePartition = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats batchCreatePartition = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats getColumnStatisticsForTable = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats getColumnStatisticsForPartition = new GlueMetastoreApiStats();
private final GlueMetastoreApiStats updateColumnStatisticsForTable = new GlueMetastoreApiStats();
Expand Down Expand Up @@ -189,6 +191,20 @@ public GlueMetastoreApiStats getUpdatePartition()
return updatePartition;
}

@Managed
@Nested
public GlueMetastoreApiStats getBatchUpdatePartition()
{
return batchUpdatePartition;
}

@Managed
@Nested
public GlueMetastoreApiStats getBatchCreatePartition()
{
return batchCreatePartition;
}

@Managed
@Nested
public GlueMetastoreApiStats getGetColumnStatisticsForTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,14 @@ protected HiveMetastore createMetastore(File tempDir, HiveIdentity identity)
public void cleanupOrphanedDatabases()
{
long creationTimeMillisThreshold = currentTimeMillis() - DAYS.toMillis(1);
GlueHiveMetastore metastore = (GlueHiveMetastore) getMetastoreClient();
GlueMetastoreStats stats = metastore.getStats();
List<String> orphanedDatabases = getPaginatedResults(
glueClient::getDatabases,
new GetDatabasesRequest(),
GetDatabasesRequest::setNextToken,
GetDatabasesResult::getNextToken)
GetDatabasesResult::getNextToken,
stats.getGetDatabases())
.map(GetDatabasesResult::getDatabaseList)
.flatMap(List::stream)
.filter(database -> database.getName().startsWith(TEST_DATABASE_NAME_PREFIX) &&
Expand Down Expand Up @@ -1009,8 +1012,12 @@ public void testStatisticsPartitionedTableColumnModification()
.setColumnStatistics(columnStatistics).build();

createDummyPartitionedTable(tableName, columns);
GlueHiveMetastore metastoreClient = (GlueHiveMetastore) getMetastoreClient();
double countBefore = metastoreClient.getStats().getBatchUpdatePartition().getTime().getAllTime().getCount();

metastore.updatePartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), "ds=2016-01-01", actualStatistics -> partitionStatistics);

assertThat(metastoreClient.getStats().getBatchUpdatePartition().getTime().getAllTime().getCount()).isEqualTo(countBefore + 1);
PartitionStatistics tableStatistics = new PartitionStatistics(createEmptyStatistics(), Map.of());
assertThat(metastore.getTableStatistics(tableName.getSchemaName(), tableName.getTableName()))
.isEqualTo(tableStatistics);
Expand Down

0 comments on commit 1f7a537

Please sign in to comment.