diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java index 5d5330df09b..d86f9ef6fcb 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.metrics.event.TimingEvent; import org.slf4j.MDC; import com.google.common.base.Optional; @@ -233,6 +234,8 @@ protected List generateWorkUnits(SourceEntity sourceEntity, SourceStat extract.setFullTrue(System.currentTimeMillis()); } + Optional highestWaterMark = Optional.absent(); + Optional lowestWaterMark = Optional.absent(); for (Partition partition : partitions) { WorkUnit workunit = WorkUnit.create(extract); workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName()); @@ -241,6 +244,14 @@ protected List generateWorkUnits(SourceEntity sourceEntity, SourceStat addLineageSourceInfo(state, sourceEntity, workunit); partition.serialize(workunit); workUnits.add(workunit); + highestWaterMark = highestWaterMark.isPresent() ? + highestWaterMark.transform(hw -> Math.max(hw, partition.getHighWatermark())) : Optional.of(partition.getHighWatermark()); + lowestWaterMark = lowestWaterMark.isPresent() ? + lowestWaterMark.transform(lw -> Math.min(lw, partition.getLowWatermark())) : Optional.of(partition.getLowWatermark()); + } + if(highestWaterMark.isPresent() && lowestWaterMark.isPresent()) { + state.appendToListProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark.get())); + state.appendToListProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark.get())); } return workUnits; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java index 3a3caad2f9c..9b6d18c1d19 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java @@ -36,6 +36,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import lombok.AllArgsConstructor; +import org.apache.gobblin.metrics.event.TimingEvent; @AllArgsConstructor @@ -77,6 +78,8 @@ private void submitJobStateEvent(JobState jobState) { jobMetadataBuilder.put(METADATA_JOB_COMPLETED_TASKS, Integer.toString(jobState.getCompletedTasks())); jobMetadataBuilder.put(METADATA_JOB_LAUNCHER_TYPE, jobState.getLauncherType().toString()); jobMetadataBuilder.put(METADATA_JOB_TRACKING_URL, jobState.getTrackingURL().or(UNKNOWN_VALUE)); + jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "")); + jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "")); jobMetadataBuilder.put(EventSubmitter.EVENT_TYPE, JOB_STATE); this.eventSubmitter.submit(JOB_STATE, jobMetadataBuilder.build()); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 4b304e5ce72..bfc1f7a350a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -969,7 +969,7 @@ private void checkQuota(DagNode dagNode) throws IOException { if (proxyUser != null) { proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode); - proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check failed + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds if (!proxyUserCheck) { requesterMessage.append(String.format( "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n", @@ -985,7 +985,7 @@ private void checkQuota(DagNode dagNode) throws IOException { .map(ServiceRequester::getName).distinct().collect(Collectors.toList()); for (String requester : uniqueRequesters) { int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode); - boolean thisRequesterCheck = userQuotaIncrement < 0; // user quota check failed + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds usersQuotaIncrement.add(requester); requesterCheck = requesterCheck && thisRequesterCheck; if (!thisRequesterCheck) { @@ -1112,6 +1112,9 @@ private void releaseQuota(DagNode dagNode) { private void decrementQuotaUsage(Map quotaMap, String user) { Integer currentCount; + if (user == null) { + return; + } do { currentCount = quotaMap.get(user); } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1)); diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java index 5d70219cb26..3fd10d005bc 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/test/StressTestingSource.java @@ -54,6 +54,8 @@ public class StressTestingSource implements Source { public static final int DEFAULT_NUM_RECORDS = 1; public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes"; public static final int DEFAULT_MEM_ALLOC_BYTES = 8; + public static final String THROW_EXCEPTION = CONFIG_NAMESPACE + "." + "throwException"; + public static final boolean DEFAULT_THROW_EXCEPTION = false; private static final long INVALID_TIME = -1; @@ -94,6 +96,7 @@ public static class ExtractorImpl implements Extractor { private final int numRecords; private final int memAllocBytes; private final Random random; + private final boolean throwException; public ExtractorImpl(WorkUnitState state) { this.random = new Random(); @@ -113,6 +116,7 @@ public ExtractorImpl(WorkUnitState state) { // num records only takes effect if the duration is not specified this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0; this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES); + this.throwException = state.getPropAsBoolean(THROW_EXCEPTION, DEFAULT_THROW_EXCEPTION); } @Override @@ -134,6 +138,9 @@ public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException { // If an end time is configured then it is used as the stopping point otherwise the record count limit is used if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) || (this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) { + if (this.throwException) { + throw new IOException("This is one test exception"); + } return null; } diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java index 08ce4082af0..65775476508 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java @@ -168,4 +168,37 @@ public void testRunDuration() throws DataRecordException, IOException { Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000), "Time spent " + timeSpentMicro); } + + @Test + public void testThrowException() throws DataRecordException, IOException { + final int MEM_ALLOC_BYTES = 100; + final int NUM_WORK_UNITS = 1; + final int SLEEP_TIME_MICRO = 1000; + final int NUM_RECORDS = 30; // this config is ignored since the duration is set + final int RUN_DURATION_SECS = 5; + + SourceState state = new SourceState(); + state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS); + state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES); + state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO); + state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS); + state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS); + state.setProp(StressTestingSource.THROW_EXCEPTION, true); + + StressTestingSource source = new StressTestingSource(); + + List wus = source.getWorkunits(state); + Assert.assertEquals(wus.size(), NUM_WORK_UNITS); + + WorkUnit wu = wus.get(0); + WorkUnitState wuState = new WorkUnitState(wu, state); + Extractor extractor = source.getExtractor(wuState); + + Assert.expectThrows(IOException.class, () -> { + byte[] record; + while ((record = extractor.readRecord(null)) != null) { + Assert.assertEquals(record.length, 100); + } + }); + } }