Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1703] avoid double quota increase for adhoc flows #3550

Merged
merged 11 commits into from
Oct 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";

public static final String GOBBLIN_SERVICE_ADHOC_FLOW = GOBBLIN_SERVICE_PREFIX + "adhoc.flow";

public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
Expand Down Expand Up @@ -146,6 +148,13 @@ public class ServiceConfigKeys {
public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";
public static final String DEFAULT_QUOTA_MANAGER = "org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";

public static final String QUOTA_STORE_DB_TABLE_KEY = "quota.store.db.table";
public static final String DEFAULT_QUOTA_STORE_DB_TABLE = "quota_table";

public static final String RUNNING_DAG_IDS_DB_TABLE_KEY = "running.dag.ids.store.db.table";
public static final String DEFAULT_RUNNING_DAG_IDS_DB_TABLE = "running_dag_ids";


// Group Membership authentication service
public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;

import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
Expand Down Expand Up @@ -133,7 +132,7 @@ public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
}


protected final DataSource dataSource;
protected final BasicDataSource dataSource;
protected final String tableName;
private final URI specStoreURI;
protected final SpecSerDe specSerDe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Map;
import java.util.Properties;

import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,6 +92,11 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
@Setter
protected boolean active;

private boolean warmStandbyEnabled;

@Inject
UserQuotaManager userQuotaManager;

public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
}
Expand Down Expand Up @@ -119,6 +126,8 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
this.dataAuthorizationTimer = Optional.absent();
}

this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);

this.topologySpecMap = Maps.newConcurrentMap();
this.config = config;

Expand Down Expand Up @@ -181,6 +190,17 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {

// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);

if (this.warmStandbyEnabled &&
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
try {
userQuotaManager.checkQuota(dag.getStartNodes());
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
} catch (IOException e) {
throw new RuntimeException(e);
}
}

// If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ public Dag<JobExecutionPlan> compileFlow(Spec spec) {
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

if (Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW))) {
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getStartNodes()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already change it to start nodes, should we remove the todo log on line 285 also change the logic in compiler onAddFlowSpec to check quota for all start nodes as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if we have running dag Ids, do we still need this? anyway we should be able to avoid double count using that map?

dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
}
}

return jobExecutionPlanDag;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
Expand All @@ -30,10 +27,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;


Expand All @@ -49,8 +42,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
// TODO : we might want to make this field implementation specific to be able to decide if the dag is already running or have been accepted
Set<String> runningDagIds = ConcurrentHashMap.newKeySet();

private final int defaultQuota;

public AbstractUserQuotaManager(Config config) {
Expand All @@ -69,155 +61,13 @@ public AbstractUserQuotaManager(Config config) {
this.perFlowGroupQuota = flowGroupMapBuilder.build();
}

// Implementations should return the current count and increase them by one
abstract int incrementJobCount(String key, CountType countType) throws IOException;

abstract void decrementJobCount(String key, CountType countType) throws IOException;

public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);

// Throw errors for reach quota at the end to avoid inconsistent job counts
if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
// roll back the increased counts in this block
rollbackIncrements(dagNode);
throw new QuotaExceededException(quotaCheck.requesterMessage);
}
}

private void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
List<String> usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));

decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
decrementQuotaUsageForUsers(usersQuotaIncrement);
decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
}

protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
// Dag is already being tracked, no need to double increment for retries and multihop flows
if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) {
return quotaCheck;
} else {
runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString());
}

String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
StringBuilder requesterMessage = new StringBuilder();

boolean proxyUserCheck;

if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
quotaCheck.setProxyUserCheck(proxyUserCheck);
if (!proxyUserCheck) {
// add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment
requesterMessage.append(String.format(
"Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
}
}

String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;

if (dagNode.getValue().getCurrentAttempts() <= 1) {
List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
for (String requester : uniqueRequesters) {
int userQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT);
boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
requesterCheck = requesterCheck && thisRequesterCheck;
quotaCheck.setRequesterCheck(requesterCheck);
if (!thisRequesterCheck) {
requesterMessage.append(String.format(
"Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
}
}
}

boolean flowGroupCheck;

if (dagNode.getValue().getCurrentAttempts() <= 1) {
int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
flowGroupCheck = flowGroupQuotaIncrement >= 0;
quotaCheck.setFlowGroupCheck(flowGroupCheck);
if (!flowGroupCheck) {
requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
}
}

quotaCheck.setRequesterMessage(requesterMessage.toString());

return quotaCheck;
}

/**
* Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
* Returns true if the dag existed in the set of running dags and was removed successfully
*/
public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
if (!val) {
return false;
}

String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
decrementJobCount(proxyUserKey, CountType.USER_COUNT);
}

String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);

String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
try {
for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
}
} catch (IOException e) {
log.error("Failed to release quota for requester list " + serializedRequesters, e);
return false;
}

return true;
}

private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException {
int currentCount = incrementJobCount(key, countType);
if (currentCount >= keyQuota) {
return -currentCount;
} else {
return currentCount;
}
}

private void decrementQuotaUsageForUsers(List<String> requestersToDecreaseCount) throws IOException {
for (String requester : requestersToDecreaseCount) {
decrementJobCount(requester, CountType.REQUESTER_COUNT);
}
}
abstract boolean containsDagId(String dagId) throws IOException;

private int getQuotaForUser(String user) {
int getQuotaForUser(String user) {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}

private int getQuotaForFlowGroup(String flowGroup) {
int getQuotaForFlowGroup(String flowGroup) {
return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand Down Expand Up @@ -963,7 +964,11 @@ private void submitJob(DagNode<JobExecutionPlan> dagNode) {
// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
quotaManager.checkQuota(dagNode);
if (!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
quotaManager.checkQuota(Collections.singleton(dagNode));
}

producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
Expand Down
Loading