[GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource #3436

Merged · 24 commits · Nov 29, 2021

Commits
7955422
[GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryB…
ZihanLi58 Nov 23, 2021
ad140d2
add unit test
ZihanLi58 Nov 23, 2021
22ad917
address comments to make high/low watermark optional
ZihanLi58 Nov 23, 2021
c541824
Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecSto…
phet Oct 25, 2021
9643f15
[GOBBLIN-1557] Make KafkaSource getFilteredTopics method protected (#…
jobar Oct 27, 2021
b1e1156
[GOBBLIN-1567] do not set a custom maxConnLifetime for sql connection…
arjun4084346 Oct 27, 2021
614c214
[GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#…
williamwjs Oct 28, 2021
a33eb2f
set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any ret…
arjun4084346 Nov 3, 2021
738e7e8
[GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to …
phet Nov 3, 2021
5c39233
Tag metrics with proxy url if available (#3423)
Will-Lo Nov 4, 2021
ecbe466
remove use of deprecated helix class (#3424)
arjun4084346 Nov 8, 2021
4d6e4fa
[GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failur…
ZihanLi58 Nov 10, 2021
7829349
[GOBBLIN-1552] determine flow status correctly when dag manager is di…
arjun4084346 Nov 10, 2021
1c3b568
[GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc …
arjun4084346 Nov 10, 2021
b058034
do not delete data while dropping a hive table because data is delete…
arjun4084346 Nov 18, 2021
adc7c88
[GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio…
vikrambohra Nov 18, 2021
1267f3d
[GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy …
arjun4084346 Nov 19, 2021
c4d39fb
[GOBBLIN-1580] Check table exists instead of call create table direct…
ZihanLi58 Nov 23, 2021
f7bdf3f
[GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeli…
ZihanLi58 Nov 23, 2021
9f186b7
[GOBBLIN-1576] skip appending record count to staging file if present…
vikrambohra Nov 23, 2021
8315039
fix the NPE in dagManager
ZihanLi58 Nov 24, 2021
20f38cb
Merge branch 'master' into GOBBLIN-1582
ZihanLi58 Nov 24, 2021
3ac49b1
fix quota check issue in dagManager
ZihanLi58 Nov 24, 2021
54d15f3
address comments
ZihanLi58 Nov 24, 2021
@@ -27,6 +27,7 @@
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.slf4j.MDC;

import com.google.common.base.Optional;
@@ -233,6 +234,8 @@ protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceStat
extract.setFullTrue(System.currentTimeMillis());
}

Optional<Long> highestWaterMark = Optional.absent();
Optional<Long> lowestWaterMark = Optional.absent();
for (Partition partition : partitions) {
WorkUnit workunit = WorkUnit.create(extract);
workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
@@ -241,6 +244,14 @@ protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceStat
addLineageSourceInfo(state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
highestWaterMark = highestWaterMark.isPresent() ?
highestWaterMark.transform(hw -> Math.max(hw, partition.getHighWatermark())) : Optional.of(partition.getHighWatermark());
lowestWaterMark = lowestWaterMark.isPresent() ?
lowestWaterMark.transform(lw -> Math.min(lw, partition.getLowWatermark())) : Optional.of(partition.getLowWatermark());
}
if(highestWaterMark.isPresent() && lowestWaterMark.isPresent()) {
state.appendToListProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark.get()));
state.appendToListProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark.get()));
}

return workUnits;
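The running min/max accumulation added in this hunk can be sketched in isolation. This is a simplified, hypothetical version: it uses `java.util.Optional` where the actual change uses Guava's `Optional` (with `transform`/`absent`), and the `range` helper is invented for illustration:

```java
import java.util.Optional;

public class WatermarkRange {
    // Accumulates the lowest low watermark and highest high watermark across
    // partitions, staying empty-safe when there are no partitions at all.
    static long[] range(long[] lows, long[] highs) {
        Optional<Long> highest = Optional.empty();
        Optional<Long> lowest = Optional.empty();
        for (int i = 0; i < highs.length; i++) {
            final long hw = highs[i];
            final long lw = lows[i];
            // First partition seeds the value; later ones fold in via max/min.
            highest = Optional.of(highest.map(h -> Math.max(h, hw)).orElse(hw));
            lowest = Optional.of(lowest.map(l -> Math.min(l, lw)).orElse(lw));
        }
        return new long[]{lowest.orElse(-1L), highest.orElse(-1L)};
    }

    public static void main(String[] args) {
        long[] r = range(new long[]{10L, 5L, 50L}, new long[]{100L, 250L, 175L});
        System.out.println(r[0] + ".." + r[1]); // prints "5..250"
    }
}
```

As in the PR, both values stay absent when the partition list is empty, which is why the watermark properties are only appended inside the `isPresent()` guard.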
@@ -36,6 +36,7 @@
import org.apache.gobblin.metrics.event.EventSubmitter;

import lombok.AllArgsConstructor;
import org.apache.gobblin.metrics.event.TimingEvent;


@AllArgsConstructor
@@ -77,6 +78,8 @@ private void submitJobStateEvent(JobState jobState) {
jobMetadataBuilder.put(METADATA_JOB_COMPLETED_TASKS, Integer.toString(jobState.getCompletedTasks()));
jobMetadataBuilder.put(METADATA_JOB_LAUNCHER_TYPE, jobState.getLauncherType().toString());
jobMetadataBuilder.put(METADATA_JOB_TRACKING_URL, jobState.getTrackingURL().or(UNKNOWN_VALUE));
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));
Comment on lines +81 to +82

Contributor: may not be an issue... just curious: when do we use "" and when instead UNKNOWN_VALUE?

Contributor (Author): I think UNKNOWN_VALUE is not expected in most cases and indicates that something is wrong, while "" just indicates we didn't set it.

Contributor: sounds reasonable
jobMetadataBuilder.put(EventSubmitter.EVENT_TYPE, JOB_STATE);

this.eventSubmitter.submit(JOB_STATE, jobMetadataBuilder.build());
Expand Up @@ -969,7 +969,7 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {

if (proxyUser != null) {
proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check failed
proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
if (!proxyUserCheck) {
requesterMessage.append(String.format(
"Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
@@ -985,7 +985,7 @@ private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
.map(ServiceRequester::getName).distinct().collect(Collectors.toList());
for (String requester : uniqueRequesters) {
int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
boolean thisRequesterCheck = userQuotaIncrement < 0; // user quota check failed
boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
usersQuotaIncrement.add(requester);
requesterCheck = requesterCheck && thisRequesterCheck;
if (!thisRequesterCheck) {
@@ -1112,6 +1112,9 @@ private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {

private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
Integer currentCount;
if (user == null) {
return;
}
do {
currentCount = quotaMap.get(user);
} while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
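The quota fix above restores the intended sign convention: the increment helper returns a negative value when the quota is exceeded, so the *success* flag must test `>= 0` (the old code assigned the *failure* condition `< 0` to the success flag). A minimal sketch with a hypothetical helper following the same assumed contract as `incrementJobCountAndCheckUserQuota`:

```java
import java.util.HashMap;
import java.util.Map;

public class QuotaCheck {
    // Hypothetical stand-in for incrementJobCountAndCheckUserQuota:
    // increments the user's running-job count and returns the new count
    // when within quota, or -1 when the quota would be exceeded.
    static int incrementAndCheck(Map<String, Integer> counts, String user, int quota) {
        int next = counts.merge(user, 1, Integer::sum);
        return next <= quota ? next : -1;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        int quota = 2;
        // The corrected check: non-negative return means the check succeeded.
        boolean first = incrementAndCheck(counts, "alice", quota) >= 0;
        boolean second = incrementAndCheck(counts, "alice", quota) >= 0;
        boolean third = incrementAndCheck(counts, "alice", quota) >= 0;
        System.out.println(first + " " + second + " " + third); // prints "true true false"
    }
}
```

With the old inverted flag, the third call would have been reported as a success and the first two as failures, which is exactly the bug this hunk corrects.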
@@ -54,6 +54,8 @@ public class StressTestingSource implements Source<String, byte[]> {
public static final int DEFAULT_NUM_RECORDS = 1;
public static final String MEM_ALLOC_BYTES_KEY = CONFIG_NAMESPACE + "." + "memAllocBytes";
public static final int DEFAULT_MEM_ALLOC_BYTES = 8;
public static final String THROW_EXCEPTION = CONFIG_NAMESPACE + "." + "throwException";
public static final boolean DEFAULT_THROW_EXCEPTION = false;

private static final long INVALID_TIME = -1;

@@ -94,6 +96,7 @@ public static class ExtractorImpl implements Extractor<String, byte[]> {
private final int numRecords;
private final int memAllocBytes;
private final Random random;
private final boolean throwException;

public ExtractorImpl(WorkUnitState state) {
this.random = new Random();
@@ -113,6 +116,7 @@ public ExtractorImpl(WorkUnitState state) {
// num records only takes effect if the duration is not specified
this.numRecords = this.endTime == INVALID_TIME ? state.getPropAsInt(NUM_RECORDS_KEY, DEFAULT_NUM_RECORDS) : 0;
this.memAllocBytes = state.getPropAsInt(MEM_ALLOC_BYTES_KEY, DEFAULT_MEM_ALLOC_BYTES);
this.throwException = state.getPropAsBoolean(THROW_EXCEPTION, DEFAULT_THROW_EXCEPTION);
}

@Override
@@ -134,6 +138,9 @@ public byte[] readRecord(byte[] reuse) throws DataRecordException, IOException {
// If an end time is configured then it is used as the stopping point otherwise the record count limit is used
if ((this.endTime != INVALID_TIME && System.currentTimeMillis() > this.endTime) ||
(this.numRecords > 0 && this.recordsEmitted >= this.numRecords)) {
if (this.throwException) {
throw new IOException("This is one test exception");
}
return null;
}

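The new `throwException` branch changes `readRecord`'s termination behavior: once the stop condition (end time or record limit) is reached, the extractor either throws or signals end-of-stream with `null`. A stripped-down sketch of that decision, with the stop condition collapsed into a boolean and hypothetical names:

```java
import java.io.IOException;

public class StopOrThrow {
    // Mirrors the readRecord termination logic added in this PR: when the
    // stop condition holds, either throw (if configured) or return null to
    // signal a normal end of the record stream.
    static byte[] nextRecord(boolean stopConditionMet, boolean throwException) throws IOException {
        if (stopConditionMet) {
            if (throwException) {
                throw new IOException("This is one test exception");
            }
            return null; // normal end of stream
        }
        return new byte[8]; // placeholder payload for a live stream
    }

    public static void main(String[] args) throws IOException {
        System.out.println(nextRecord(false, true).length); // prints "8"
        System.out.println(nextRecord(true, false));        // prints "null"
        try {
            nextRecord(true, true);
        } catch (IOException e) {
            System.out.println("threw: " + e.getMessage());
        }
    }
}
```

This is why the unit test below can loop over `readRecord` until the duration elapses and expect an `IOException` instead of a `null` terminator.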
@@ -168,4 +168,37 @@ public void testRunDuration() throws DataRecordException, IOException {
Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000),
"Time spent " + timeSpentMicro);
}

@Test
public void testThrowException() throws DataRecordException, IOException {
final int MEM_ALLOC_BYTES = 100;
final int NUM_WORK_UNITS = 1;
final int SLEEP_TIME_MICRO = 1000;
final int NUM_RECORDS = 30; // this config is ignored since the duration is set
final int RUN_DURATION_SECS = 5;

SourceState state = new SourceState();
state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
state.setProp(StressTestingSource.THROW_EXCEPTION, true);

StressTestingSource source = new StressTestingSource();

List<WorkUnit> wus = source.getWorkunits(state);
Assert.assertEquals(wus.size(), NUM_WORK_UNITS);

WorkUnit wu = wus.get(0);
WorkUnitState wuState = new WorkUnitState(wu, state);
Extractor<String, byte[]> extractor = source.getExtractor(wuState);

Assert.expectThrows(IOException.class, () -> {
byte[] record;
while ((record = extractor.readRecord(null)) != null) {
Assert.assertEquals(record.length, 100);
Contributor: minor, but for clarity I might replace with: Assert.fail("should have thrown!")

Contributor (Author): In this case I want to test whether the job fails as expected, but Assert.fail would not test anything and would directly fail the test, which is not what I want. Please let me know if I missed something here.

Contributor: I figured that with THROW_EXCEPTION == true it would throw in the first extractor.readRecord(null) call; the Assert.fail would then immediately alert us to begin debugging if that doesn't happen. It's also self-documenting, so future maintainers know the loop body should never run.

Contributor: actually... it looks like it throws just prior to the (final) extractor.readRecord(null) call that would otherwise first return null. So in that case, yes, leave it as is and ignore my Assert.fail recommendation.

}
});
}
}