Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1569] Add RDBMS-backed MysqlJobCatalog, as alternative to file system storage #3421

Merged
merged 5 commits into from
Nov 3, 2021

Conversation

phet
Copy link
Contributor

@phet phet commented Oct 29, 2021

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

Add RDBMS-backed MysqlJobCatalog, as alternative to file system storage

Present storage bindings for JobCatalog are in-memory or file system. While the latter is durable, it requires an NFS-like capability for cross-instance sharing.

Support alternative multi-instance backing through MySQL, precluding the need for NFS.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

New unit tests

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

phet added 2 commits October 25, 2021 18:09
* upstream/master:
  Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (apache#3414)
  [GOBBLIN-1563]Collect more information to analyze the RC for some job cannot emit kafka events to update job status (apache#3416)
  [GOBBLIN-1521] Create local mode of streaming kafka job to help user quickly onboard (apache#3372)
  [GOBBLIN-1559] Support wildcard for input paths (apache#3410)
  [GOBBLIN-1561]Improve error message when flow compilation fails (apache#3412)
  [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager (apache#3407)
  [GOBBLIN-1542] Integrate with Helix API to add/remove task from a running helix job (apache#3393)
Comment on lines +67 to +70
// for backward compatibility... (is this needed for `JobSpec`, or only for (inspiration) `FlowSpec`???)
properties = new Properties();
try {
properties.load(new StringReader(jsonObject.get(JobSpecSerializer.JOB_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this originated (with the comment "for backward compatibility") in FlowSpecDeserializer... is it necessary here too or a historical quirk specific to the former?

@codecov-commenter
Copy link

codecov-commenter commented Oct 29, 2021

Codecov Report

Attention: Patch coverage is 57.43879% with 226 lines in your changes missing coverage. Please review.

Project coverage is 46.55%. Comparing base (384dc09) to head (7175a4a).
Report is 573 commits behind head on master.

Files with missing lines Patch % Lines
...pache/gobblin/cluster/GobblinHelixJobLauncher.java 49.51% 52 Missing ⚠️
...ache/gobblin/runtime/api/FlowSpecSearchObject.java 46.77% 17 Missing and 16 partials ⚠️
...e/gobblin/runtime/job_catalog/MysqlJobCatalog.java 60.86% 24 Missing and 3 partials ⚠️
...in/java/org/apache/gobblin/cluster/HelixUtils.java 0.00% 17 Missing ⚠️
...rg/apache/gobblin/runtime/AbstractJobLauncher.java 0.00% 11 Missing and 3 partials ⚠️
.../java/org/apache/gobblin/runtime/api/FlowSpec.java 7.69% 12 Missing ⚠️
...obblin/runtime/spec_serde/JobSpecDeserializer.java 47.82% 11 Missing and 1 partial ⚠️
...blin/service/FlowConfigV2ResourceLocalHandler.java 0.00% 11 Missing ⚠️
...gobblin/runtime/spec_store/MysqlBaseSpecStore.java 91.58% 8 Missing and 1 partial ⚠️
...org/apache/gobblin/stream/WorkUnitChangeEvent.java 0.00% 6 Missing ⚠️
... and 9 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3421      +/-   ##
============================================
+ Coverage     46.52%   46.55%   +0.02%     
- Complexity    10246    10302      +56     
============================================
  Files          2063     2070       +7     
  Lines         80475    80751     +276     
  Branches       8989     9009      +20     
============================================
+ Hits          37443    37591     +148     
- Misses        39567    39684     +117     
- Partials       3465     3476      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment on lines 49 to 61
try {
templateURI = Optional.ofNullable(jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY).getAsString()).map(s -> {
try {
return new URI(s);
} catch (URISyntaxException e) {
LOGGER.warn(String.format("error deserializing '%s' as a URI",
jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY).getAsString()), e);
throw new RuntimeException(e);
}
});
} catch (RuntimeException e) {
templateURI = Optional.empty();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map function seems unnecessarily complicated, couldn't we just do:

    try {
      templateURI = Optional.ofNullable(new URI(jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY).getAsString()));
    } catch (URISyntaxException e) {
       LOGGER.warn(String.format("error deserializing '%s' as a URI",
              jsonObject.get(JobSpecSerializer.JOB_SPEC_TEMPLATE_URI_KEY).getAsString()), e);
      templateURI = Optional.empty();
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first off, I agree it's unsatisfying. (I welcome any tips toward finessing java's checked exceptions!)

the transformation you suggest doesn't protect against passing null to the URI ctor. that's why I was working (via map) within the Optional. still, you did helped spot a NPE bug from the parens closed in the wrong place. we need:

Optional.ofNullable(jsonObj.get(...)).map

not the potentially explosive:

Optional.ofNullable(jsonObj.get(...).getAsString()).map

in addition, I'll use Optional.flatMap to eliminate the second level of exception handling. I'll push those corrections, and you please tell me if you see any other potential improvement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I see, yeah I think the current way looks fine, it is good to not have the two levels of exception handling.

public class MysqlJobCatalog extends JobCatalogBase implements MutableJobCatalog {

private static final Logger LOGGER = LoggerFactory.getLogger(MysqlJobCatalog.class);
public static final String DB_CONFIG_PREFIX = "mysqlJobCatalog";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name does not match the value it holds, right? how about MYSQL_JOB_CATALOG or similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's named after its purpose/function, to read clearly when used; e.g. from the unit test:

    Config config = ConfigBuilder.create()
        .addPrimitive(ConfigurationKeys.METRICS_ENABLED_KEY, "true")
        .addPrimitive(MysqlJobCatalog.DB_CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
...

what do you think?

JobSpec jobSpec = (JobSpec) jobSpecStore.getSpec(jobURI);
jobSpecStore.deleteSpec(jobURI);
this.mutableMetrics.updateRemoveJobTime(startTime);
this.listeners.onDeleteJob(jobURI, jobSpec.getVersion());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it seems supporting a feature (spec versioning) which may have bugs, and is not being used is costing us more spec store calls. Should we remove this in future, or at least provide a version of APIs without the version (that passes to the existing API with a default version)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lack the history on this one. I'm also not following what additional spec store calls arise as a result.
(overall I'm not opposed to streamlining the functionality on offer...)
please either provide more background context, or, if that effort would be a separate follow-on, we could save the discussion for later.

/**
* {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that currently only {@link JobSpec}s are supported.
*/
public class GsonJobSpecSerDe implements SpecSerDe {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be a way to reuse GsonFlowSpecSerDe , can we use generics in the interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion... I guess I took the prior impl at face value, but I was able to re-work w/ generics into an abstract base class doing the heavy lifting.

since java lacks a type class resolution capability, we still need to create a specific, named derivation to supply the serializer and the deserializer. handily, that non-generic class name also imparts the ability to refer to the type within HOCON or even simple properties-based config.

Copy link
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@ZihanLi58 ZihanLi58 merged commit b1aed5c into apache:master Nov 3, 2021
arjun4084346 pushed a commit to arjun4084346/gobblin that referenced this pull request Nov 4, 2021
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
arjun4084346 pushed a commit to arjun4084346/gobblin that referenced this pull request Nov 4, 2021
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
umustafi pushed a commit to umustafi/gobblin that referenced this pull request Nov 22, 2021
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
ZihanLi58 pushed a commit to ZihanLi58/incubator-gobblin that referenced this pull request Nov 24, 2021
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
ZihanLi58 added a commit that referenced this pull request Nov 29, 2021
…asedSource (#3436)

* [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource

* add unit test

* address comments to make high/low watermark optional

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s

* Add missing file, `MysqlNonFlowSpecStoreTest`

* Fixup `MysqlNonFlowSpecStoreTest`

* Simplify implementaiton of `MysqlSpecStore.getSpecsImpl`.

* Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.

* Aid maintainers with additional code comments

* [GOBBLIN-1557] Make KafkaSource getFilteredTopics method protected (#3408)

The method was originally private, and it is useful to be able to
override it in subclasses, to redefine how to get topics to be processed.

Change-Id: If94cda2f7a5e65e52e2453427c60f4abb932b3f8

* [GOBBLIN-1567] do not set a custom maxConnLifetime for sql connection (#3418)

* do not set a custom maxConnLifetime for sql connection

* address review comment

* [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#3420)

* Exponential backoff for Salesforce bulk api polling

* Read min and max wait time from prop with default

* set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (#3422)

* [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup

* Tag metrics with proxy url if available (#3423)

* remove use of deprecated helix class (#3424)

codestyle changes

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container (#3419)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* change the way we set low watermark to have a better indicate for the watermark range of the snapshot

* address comments

* fix test error

* [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (#3403)

* determine flow status based on the fact if dag manager is enabled
this is needed because when dag manager is not enabled, flow level events are not emitted
and cannot be used to determine flow status. in that case flow status has to be determined
by using job statuses.

store flow status in the FlowStatus

* address review comments

* address review comments

* removed a commented line

* [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (#3415)

* codestyle changes, typo corrections, improved javadoc  and fix a synchronization issue

* address review comments

* add review comments

* address review comments

* address review comments

* fix bugsFixMain

* do not delete data while dropping a hive table because data is deleted, if needed, separately (#3431)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio… (#3426)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column

* fix to failing test case

* Updated IncebergMetadataWriterTest to blacklist the test table from non-completeness tests

* moved dataset name update in tablemetadata

* Added newPartition checks in Table Metadata

* Fixed test case to include new_parition_enabled

Co-authored-by: Vikram Bohra <[email protected]>

* [GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a reasonable… (#3430)

* change the multiplier used in ExponentialWaitStrategy to 1 second. old multiplier 2ms was retrying too fast for some use cases

* .

* [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)

* [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type

* address comments

* [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (#3425)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* address comments

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline

* [GOBBLIN-1576] skip appending record count to staging file if present… (#3429)

* [GOBBLIN-1576] skip appending record count to staging file if present already

* fixed checkstyle

* fixed method

Co-authored-by: Vikram Bohra <[email protected]>

* fix the NPE in dagManager

* fix quota check issue in dagManager

* address comments

Co-authored-by: Kip Kohn <[email protected]>
Co-authored-by: Joseph Allemandou <[email protected]>
Co-authored-by: Arjun Singh Bora <[email protected]>
Co-authored-by: Jiashuo Wang <[email protected]>
Co-authored-by: William Lo <[email protected]>
Co-authored-by: vbohra <[email protected]>
Co-authored-by: Vikram Bohra <[email protected]>
jack-moseley pushed a commit to jack-moseley/gobblin that referenced this pull request Aug 24, 2022
…file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup
jack-moseley pushed a commit to jack-moseley/gobblin that referenced this pull request Aug 24, 2022
…asedSource (apache#3436)

* [GOBBLIN-1582] Fill low/high watermark info in SourceState for QueryBasedSource

* add unit test

* address comments to make high/low watermark optional

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (apache#3414)

* Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s

* Add missing file, `MysqlNonFlowSpecStoreTest`

* Fixup `MysqlNonFlowSpecStoreTest`

* Simplify implementaiton of `MysqlSpecStore.getSpecsImpl`.

* Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.

* Aid maintainers with additional code comments

* [GOBBLIN-1557] Make KafkaSource getFilteredTopics method protected (apache#3408)

The method was originally private, and it is useful to be able to
override it in subclasses, to redefine how to get topics to be processed.

Change-Id: If94cda2f7a5e65e52e2453427c60f4abb932b3f8

* [GOBBLIN-1567] do not set a custom maxConnLifetime for sql connection (apache#3418)

* do not set a custom maxConnLifetime for sql connection

* address review comment

* [GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (apache#3420)

* Exponential backoff for Salesforce bulk api polling

* Read min and max wait time from prop with default

* set RETENTION_DATASET_ROOT in CleanableIcebergDataset so that any retention job can use this information (apache#3422)

* [GOBBLIN-1569] Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage (apache#3421)

* Add RDBMS-backed `MysqlJobCatalog`, as alternative to file system storage

* Streamline `JobSpecDeserializer` error handling, on review feedback.

* Refactor `GsonJobSpecSerDe` into a reusable `GenericGsonSpecSerDe`.

* Fix javadoc slipup

* Tag metrics with proxy url if available (apache#3423)

* remove use of deprecated helix class (apache#3424)

codestyle changes

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container (apache#3419)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* change the way we set low watermark to have a better indicate for the watermark range of the snapshot

* address comments

* fix test error

* [GOBBLIN-1552] determine flow status correctly when dag manager is disabled (apache#3403)

* determine flow status based on the fact if dag manager is enabled
this is needed because when dag manager is not enabled, flow level events are not emitted
and cannot be used to determine flow status. in that case flow status has to be determined
by using job statuses.

store flow status in the FlowStatus

* address review comments

* address review comments

* removed a commented line

* [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (apache#3415)

* codestyle changes, typo corrections, improved javadoc  and fix a synchronization issue

* address review comments

* add review comments

* address review comments

* address review comments

* fix bugsFixMain

* do not delete data while dropping a hive table because data is deleted, if needed, separately (apache#3431)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partitio… (apache#3426)

* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column

* fix to failing test case

* Updated IncebergMetadataWriterTest to blacklist the test table from non-completeness tests

* moved dataset name update in tablemetadata

* Added newPartition checks in Table Metadata

* Fixed test case to include new_parition_enabled

Co-authored-by: Vikram Bohra <[email protected]>

* [GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a reasonable… (apache#3430)

* change the multiplier used in ExponentialWaitStrategy to 1 second. old multiplier 2ms was retrying too fast for some use cases

* .

* [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (apache#3432)

* [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type

* address comments

* [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (apache#3425)

* [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

* address comments

* address comments

* [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline

* [GOBBLIN-1576] skip appending record count to staging file if present… (apache#3429)

* [GOBBLIN-1576] skip appending record count to staging file if present already

* fixed checkstyle

* fixed method

Co-authored-by: Vikram Bohra <[email protected]>

* fix the NPE in dagManager

* fix quota check issue in dagManager

* address comments

Co-authored-by: Kip Kohn <[email protected]>
Co-authored-by: Joseph Allemandou <[email protected]>
Co-authored-by: Arjun Singh Bora <[email protected]>
Co-authored-by: Jiashuo Wang <[email protected]>
Co-authored-by: William Lo <[email protected]>
Co-authored-by: vbohra <[email protected]>
Co-authored-by: Vikram Bohra <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants