Skip to content

Commit

Permalink
[HUDI-4439] Fix Amazon CloudWatch reporter for metadata enabled tables (
Browse files Browse the repository at this point in the history
apache#6164)

Co-authored-by: Udit Mehrotra <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
  • Loading branch information
3 people authored and fengjian committed Apr 5, 2023
1 parent 7e97eda commit fa1aabb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,12 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
builder.withProperties(properties);

if (writeConfig.isMetricsOn()) {
// Table Name is needed for metric reporters prefix
Properties commonProperties = new Properties();
commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName);

builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
.fromProperties(commonProperties)
.withReporterType(writeConfig.getMetricsReporterType().toString())
.withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
.on(true).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@

package org.apache.hudi.client.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
Expand Down Expand Up @@ -81,14 +73,14 @@
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieHFileReader;
Expand All @@ -107,6 +99,15 @@
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
Expand Down Expand Up @@ -178,6 +179,19 @@ public static List<Arguments> tableTypeAndEnableOperationArgs() {
);
}

public static List<Arguments> tableOperationsTestArgs() {
return asList(
Arguments.of(COPY_ON_WRITE, true, true),
Arguments.of(COPY_ON_WRITE, true, false),
Arguments.of(COPY_ON_WRITE, false, true),
Arguments.of(COPY_ON_WRITE, false, false),
Arguments.of(MERGE_ON_READ, true, true),
Arguments.of(MERGE_ON_READ, true, false),
Arguments.of(MERGE_ON_READ, false, true),
Arguments.of(MERGE_ON_READ, false, false)
);
}

/**
* Metadata Table bootstrap scenarios.
*/
Expand Down Expand Up @@ -441,28 +455,34 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
@MethodSource("tableTypeAndEnableOperationArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan, false, false);
doWriteInsertAndUpsert(testTable);
@MethodSource("tableOperationsTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableMetrics) throws Exception {
List<Long> commitTimeList = new ArrayList<>();
commitTimeList.add(Long.parseLong(HoodieActiveTimeline.createNewInstantTime()));
for (int i = 0; i < 8; i++) {
long nextCommitTime = getNextCommitTime(commitTimeList.get(commitTimeList.size() - 1));
commitTimeList.add(nextCommitTime);
}
init(tableType, true, enableFullScan, enableMetrics, false);
doWriteInsertAndUpsert(testTable, commitTimeList.get(0).toString(), commitTimeList.get(1).toString(), false);

// trigger an upsert
doWriteOperationAndValidate(testTable, "0000003");
doWriteOperationAndValidate(testTable, commitTimeList.get(2).toString());

// trigger compaction
if (MERGE_ON_READ.equals(tableType)) {
doCompactionAndValidate(testTable, "0000004");
doCompactionAndValidate(testTable, commitTimeList.get(3).toString());
}

// trigger an upsert
doWriteOperation(testTable, "0000005");
doWriteOperation(testTable, commitTimeList.get(4).toString());

// trigger clean
doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
doCleanAndValidate(testTable, commitTimeList.get(5).toString(), singletonList(commitTimeList.get(0).toString()));

// trigger few upserts and validate
doWriteOperation(testTable, "0000007");
doWriteOperation(testTable, "0000008");
doWriteOperation(testTable, commitTimeList.get(6).toString());
doWriteOperation(testTable, commitTimeList.get(7).toString());
validateMetadata(testTable, emptyList(), true);
}

Expand Down

0 comments on commit fa1aabb

Please sign in to comment.