Skip to content

Commit

Permalink
[GOBBLIN-1649] Revert gobblin-1633 (#3510)
Browse files Browse the repository at this point in the history
  • Loading branch information
homatthew authored May 18, 2022
1 parent b726a60 commit deb4f48
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.gobblin.compaction.mapreduce.RecordKeyDedupReducerBase;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
Expand Down Expand Up @@ -182,6 +183,8 @@ public void onCompactionJobComplete(FileSystemDataset dataset) throws IOExceptio
this.configurator.getConfiguredJob().getJobID().toString());
compactionState.setProp(DUPLICATE_COUNT_TOTAL,
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,15 @@

import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.source.CompactionSource;
import org.apache.gobblin.compaction.suite.CompactionSuite;
import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.mapreduce.MRTask;
import org.apache.hadoop.fs.Path;



Expand Down Expand Up @@ -106,19 +101,14 @@ public void run() {
public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
if (isSuccess) {
try {
TaskState taskState = taskContext.getTaskState();
setCounterInfo(taskState);
setCounterInfo(taskContext.getTaskState());

List<CompactionCompleteAction> actions = this.suite.getCompactionCompleteActions();
for (CompactionCompleteAction action: actions) {
action.addEventSubmitter(eventSubmitter);
action.onCompactionJobComplete(dataset);
}
submitEvent(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME);
if (dataset instanceof FileSystemDataset) {
commitRunStartTimeInfo(taskState, (FileSystemDataset) dataset);
}

super.onMRTaskComplete(true, null);
} catch (IOException e) {
submitEvent(CompactionSlaEventHelper.COMPACTION_FAILED_EVENT_NAME);
Expand All @@ -130,22 +120,6 @@ public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
}
}

/**
* Persist the run start time which is used to determine when the last successful compaction run started. This
* value is useful for limiting how often you recompact by verifying whether a dataset has recently been compacted.
* @param taskState
* @param dataset
* @throws IOException
*/
private static void commitRunStartTimeInfo(TaskState taskState, FileSystemDataset dataset) throws IOException {
CompactionPathParser.CompactionParserResult result = new CompactionPathParser(taskState).parse(dataset);
InputRecordCountHelper helper = new InputRecordCountHelper(taskState);
State compactionState = helper.loadState(new Path(result.getDstAbsoluteDir()));
compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
taskState.getProp(CompactionSource.COMPACTION_INIT_TIME));
helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
}

private void setCounterInfo(TaskState taskState)
throws IOException {

Expand Down

0 comments on commit deb4f48

Please sign in to comment.