[GOBBLIN-1747] add job.name and job.id to kafka and compaction workunits (#3607)

* add job.name and job.id to kafka workunits

* fix unit test

* add job.name and job.id to compaction source workunits also

Co-authored-by: umustafi <[email protected]>
arjun4084346 and umustafi authored Nov 23, 2022
1 parent 12ba403 commit c6d6c1b
Showing 2 changed files with 17 additions and 4 deletions.
File 1 of 2 (compaction source; filename not shown in this view):
@@ -165,7 +165,7 @@ public void run() {
         Iterators.transform (datasets.iterator(), new Function<Dataset, Callable<VerifiedDataset>>() {
           @Override
           public Callable<VerifiedDataset> apply(Dataset dataset) {
-            return new DatasetVerifier (dataset, workUnitIterator, suite.getDatasetsFinderVerifiers());
+            return new DatasetVerifier (dataset, workUnitIterator, suite.getDatasetsFinderVerifiers(), state);
           }
         });

@@ -312,6 +312,7 @@ private class DatasetVerifier implements Callable {
     private Dataset dataset;
     private CompactionWorkUnitIterator workUnitIterator;
     private List<CompactionVerifier> verifiers;
+    private State state;

     /**
      * {@link VerifiedDataset} wraps original {@link Dataset} because if verification failed, we are able get original
@@ -321,7 +322,7 @@ public VerifiedDataset call() throws DatasetVerificationException {
       try {
         VerifiedResult result = this.verify(dataset);
         if (result.allVerificationPassed) {
-          this.workUnitIterator.addWorkUnit(createWorkUnit(dataset));
+          this.workUnitIterator.addWorkUnit(createWorkUnit(dataset, state));
         }
         return new VerifiedDataset(dataset, result);
       } catch (Exception e) {
@@ -423,11 +424,17 @@ protected void addWorkUnit (WorkUnit wu) {
       }
     }

-    protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
-      WorkUnit workUnit = new WorkUnit();
+    protected WorkUnit createWorkUnit(Dataset dataset, State state) throws IOException {
+      WorkUnit workUnit = WorkUnit.createEmpty();
       TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
       suite.save(dataset, workUnit);
       workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
+      if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+        workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY, state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+      }
+      if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+        workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, state.getProp(ConfigurationKeys.JOB_ID_KEY));
+      }
       return workUnit;
     }
File 2 of 2 (Kafka source; filename not shown in this view):
@@ -583,6 +583,12 @@ private void addSourceStatePropsToWorkUnit(WorkUnit workUnit, SourceState state)
       workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
       workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
     }
+    if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY, state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+    }
+    if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, state.getProp(ConfigurationKeys.JOB_ID_KEY));
+    }
   }

   private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
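
Both files apply the same two-property copy from the job-level State onto each WorkUnit. A minimal standalone sketch of that pattern, using the standard Gobblin ConfigurationKeys, State, and WorkUnit classes; the helper class and method name below are illustrative only, not part of this commit:

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.WorkUnit;

// Illustrative helper (not part of this commit): copy job.name and job.id
// from the job-level State onto a WorkUnit so downstream tasks can tell
// which job produced the work unit.
public class JobIdentityPropagator {
  public static void copyJobIdentity(State jobState, WorkUnit workUnit) {
    if (jobState.contains(ConfigurationKeys.JOB_NAME_KEY)) {
      workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY, jobState.getProp(ConfigurationKeys.JOB_NAME_KEY));
    }
    if (jobState.contains(ConfigurationKeys.JOB_ID_KEY)) {
      workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, jobState.getProp(ConfigurationKeys.JOB_ID_KEY));
    }
  }
}

A task that receives such a WorkUnit could then read the originating job with workUnit.getProp(ConfigurationKeys.JOB_NAME_KEY) and workUnit.getProp(ConfigurationKeys.JOB_ID_KEY).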
