Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource #3436

Merged
merged 24 commits into from
Nov 29, 2021

Conversation

ZihanLi58
Copy link
Contributor

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Fill low/high watermark info in SourceState for QueryBasedSource
    Foe stress test class, add function to make the job always fail

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    unit tests

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Comment on lines 250 to 253
state.appendToListProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, highestWaterMark));
state.appendToListProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD,
String.format("%s.%s: %s", sourceEntity.getDatasetName(), sourceEntity.destTableName, lowestWaterMark));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could partitions ever be empty? if so, I recommend lowest, highest to be Optional and only write conditionally, rather than setting the properties to MAX_VALUE and -1.

Comment on lines +81 to +82
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may not be an issue... just curious: when do we use "" and when instead UNKNOWN_VALUE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me I think UNKONW_VALUE is not expected in most cases and indicate there is something wrong, but "" just indicate we don't set it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds reasonable

Assert.expectThrows(IOException.class, () -> {
byte[] record;
while ((record = extractor.readRecord(null)) != null) {
Assert.assertEquals(record.length, 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor, but for clarity I might replace with: Assert.fail("should have thrown!")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I want to test whether the job can fail as expected. But assert.fail will not test anything and directly fail the test which is not what I want. Please let me know if I miss something here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured with THROW_EXCEPTION == true it would throw in the first extractor.readRecord(null) call. therefore the Assert.fail immediately alerts us to begin debugging if that doesn't happen. it's also self-documenting, so future maintainers know, "that loop body should never run".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually... looks like it throws just prior to the (final) extractor.readRecord(null) call that would first return null. so in that case, yes, leave as is and ignore my Assert.fail recommendation.

@codecov-commenter
Copy link

codecov-commenter commented Nov 23, 2021

Codecov Report

Merging #3436 (54d15f3) into master (9053c3a) will decrease coverage by 0.00%.
The diff coverage is 21.05%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #3436      +/-   ##
============================================
- Coverage     46.57%   46.56%   -0.01%     
+ Complexity    10326    10321       -5     
============================================
  Files          2072     2072              
  Lines         80849    80865      +16     
  Branches       9020     9025       +5     
============================================
+ Hits          37655    37657       +2     
- Misses        39716    39733      +17     
+ Partials       3478     3475       -3     
Impacted Files Coverage Δ
...lin/source/extractor/extract/QueryBasedSource.java 40.37% <0.00%> (-1.79%) ⬇️
...blin/service/modules/orchestration/DagManager.java 71.00% <0.00%> (-0.23%) ⬇️
.../apache/gobblin/util/test/StressTestingSource.java 82.97% <66.66%> (+14.79%) ⬆️
...he/gobblin/runtime/JobExecutionEventSubmitter.java 100.00% <100.00%> (ø)
...in/java/org/apache/gobblin/cluster/HelixUtils.java 28.26% <0.00%> (-5.08%) ⬇️
.../modules/scheduler/GobblinServiceJobScheduler.java 63.68% <0.00%> (-1.50%) ⬇️
...lin/util/filesystem/FileSystemInstrumentation.java 100.00% <0.00%> (+7.14%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9053c3a...54d15f3. Read the comment docs.

@@ -241,6 +244,14 @@ public int hashCode() {
addLineageSourceInfo(state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
highestWaterMark = highestWaterMark.isPresent() ?
Optional.of(Math.max(highestWaterMark.get(), partition.getHighWatermark())) : Optional.of(partition.getHighWatermark());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional.map would be more idiomatic

Comment on lines +81 to +82
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds reasonable

Assert.expectThrows(IOException.class, () -> {
byte[] record;
while ((record = extractor.readRecord(null)) != null) {
Assert.assertEquals(record.length, 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured with THROW_EXCEPTION == true it would throw in the first extractor.readRecord(null) call. therefore the Assert.fail immediately alerts us to begin debugging if that doesn't happen. it's also self-documenting, so future maintainers know, "that loop body should never run".

phet and others added 20 commits November 24, 2021 13:25
…re` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (apache#3414)

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s

* Add missing file, `MysqlNonFlowSpecStoreTest`

* Fixup `MysqlNonFlowSpecStoreTest`

* Simplify implementaiton of `MysqlSpecStore.getSpecsImpl`.

* Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.

* Aid maintainers with additional code comments
…pache#3408)

The method was originally private, and it is useful to be able to
override it in subclasses, to redefine how to get topics to be processed.

Change-Id: If94cda2f7a5e65e52e2453427c60f4abb932b3f8
…apache#3418)

* do not set a custom maxConnLifetime for sql connection

* address review comment
…pache#3420)

* Exponential backoff for Salesforce bulk api polling

* Read min and max wait time from prop with default
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
…e will not affect other topics in the same container (apache#3419)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* change the way we set low watermark to have a better indicate for the watermark range of the snapshot

* address comments

* fix test error
…sabled (apache#3403)

* determine flow status based on the fact if dag manager is enabled
this is needed because when dag manager is not enabled, flow level events are not emitted
and cannot be used to determine flow status. in that case flow status has to be determined
by using job statuses.

store flow status in the FlowStatus

* address review comments

* address review comments

* removed a commented line
…and fix a sync… (apache#3415)

* codestyle changes, typo corrections, improved javadoc  and fix a synchronization issue

* address review comments

* add review comments

* address review comments

* address review comments

* fix bugsFixMain
apache#3426)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column

* fix to failing test case

* Updated IncebergMetadataWriterTest to blacklist the test table from non-completeness tests

* moved dataset name update in tablemetadata

* Added newPartition checks in Table Metadata

* Fixed test case to include new_parition_enabled

Co-authored-by: Vikram Bohra <[email protected]>
…to a reasonable… (apache#3430)

* change the multiplier used in ExponentialWaitStrategy to 1 second. old multiplier 2ms was retrying too fast for some use cases

* .
…ly to make sure table exists (apache#3432)

* [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type

* address comments

* [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists
…ne (apache#3425)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* address comments

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline
apache#3429)

* [GOBBLIN-1576] skip appending record count to staging file if present already

* fixed checkstyle

* fixed method

Co-authored-by: Vikram Bohra <[email protected]>
@@ -241,6 +244,14 @@ public int hashCode() {
addLineageSourceInfo(state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
highestWaterMark = highestWaterMark.isPresent() ?
highestWaterMark.transform(l -> Math.max(l, partition.getHighWatermark())) : Optional.of(partition.getHighWatermark());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*nit maybe l should be renamed to hw ?
and in next line l should be `lw ?

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work, zihan!

@ZihanLi58 ZihanLi58 merged commit 374fd6f into apache:master Nov 29, 2021
jack-moseley pushed a commit to jack-moseley/gobblin that referenced this pull request Aug 24, 2022
…asedSource (apache#3436)

* [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource

* add unit test

* address comments to make high/low watermark optional

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (apache#3414)

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s

* Add missing file, `MysqlNonFlowSpecStoreTest`

* Fixup `MysqlNonFlowSpecStoreTest`

* Simplify implementaiton of `MysqlSpecStore.getSpecsImpl`.

* Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.

* Aid maintainers with additional code comments

* [GOBBLIN-1557] Make KafkaSource getFilteredTopics method protected (apache#3408)

The method was originally private, and it is useful to be able to
override it in subclasses, to redefine how to get topics to be processed.

Change-Id: If94cda2f7a5e65e52e2453427c60f4abb932b3f8

* [GOBBLIN-1567] do not set a custom maxConnLifetime for sql connection (apache#3418)

* do not set a custom maxConnLifetime for sql connection

* address review comment

* [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (apache#3420)

* Exponential backoff for Salesforce bulk api polling

* Read min and max wait time from prop with default

* set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (apache#3422)

* [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup

* Tag metrics with proxy url if available (apache#3423)

* remove use of deprecated helix class (apache#3424)

codestyle changes

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container (apache#3419)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* change the way we set low watermark to have a better indicate for the watermark range of the snapshot

* address comments

* fix test error

* [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (apache#3403)

* determine flow status based on the fact if dag manager is enabled
this is needed because when dag manager is not enabled, flow level events are not emitted
and cannot be used to determine flow status. in that case flow status has to be determined
by using job statuses.

store flow status in the FlowStatus

* address review comments

* address review comments

* removed a commented line

* [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (apache#3415)

* codestyle changes, typo corrections, improved javadoc  and fix a synchronization issue

* address review comments

* add review comments

* address review comments

* address review comments

* fix bugsFixMain

* do not delete data while dropping a hive table because data is deleted, if needed, separately (apache#3431)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio… (apache#3426)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column

* fix to failing test case

* Updated IncebergMetadataWriterTest to blacklist the test table from non-completeness tests

* moved dataset name update in tablemetadata

* Added newPartition checks in Table Metadata

* Fixed test case to include new_parition_enabled

Co-authored-by: Vikram Bohra <[email protected]>

* [GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a reasonable… (apache#3430)

* change the multiplier used in ExponentialWaitStrategy to 1 second. old multiplier 2ms was retrying too fast for some use cases

* .

* [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (apache#3432)

* [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type

* address comments

* [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (apache#3425)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* address comments

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline

* [GOBBLIN-1576] skip appending record count to staging file if present… (apache#3429)

* [GOBBLIN-1576] skip appending record count to staging file if present already

* fixed checkstyle

* fixed method

Co-authored-by: Vikram Bohra <[email protected]>

* fix the NPE in dagManager

* fix quota check issue in dagManager

* address comments

Co-authored-by: Kip Kohn <[email protected]>
Co-authored-by: Joseph Allemandou <[email protected]>
Co-authored-by: Arjun Singh Bora <[email protected]>
Co-authored-by: Jiashuo Wang <[email protected]>
Co-authored-by: William Lo <[email protected]>
Co-authored-by: vbohra <[email protected]>
Co-authored-by: Vikram Bohra <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants