Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1703] avoid double quota increase for adhoc flows #3550

Merged
merged 11 commits into from
Oct 3, 2022

Conversation

arjun4084346
Copy link
Contributor

@arjun4084346 arjun4084346 commented Sep 7, 2022

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    When a gaas flow request comes, resource handler checks the quota right there.
    However, if the flow has runImmediately=true, the quota will be checked again when the first job starts. This should be avoided.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-commenter
Copy link

codecov-commenter commented Sep 7, 2022

Codecov Report

Merging #3550 (30049f6) into master (ae2ea19) will decrease coverage by 0.89%.
The diff coverage is 46.50%.

@@             Coverage Diff              @@
##             master    #3550      +/-   ##
============================================
- Coverage     47.76%   46.86%   -0.90%     
- Complexity     8557    10617    +2060     
============================================
  Files          1705     2111     +406     
  Lines         64976    82667   +17691     
  Branches       7036     9202    +2166     
============================================
+ Hits          31035    38742    +7707     
- Misses        31240    40363    +9123     
- Partials       2701     3562     +861     
Impacted Files Coverage Δ
.../org/apache/gobblin/service/ServiceConfigKeys.java 0.00% <ø> (ø)
...gobblin/runtime/spec_store/MysqlBaseSpecStore.java 89.16% <ø> (ø)
...lin/service/modules/flow/MultiHopFlowCompiler.java 67.25% <0.00%> (-1.62%) ⬇️
...odules/orchestration/AbstractUserQuotaManager.java 100.00% <ø> (+22.85%) ⬆️
...ervice/modules/flow/BaseFlowToJobSpecCompiler.java 60.68% <12.50%> (-3.54%) ⬇️
...e/modules/orchestration/MysqlUserQuotaManager.java 47.34% <31.38%> (-26.53%) ⬇️
...blin/service/modules/orchestration/DagManager.java 82.09% <50.00%> (-0.14%) ⬇️
...odules/orchestration/InMemoryUserQuotaManager.java 70.07% <73.03%> (+5.97%) ⬆️
.../modules/scheduler/GobblinServiceJobScheduler.java 64.62% <100.00%> (ø)
...e/gobblin/service/monitoring/GitConfigMonitor.java 95.23% <0.00%> (-4.77%) ⬇️
... and 412 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

quotaManager.get().checkQuota(dag.getNodes().get(0));
((FlowSpec) addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense if we added this in createFlowSpecForConfig() under FlowConfigLocalResourceHandler.java? Seems like this property can be used in a lot of areas outside of just this feature

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that function notifies the addSpec immediately after so I'm not sure we will use the value directly anywhere else. However, if we do use the in memory value of the Spec in other places, this property will be useful to add there. Btw shouldn't this value be set to false after the first iteration of an adhoc flow in the DagManager or Scheduler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the FlowSpec created by createFlowSpecForConfig will be persisted in the spec store. If we add the config there, it will be persisted permanently in the spec store. And then we will not be able to distinguish if it is a scheduled run or the adhoc run of a "schedule with runImmediately=true" kind of flows.
cc @ZihanLi58

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@umustafi the if block inside which this statement is present, will be false for the second iteration.
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))
run immediately is set to false on service restart and if job schedule key is absent , the flow would be deleted from the spec store after the 1st iteration.

I will add a comment there explaining this.

@@ -335,7 +335,9 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
// todo : we should probably check quota for all of the start nodes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do u mean this for all hops in multi-hop flow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean for all the start nodes, not for all the nodes.
So, a Dag has n nodes, some of which can be startNodes, that means startNodes can start concurrently. So It makes more sense to check quota on all the start nodes when the ad hoc flow request comes, instead of just checking for it for the node with 0th index.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think quota is at flow level but not job level? If that's the case, checking once should be enough?

