Skip to content

Commit

Permalink
[AUDIT-759] Revert [ENG-8115][HUDI-7624] Fixing index tagging duration (apache#669)
Browse files
Browse the repository at this point in the history

Co-authored-by: Igor Demura <[email protected]>
  • Loading branch information
idemura and Igor Demura authored Apr 24, 2024
1 parent e2c193a commit e572349
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
* @return Write Status
*/
public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class HoodieMetrics {
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
public String preWriteTimerName = null;
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
Expand All @@ -83,7 +82,6 @@ public class HoodieMetrics {
private Timer logCompactionTimer = null;
private Timer clusteringTimer = null;
private Timer indexTimer = null;
private Timer preWriteTimer = null;
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
Expand All @@ -104,7 +102,6 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.preWriteTimerName = getMetricsName("timer", "pre_write");
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
Expand Down Expand Up @@ -184,13 +181,6 @@ public Timer.Context getIndexCtx() {
return indexTimer == null ? null : indexTimer.time();
}

/**
 * Returns a timing context from the pre-write timer, creating the timer on first use
 * when metrics are enabled.
 *
 * @return the context produced by {@code preWriteTimer.time()}, or {@code null} when no
 *         pre-write timer exists (it is only created while metrics are on)
 */
public Timer.Context getPreWriteTimerCtx() {
  // Lazily create the timer; the metrics-enabled check is evaluated first on every call.
  if (config.isMetricsOn() && preWriteTimer == null) {
    preWriteTimer = createTimer(preWriteTimerName);
  }
  if (preWriteTimer == null) {
    return null;
  }
  return preWriteTimer.time();
}

public Timer.Context getConflictResolutionCtx() {
if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
conflictResolutionTimer = createTimer(conflictResolutionTimerName);
Expand Down Expand Up @@ -311,13 +301,6 @@ public void updateIndexMetrics(final String action, final long durationInMs) {
}
}

/**
 * Publishes the pre-write duration for an action as a gauge; does nothing when metrics are off.
 *
 * @param action       label used to build the gauge name ({@code <action>.duration})
 * @param durationInMs duration value to register, in milliseconds
 */
public void updatePreWriteMetrics(final String action, final long durationInMs) {
  if (!config.isMetricsOn()) {
    return;
  }
  LOG.info(String.format("Sending preWrite metrics (%s.duration, %d)", action, durationInMs));
  final String gaugeName = getMetricsName("pre_write", String.format("%s.duration", action));
  metrics.registerGauge(gaugeName, durationInMs);
}

@VisibleForTesting
public String getMetricsName(String action, String metric) {
if (config == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class HoodieWriteMetadata<O> {

private O writeStatuses;
private Option<Duration> indexLookupDuration = Option.empty();
private Option<Long> preWriteDurationMs = Option.empty();

// Will be set when auto-commit happens
private boolean isCommitted;
Expand All @@ -60,9 +59,6 @@ public <T> HoodieWriteMetadata<T> clone(T transformedWriteStatuses) {
if (indexLookupDuration.isPresent()) {
newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
}
if (preWriteDurationMs.isPresent()) {
newMetadataInstance.setPreWriteDurationMs(preWriteDurationMs.get());
}
newMetadataInstance.setCommitted(isCommitted);
newMetadataInstance.setCommitMetadata(commitMetadata);
if (writeStats.isPresent()) {
Expand Down Expand Up @@ -136,14 +132,6 @@ public void setIndexLookupDuration(Duration indexLookupDuration) {
this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
}

/**
 * Returns the recorded pre-write duration.
 *
 * @return the pre-write duration in milliseconds, or an empty option if none has been set
 */
public Option<Long> getPreWriteDurationMs() {
return preWriteDurationMs;
}

/**
 * Records the pre-write duration.
 *
 * @param preWriteDurationMs duration in milliseconds; may be {@code null}, in which case the
 *                           stored option is built from a nullable value rather than rejected
 */
public void setPreWriteDurationMs(Long preWriteDurationMs) {
  // Use ofNullable, as setIndexLookupDuration does, so a null boxed Long is tolerated
  // instead of depending on Option.of's handling of null (presumably a failure — confirm).
  this.preWriteDurationMs = Option.ofNullable(preWriteDurationMs);
}

/**
 * Returns the partition-to-replaced-file-ids mapping.
 *
 * @return the stored mapping when present, otherwise an empty map
 */
public Map<String, List<String>> getPartitionToReplaceFileIds() {
  if (partitionToReplaceFileIds.isPresent()) {
    return partitionToReplaceFileIds.get();
  }
  return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -112,10 +111,6 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c

public abstract HoodieWriteMetadata<O> execute(I inputRecords);

/**
 * Executes the commit action for the given records.
 *
 * <p>This base implementation ignores {@code preWriteTimer} and simply delegates to
 * {@link #execute(Object)}.
 *
 * @param inputRecords  records to write
 * @param preWriteTimer optional pre-write timer; unused here
 * @return the write metadata produced by the single-argument {@code execute}
 */
public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> preWriteTimer) {
  return execute(inputRecords);
}

/**
* Save the workload profile in an intermediate file (here re-using commit files). This is useful when performing
* rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> {
import java.time.Duration;
import java.time.Instant;

protected HoodieTimer preWriteTimer = null; // time taken from dedup -> tag location -> building workload profile
public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> {

protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
super(partitionNumberExtractor);
Expand All @@ -48,19 +47,21 @@ public HoodieWriteMetadata<O> write(String instantTime,
BaseCommitActionExecutor<T, I, K, O, R> executor,
WriteOperationType operationType) {
try {
preWriteTimer = HoodieTimer.start();
// De-dupe/merge if needed
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table);

Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
if (table.getIndex().requiresTagging(operationType)) {
// perform index lookup to get existing location of records
context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
taggedRecords = tag(dedupedRecords, context, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

HoodieWriteMetadata<O> result = executor.execute(taggedRecords, Option.of(preWriteTimer));
HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ public void testTimerCtxandGauges() throws InterruptedException {
}
}

// PreWrite metrics
timer = hoodieMetrics.getPreWriteTimerCtx();
Thread.sleep(5); // Ensure timer duration is > 0
hoodieMetrics.updatePreWriteMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop()));
metricName = hoodieMetrics.getMetricsName("pre_write", "some_action.duration");
msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);

// Rollback metrics
timer = hoodieMetrics.getRollbackCtx();
Thread.sleep(5); // Ensure timer duration is > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String inst
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
if (result.getPreWriteDurationMs().isPresent()) {
metrics.updatePreWriteMetrics(LOOKUP_STR, result.getPreWriteDurationMs().get());
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(resultRDD, instantTime, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -158,11 +157,6 @@ private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieReco

/**
 * Executes the commit action without a pre-write timer by delegating to the two-argument
 * overload with an empty option.
 *
 * @param inputRecords records to write
 * @return the write metadata returned by the two-argument {@code execute}
 */
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
  // No timer is available on this entry point.
  return execute(inputRecords, Option.empty());
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords, Option<HoodieTimer> preWriteTimer) {
// Cache the tagged records, so we don't end up computing both
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
Expand All @@ -180,11 +174,6 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :" + workloadProfile);
Long preWriteDurationMs = null;
if (preWriteTimer.isPresent()) {
preWriteDurationMs = preWriteTimer.get().endTimer();
LOG.info("Pre write timer " + preWriteDurationMs);
}

// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(workloadProfile);
Expand All @@ -194,9 +183,6 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
updateIndexAndCommitIfNeeded(writeStatuses, result);
if (preWriteTimer.isPresent()) {
result.setPreWriteDurationMs(preWriteDurationMs);
}
return result;
}

Expand Down

0 comments on commit e572349

Please sign in to comment.