quotaManager.get().checkQuota(dag.getNodes().get(0));
((FlowSpec) addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that function notifies the addSpec immediately after so I'm not sure we will use the value directly anywhere else. However, if we do use the in memory value of the Spec in other places, this property will be useful to add there. Btw shouldn't this value be set to false after the first iteration of an adhoc flow in the DagManager or Scheduler?

@arjun4084346 arjun4084346 changed the title avoid double quota increase for adhoc flows [GOBBLIN-1703] avoid double quota increase for adhoc flows Sep 8, 2022
quotaManager.checkQuota(dagNode);
if (!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
quotaManager.checkQuota(dagNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering do you want to put the flow id into running map of quota manager without increasing quota even it's adhoc flow? I feel it's necessary or for the next job in the dag, when you call checkQuota, you will increase the quota again? Am I missing anything here?

String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);

BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this way, you will use "state.store.db.url" as the key to mysql db, which is too general. I will suggest to check how mysqlSpecStore handle that, we should have a config prefix to get the config related to this mysql quota manager.

@@ -332,10 +332,13 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
// Check quota limits against run immediately flows or adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
// This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a reminder: As on line 334 we checked whether this.warmStandbyEnabled should set to be false, this will only be reached in the old mode, where we didn't enable message forwarding.
You should check quota on the compiler onAddSpec method when we enable message forwarding

@@ -335,7 +335,9 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
// todo : we should probably check quota for all of the start nodes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think quota is at flow level but not job level? If that's the case, checking once should be enough?

@arjun4084346
Copy link
Contributor Author

Functionality of checking quota for every job is left unchanged that can be done in the other PR.
Handling parallel flow functionality is also left unchanged for the other PR.

Copy link
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, main suggestion is we should come up with a way to make sure running dag map and quota table are consistent at any given time. We can even just have one table of (dagnodeId, proxyUser, group, requester) and do count to calculate the quota every time.

@@ -281,6 +281,13 @@ public Dag<JobExecutionPlan> compileFlow(Spec spec) {
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

if (Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW))) {
// todo : we should probably set it on all of the start nodes
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getStartNodes()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already change it to start nodes, should we remove the todo log on line 285 also change the logic in compiler onAddFlowSpec to check quota for all start nodes as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if we have running dag Ids, do we still need this? anyway we should be able to avoid double count using that map?

@@ -62,7 +80,7 @@ public void init(Collection<Dag<JobExecutionPlan>> dags) {
@Override
int incrementJobCount(String user, CountType countType) throws IOException {
try {
return this.mysqlStore.increaseCount(user, countType);
return this.quotaStore.increaseCount(user, countType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question here, shouldn't we change the increaseCount and addDagId to be one mysql transaction? Otherwise we will see discrepancy between these two table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably yes, but it was not being done together in InMemory version also.
Do you want me to change both?

DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
flowGroupCheck = flowGroupQuotaIncrement >= 0;
quotaCheck.setFlowGroupCheck(flowGroupCheck);
if (!flowGroupCheck) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For mysql quota, if we fail to check any of the quota type, shouldn't we directly rollback the change in this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean if user check fails, should we immediately stop without trying to check proxy/requesterService quota? I think we can, but the existing code would check all the quotas anyway to be able to form a complete error message (in requesterMessage) so I chose not to disturb that functionality

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mean that, we still process to the end, but before commit the change, we check whether all three succeed, if not, instead of commit, we should revert the change at this point. But we still return all check result to user.

Copy link
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change, looks good to me overall, just one minor comment, please also fix the compile/test error

public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want to roll back for previous dag node as well in this case.

Copy link
Contributor

@ZihanLi58 ZihanLi58 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, thanks for addressing all my comments

@ZihanLi58 ZihanLi58 merged commit 0c3f6d1 into apache:master Oct 3, 2022
phet pushed a commit to phet/gobblin that referenced this pull request Dec 3, 2022
* avoid double quota increase for adhoc flows

* corrected some config names

* address review comments, made runningDagIds map implementation specific

* address review comments

* address review comments

* fix checkstyle

* make checking quota for multiple dag nodes atomic

* fix unit test

* remove unused code

* remove unused imports

* address review comments
phet added a commit to phet/gobblin that referenced this pull request Dec 5, 2022
* upstream/master:
  move dataset handler code before cleaning up staging data (apache#3594)
  [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (apache#3588)
  [GOBBLIN-1734] make DestinationDatasetHandler work on streaming sources (apache#3592)
  give option to cancel helix workflow through Delete API (apache#3580)
  [GOBBLIN-1728] Fix YarnService incorrect container allocation behavior (apache#3586)
  Support multiple node types in shared flowgraph, fix logs (apache#3590)
  Search for dummy file in writer directory (apache#3589)
  Use root cause for checking if exception is transient (apache#3585)
  [GOBBLIN-1724] Support a shared flowgraph layout in GaaS (apache#3583)
  [GOBBLIN-1731] Enable HiveMetadataWriter to override table schema lit… (apache#3587)
  [GOBBLIN-1726] Avro 1.9 upgrade of Gobblin OSS (apache#3581)
  [GOBBLIN-1725] Fix bugs in gaas warm standby mode (apache#3582)
  [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume… (apache#3572)
  Add log line for committing/retrieving watermarks in streaming (apache#3578)
  [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (apache#3575)
  Ignore AlreadyExistsException in hive writer (apache#3579)
  Fail GMIP container for known transient exceptions to avoid data loss (apache#3576)
  GOBBLIN-1715: Support vectorized row batch pooling (apache#3574)
  [GOBBLIN-1696] Implement file based flowgraph that detects changes to the underlying… (apache#3548)
  GOBBLIN-1719 Replace moveToTrash with moveToAppropriateTrash for hadoop trash (apache#3573)
  [GOBBLIN-1703] avoid double quota increase for adhoc flows (apache#3550)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants