diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index 55b8282fe29..d9c251e6965 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -32,7 +32,9 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled"; public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled"; public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled"; - public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled"; + + public static final String GOBBLIN_SERVICE_ADHOC_FLOW = GOBBLIN_SERVICE_PREFIX + "adhoc.flow"; + public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; @@ -146,6 +148,13 @@ public class ServiceConfigKeys { public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class"; public static final String DEFAULT_QUOTA_MANAGER = "org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager"; + public static final String QUOTA_STORE_DB_TABLE_KEY = "quota.store.db.table"; + public static final String DEFAULT_QUOTA_STORE_DB_TABLE = "quota_table"; + + public static final String RUNNING_DAG_IDS_DB_TABLE_KEY = "running.dag.ids.store.db.table"; + public static final String DEFAULT_RUNNING_DAG_IDS_DB_TABLE = "running_dag_ids"; + + // Group Membership authentication service public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class"; public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java index b2369588f4b..3e3a782018f 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java @@ -36,7 +36,6 @@ import com.google.common.io.ByteStreams; import com.typesafe.config.Config; -import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; @@ -133,7 +132,7 @@ public Spec extractSpec(ResultSet rs) throws SQLException, IOException { } - protected final DataSource dataSource; + protected final BasicDataSource dataSource; protected final String tableName; private final URI specStoreURI; protected final SpecSerDe specSerDe; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index f910e9e4ec7..976d12650ea 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -24,7 +24,9 @@ import java.util.Map; import java.util.Properties; +import javax.inject.Inject; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +92,11 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { @Setter protected boolean active; + private boolean warmStandbyEnabled; + + @Inject + UserQuotaManager userQuotaManager; + public BaseFlowToJobSpecCompiler(Config config){ this(config,true); } @@ -119,6 +126,8 @@ public BaseFlowToJobSpecCompiler(Config config, Optional log, boolean in this.dataAuthorizationTimer = Optional.absent(); } + this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false); + this.topologySpecMap = Maps.newConcurrentMap(); this.config = config; @@ -181,6 +190,17 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { // always try to compile the flow to verify if it is compilable Dag dag = this.compileFlow(flowSpec); + + if (this.warmStandbyEnabled && + (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) { + try { + userQuotaManager.checkQuota(dag.getStartNodes()); + flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + // If dag is null then a compilation error has occurred if (dag != null && !dag.isEmpty()) { response = dag.toString(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 3e6eb13dc9d..dc3d0bc4791 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -281,6 +281,12 @@ public Dag compileFlow(Spec spec) { Instrumented.markMeter(flowCompilationSuccessFulMeter); Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + if (Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW))) { + for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) { + dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true"); + } + } + return jobExecutionPlanDag; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java index fad079bf0da..b92d6d610a7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java @@ -18,10 +18,7 @@ package org.apache.gobblin.service.modules.orchestration; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; @@ -30,10 +27,6 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.exception.QuotaExceededException; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; @@ -49,8 +42,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager { public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE; private final Map perUserQuota; private final Map perFlowGroupQuota; - // TODO : we might want to make this field implementation specific to be able to decide if the dag is already running or have been accepted - Set runningDagIds = ConcurrentHashMap.newKeySet(); + private final int defaultQuota; public AbstractUserQuotaManager(Config config) { @@ -69,155 +61,13 @@ public AbstractUserQuotaManager(Config config) { this.perFlowGroupQuota = flowGroupMapBuilder.build(); } - // Implementations should return the current count and increase them by one - abstract int incrementJobCount(String key, CountType countType) throws IOException; - - abstract void decrementJobCount(String key, CountType countType) throws IOException; - - public void checkQuota(Dag.DagNode dagNode) throws IOException { - QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode); - - // Throw errors for reach quota at the end to avoid inconsistent job counts - if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) { - // roll back the increased counts in this block - rollbackIncrements(dagNode); - throw new QuotaExceededException(quotaCheck.requesterMessage); - } - } - - private void rollbackIncrements(Dag.DagNode dagNode) throws IOException { - String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); - String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, ""); - List usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode)); - - decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT); - decrementQuotaUsageForUsers(usersQuotaIncrement); - decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); - runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString()); - } - - protected QuotaCheck increaseAndCheckQuota(Dag.DagNode dagNode) throws IOException { - QuotaCheck quotaCheck = new QuotaCheck(true, true, true, ""); - // Dag is already being tracked, no need to double increment for retries and multihop flows - if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) { - return quotaCheck; - } else { - runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString()); - } - - String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); - String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), - ConfigurationKeys.FLOW_GROUP_KEY, ""); - String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); - StringBuilder requesterMessage = new StringBuilder(); - - boolean proxyUserCheck; - - if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { - int proxyQuotaIncrement = incrementJobCountAndCheckQuota( - DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); - proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds - quotaCheck.setProxyUserCheck(proxyUserCheck); - if (!proxyUserCheck) { - // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment - requesterMessage.append(String.format( - "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", - proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser))); - } - } - - String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); - boolean requesterCheck = true; - - if (dagNode.getValue().getCurrentAttempts() <= 1) { - List uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); - for (String requester : uniqueRequesters) { - int userQuotaIncrement = incrementJobCountAndCheckQuota( - DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT); - boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds - requesterCheck = requesterCheck && thisRequesterCheck; - quotaCheck.setRequesterCheck(requesterCheck); - if (!thisRequesterCheck) { - requesterMessage.append(String.format( - "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ", - requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester))); - } - } - } - - boolean flowGroupCheck; - - if (dagNode.getValue().getCurrentAttempts() <= 1) { - int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota( - DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT); - flowGroupCheck = flowGroupQuotaIncrement >= 0; - quotaCheck.setFlowGroupCheck(flowGroupCheck); - if (!flowGroupCheck) { - requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n", - flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), - Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup))); - } - } - - quotaCheck.setRequesterMessage(requesterMessage.toString()); - - return quotaCheck; - } - - /** - * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}. - * Returns true if the dag existed in the set of running dags and was removed successfully - */ - public boolean releaseQuota(Dag.DagNode dagNode) throws IOException { - boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString()); - if (!val) { - return false; - } - - String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); - if (proxyUser != null) { - String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode); - decrementJobCount(proxyUserKey, CountType.USER_COUNT); - } - - String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), - ConfigurationKeys.FLOW_GROUP_KEY, ""); - decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); - - String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); - try { - for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) { - String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode); - decrementJobCount(requesterKey, CountType.REQUESTER_COUNT); - } - } catch (IOException e) { - log.error("Failed to release quota for requester list " + serializedRequesters, e); - return false; - } - - return true; - } - - private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException { - int currentCount = incrementJobCount(key, countType); - if (currentCount >= keyQuota) { - return -currentCount; - } else { - return currentCount; - } - } - - private void decrementQuotaUsageForUsers(List requestersToDecreaseCount) throws IOException { - for (String requester : requestersToDecreaseCount) { - decrementJobCount(requester, CountType.REQUESTER_COUNT); - } - } + abstract boolean containsDagId(String dagId) throws IOException; - private int getQuotaForUser(String user) { + int getQuotaForUser(String user) { return this.perUserQuota.getOrDefault(user, defaultQuota); } - private int getQuotaForFlowGroup(String flowGroup) { + int getQuotaForFlowGroup(String flowGroup) { return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index fcb2ff4c1bd..321f93b9ac0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -963,7 +964,11 @@ private void submitJob(DagNode dagNode) { // Run this spec on selected executor SpecProducer producer; try { - quotaManager.checkQuota(dagNode); + if (!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty( + ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) { + quotaManager.checkQuota(Collections.singleton(dagNode)); + } + producer = DagManagerUtils.getSpecProducer(dagNode); TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java index 438e86d204a..f4c9ab0238e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java @@ -21,13 +21,18 @@ import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; import static org.apache.gobblin.service.ExecutionStatus.RUNNING; @@ -42,9 +47,153 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager { private final Map flowGroupToJobCount = new ConcurrentHashMap<>(); private final Map requesterToJobCount = new ConcurrentHashMap<>(); + private final Set runningDagIds; + @Inject public InMemoryUserQuotaManager(Config config) { super(config); + this.runningDagIds = ConcurrentHashMap.newKeySet();; + } + + protected QuotaCheck increaseAndCheckQuota(Dag.DagNode dagNode) throws IOException { + QuotaCheck quotaCheck = new QuotaCheck(true, true, true, ""); + // Dag is already being tracked, no need to double increment for retries and multihop flows + if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) { + return quotaCheck; + } else { + addDagId(DagManagerUtils.generateDagId(dagNode).toString()); + } + + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); + StringBuilder requesterMessage = new StringBuilder(); + + boolean proxyUserCheck; + + if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { + int proxyQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds + quotaCheck.setProxyUserCheck(proxyUserCheck); + if (!proxyUserCheck) { + // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment + requesterMessage.append(String.format( + "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", + proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser))); + } + } + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + boolean requesterCheck = true; + + if (dagNode.getValue().getCurrentAttempts() <= 1) { + List uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); + for (String requester : uniqueRequesters) { + int userQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT); + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds + requesterCheck = requesterCheck && thisRequesterCheck; + quotaCheck.setRequesterCheck(requesterCheck); + if (!thisRequesterCheck) { + requesterMessage.append(String.format( + "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ", + requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester))); + } + } + } + + boolean flowGroupCheck; + + if (dagNode.getValue().getCurrentAttempts() <= 1) { + int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota( + DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT); + flowGroupCheck = flowGroupQuotaIncrement >= 0; + quotaCheck.setFlowGroupCheck(flowGroupCheck); + if (!flowGroupCheck) { + requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n", + flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), + Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup))); + } + } + + quotaCheck.setRequesterMessage(requesterMessage.toString()); + + return quotaCheck; + } + + protected void rollbackIncrements(Dag.DagNode dagNode) throws IOException { + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, ""); + List usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode)); + + decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT); + decrementQuotaUsageForUsers(usersQuotaIncrement); + decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); + removeDagId(DagManagerUtils.generateDagId(dagNode).toString()); + } + + private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException { + int currentCount = incrementJobCount(key, countType); + if (currentCount >= keyQuota) { + return -currentCount; + } else { + return currentCount; + } + } + + private void decrementQuotaUsageForUsers(List requestersToDecreaseCount) throws IOException { + for (String requester : requestersToDecreaseCount) { + decrementJobCount(requester, CountType.REQUESTER_COUNT); + } + } + + /** + * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}. + * Returns true if the dag existed in the set of running dags and was removed successfully + */ + public boolean releaseQuota(Dag.DagNode dagNode) throws IOException { + boolean val = removeDagId(DagManagerUtils.generateDagId(dagNode).toString()); + if (!val) { + return false; + } + + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + if (proxyUser != null) { + String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode); + decrementJobCount(proxyUserKey, CountType.USER_COUNT); + } + + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + try { + for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) { + String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode); + decrementJobCount(requesterKey, CountType.REQUESTER_COUNT); + } + } catch (IOException e) { + log.error("Failed to release quota for requester list " + serializedRequesters, e); + return false; + } + + return true; + } + + void addDagId(String dagId) { + this.runningDagIds.add(dagId); + } + + @Override + boolean containsDagId(String dagId) { + return this.runningDagIds.contains(dagId); + } + + boolean removeDagId(String dagId) { + return this.runningDagIds.remove(dagId); } public void init(Collection> dags) throws IOException { @@ -58,6 +207,17 @@ public void init(Collection> dags) throws IOException { } } + public void checkQuota(Collection> dagNodes) throws IOException { + for (Dag.DagNode dagNode : dagNodes) { + QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode); + if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) { + // roll back the increased counts in this block + rollbackIncrements(dagNode); + throw new QuotaExceededException(quotaCheck.requesterMessage); + } + } + } + private int incrementJobCount(String key, Map quotaMap) { Integer currentCount; // Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user @@ -86,7 +246,6 @@ private void decrementJobCount(String key, Map quotaMap) { } } - @Override int incrementJobCount(String user, CountType countType) throws IOException { switch (countType) { case USER_COUNT: @@ -100,7 +259,6 @@ int incrementJobCount(String user, CountType countType) throws IOException { } } - @Override void decrementJobCount(String user, CountType countType) throws IOException { switch (countType) { case USER_COUNT: diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java index 78f15cfba5d..1cf49c83fc6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java @@ -24,6 +24,7 @@ import java.sql.SQLException; import java.util.Collection; +import java.util.List; import org.apache.commons.dbcp.BasicDataSource; import com.google.common.annotations.VisibleForTesting; @@ -35,7 +36,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; @@ -47,12 +50,28 @@ @Slf4j @Singleton public class MysqlUserQuotaManager extends AbstractUserQuotaManager { - private final MysqlQuotaStore mysqlStore; + public final MysqlQuotaStore quotaStore; + public final RunningDagIdsStore runningDagIds; + @Inject public MysqlUserQuotaManager(Config config) throws IOException { super(config); - this.mysqlStore = createQuotaStore(config); + this.quotaStore = createQuotaStore(config); + this.runningDagIds = createRunningDagStore(config); + } + + void addDagId(Connection connection, String dagId) throws IOException { + this.runningDagIds.add(connection, dagId); + } + + @Override + boolean containsDagId(String dagId) throws IOException { + return this.runningDagIds.contains(dagId); + } + + boolean removeDagId(Connection connection, String dagId) throws IOException { + return this.runningDagIds.remove(connection, dagId); } // This implementation does not need to update quota usage when the service restarts or it's leadership status changes @@ -60,42 +79,191 @@ public void init(Collection> dags) { } @Override - int incrementJobCount(String user, CountType countType) throws IOException { - try { - return this.mysqlStore.increaseCount(user, countType); + public void checkQuota(Collection> dagNodes) throws IOException { + try (Connection connection = this.quotaStore.dataSource.getConnection()) { + connection.setAutoCommit(false); + + for (Dag.DagNode dagNode : dagNodes) { + QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode); + if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) { + connection.rollback(); + throw new QuotaExceededException(quotaCheck.requesterMessage); + } + } + connection.commit(); } catch (SQLException e) { throw new IOException(e); } } - @Override - void decrementJobCount(String user, CountType countType) throws IOException { + int incrementJobCount(Connection connection, String user, CountType countType) throws IOException, SQLException { + return this.quotaStore.increaseCount(connection, user, countType); + } + + void decrementJobCount(Connection connection,String user, CountType countType) throws IOException, SQLException { + this.quotaStore.decreaseCount(connection, user, countType); + } + + protected QuotaCheck increaseAndCheckQuota(Connection connection, Dag.DagNode dagNode) + throws SQLException, IOException { + QuotaCheck quotaCheck = new QuotaCheck(true, true, true, ""); + StringBuilder requesterMessage = new StringBuilder(); + + // Dag is already being tracked, no need to double increment for retries and multihop flows + if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) { + return quotaCheck; + } else { + addDagId(connection, DagManagerUtils.generateDagId(dagNode).toString()); + } + + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), + ConfigurationKeys.FLOW_GROUP_KEY, ""); + String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode); + + boolean proxyUserCheck; + + if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) { + int proxyQuotaIncrement = incrementJobCountAndCheckQuota(connection, + DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT); + proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds + quotaCheck.setProxyUserCheck(proxyUserCheck); + if (!proxyUserCheck) { + // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment + requesterMessage.append(String.format( + "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n", + proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser))); + } + } + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + boolean requesterCheck = true; + + if (dagNode.getValue().getCurrentAttempts() <= 1) { + List uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); + for (String requester : uniqueRequesters) { + int userQuotaIncrement = incrementJobCountAndCheckQuota(connection, DagManagerUtils.getUserQuotaKey(requester, dagNode), + getQuotaForUser(requester), CountType.REQUESTER_COUNT); + boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds + requesterCheck = requesterCheck && thisRequesterCheck; + quotaCheck.setRequesterCheck(requesterCheck); + if (!thisRequesterCheck) { + requesterMessage.append(String.format("Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ", + requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester))); + } + } + } + + boolean flowGroupCheck; + + if (dagNode.getValue().getCurrentAttempts() <= 1) { + int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(connection, + DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT); + flowGroupCheck = flowGroupQuotaIncrement >= 0; + quotaCheck.setFlowGroupCheck(flowGroupCheck); + if (!flowGroupCheck) { + requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n", + flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup), + Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup))); + } + } + + quotaCheck.setRequesterMessage(requesterMessage.toString()); + + return quotaCheck; + } + + protected int incrementJobCountAndCheckQuota(Connection connection, String key, int keyQuota, CountType countType) + throws IOException, SQLException { + int currentCount = incrementJobCount(connection, key, countType); + if (currentCount >= keyQuota) { + return -currentCount; + } else { + return currentCount; + } + } + + /** + * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}. + * Returns true if the dag existed in the set of running dags and was removed successfully + */ + public boolean releaseQuota(Dag.DagNode dagNode) throws IOException { + Connection connection; try { - this.mysqlStore.decreaseCount(user, countType); + connection = this.quotaStore.dataSource.getConnection(); + connection.setAutoCommit(false); } catch (SQLException e) { throw new IOException(e); } + + try { + boolean val = removeDagId(connection, DagManagerUtils.generateDagId(dagNode).toString()); + if (!val) { + return false; + } + + String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null); + if (proxyUser != null) { + String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode); + decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT); + } + + String flowGroup = + ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, ""); + decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); + + String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode); + try { + for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) { + String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode); + decrementJobCount(connection, requesterKey, CountType.REQUESTER_COUNT); + } + } catch (IOException e) { + log.error("Failed to release quota for requester list " + serializedRequesters, e); + return false; + } + connection.commit(); + } catch (SQLException ex) { + throw new IOException(ex); + } finally { + try { + connection.close(); + } catch (SQLException ex) { + throw new IOException(ex); + } + } + + return true; } @VisibleForTesting int getCount(String name, CountType countType) throws IOException { - return this.mysqlStore.getCount(name, countType); + return this.quotaStore.getCount(name, countType); } /** - * Creating an instance of StateStore. + * Creating an instance of MysqlQuotaStore. */ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException { - String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, - ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE); + String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY, + ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE); + + BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config); + + return new MysqlQuotaStore(basicDataSource, quotaStoreTableName); + } + + protected RunningDagIdsStore createRunningDagStore(Config config) throws IOException { + String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY, + ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE); BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config); - return new MysqlQuotaStore(basicDataSource, stateStoreTableName); + return new RunningDagIdsStore(basicDataSource, quotaStoreTableName); } static class MysqlQuotaStore { - protected final DataSource dataSource; + protected final BasicDataSource dataSource; final String tableName; private final String GET_USER_COUNT; private final String GET_REQUESTER_COUNT; @@ -133,7 +301,9 @@ public MysqlQuotaStore(BasicDataSource dataSource, String tableName) try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) { createStatement.executeUpdate(); } catch (SQLException e) { - throw new IOException("Failure creation table " + tableName, e); + log.warn("Failure in creating table {}. Validation query is set to {} Exception is {}", + tableName, this.dataSource.getValidationQuery(), e); + throw new IOException(e); } } @@ -158,13 +328,11 @@ int getCount(String name, CountType countType) throws IOException { } } - public int increaseCount(String name, CountType countType) throws IOException, SQLException { - Connection connection = dataSource.getConnection(); - connection.setAutoCommit(false); - + public int increaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException { String selectStatement; String increaseStatement; + switch(countType) { case USER_COUNT: selectStatement = GET_USER_COUNT; @@ -189,27 +357,19 @@ public int increaseCount(String name, CountType countType) throws IOException, S statement2.setString(1, name); rs = statement1.executeQuery(); statement2.executeUpdate(); - connection.commit(); if (rs != null && rs.next()) { return rs.getInt(1); } else { return 0; } - } catch (SQLException e) { - connection.rollback(); - throw new IOException("Failure increasing count for user/flowGroup " + name, e); } finally { if (rs != null) { rs.close(); } - connection.close(); } } - public void decreaseCount(String name, CountType countType) throws IOException, SQLException { - Connection connection = dataSource.getConnection(); - connection.setAutoCommit(false); - + public void decreaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException { String selectStatement; String decreaseStatement; @@ -241,18 +401,75 @@ public void decreaseCount(String name, CountType countType) throws IOException, rs = statement1.executeQuery(); statement2.executeUpdate(); statement3.executeUpdate(); - connection.commit(); if (rs != null && rs.next() && rs.getInt(1) == 0) { log.warn("Decrement job count was called for " + name + " when the count was already zero/absent."); } - } catch (SQLException e) { - connection.rollback(); - throw new IOException("Failure decreasing count from user/flowGroup " + name, e); } finally { if (rs != null) { rs.close(); } - connection.close(); + } + } + } + + static class RunningDagIdsStore { + protected final DataSource dataSource; + final String tableName; + private final String CONTAINS_DAG_ID; + private final String ADD_DAG_ID; + private final String REMOVE_DAG_ID; + + public RunningDagIdsStore(BasicDataSource dataSource, String tableName) + throws IOException { + this.dataSource = dataSource; + this.tableName = tableName; + + CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE dagId = ?)" ; + ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) "; + REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?"; + + String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " (dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, " + + "PRIMARY KEY (dagId), UNIQUE INDEX ind (dagId))"; + try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) { + createStatement.executeUpdate(); + } catch (SQLException e) { + throw new IOException("Failure creation table " + tableName, e); + } + } + + /** + * returns true if the DagID is already present in the running dag store + */ + @VisibleForTesting + boolean contains(String dagId) throws IOException { + try (Connection connection = dataSource.getConnection(); + PreparedStatement queryStatement = connection.prepareStatement(CONTAINS_DAG_ID)) { + queryStatement.setString(1, dagId); + try (ResultSet rs = queryStatement.executeQuery()) { + rs.next(); + return rs.getBoolean(1); + } + } catch (Exception e) { + throw new IOException("Could not find if the dag " + dagId + " is already running.", e); + } + } + + public void add(Connection connection, String dagId) throws IOException { + try (PreparedStatement statement = connection.prepareStatement(ADD_DAG_ID)) { + statement.setString(1, dagId); + statement.executeUpdate(); + } catch (SQLException e) { + throw new IOException("Failure adding dag " + dagId, e); + } + } + + public boolean remove(Connection connection, String dagId) throws IOException { + try (PreparedStatement statement = connection.prepareStatement(REMOVE_DAG_ID)) { + statement.setString(1, dagId); + int count = statement.executeUpdate(); + return count == 1; + } catch (SQLException e) { + throw new IOException("Could not remove dag " + dagId, e); } } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java index 752956d11d6..dff5e0a793e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java @@ -42,7 +42,7 @@ public interface UserQuotaManager { * It also increases the quota usage for proxy user, requester and the flowGroup of the given DagNode by one. * @throws QuotaExceededException if the quota is exceeded */ - void checkQuota(Dag.DagNode dagNode) throws IOException; + void checkQuota(Collection> dagNode) throws IOException; /** * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}. diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 863144a6f6f..c997f060359 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -250,9 +250,8 @@ protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) { Properties properties = spec.getConfigAsProperties(); properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"); Config config = ConfigFactory.parseProperties(properties); - FlowSpec flowSpec = new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties, + return new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties, spec.getTemplateURIs(), spec.getChildSpecs()); - return flowSpec; } @Override @@ -332,10 +331,12 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { // Check quota limits against run immediately flows or adhoc flows before saving the schedule // In warm standby mode, this quota check will happen on restli API layer when we accept the flow if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) { + // This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true. if (quotaManager.isPresent()) { // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager try { - quotaManager.get().checkQuota(dag.getNodes().get(0)); + quotaManager.get().checkQuota(dag.getStartNodes()); + ((FlowSpec) addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true"); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java index 5e63a0f007a..2a0285aa946 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; @@ -61,9 +62,9 @@ public void testExceedsUserQuotaThrowsException() throws Exception { dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); - this._quotaManager.checkQuota(dags.get(0).getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0))); Assert.assertThrows(IOException.class, () -> { - this._quotaManager.checkQuota(dags.get(1).getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0))); }); } @@ -76,7 +77,7 @@ public void testMultipleRemoveQuotasIdempotent() throws Exception { dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); - this._quotaManager.checkQuota(dags.get(0).getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0))); Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0))); Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0))); } @@ -91,9 +92,9 @@ public void testExceedsFlowGroupQuotaThrowsException() throws Exception { dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1); dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1); - this._quotaManager.checkQuota(dags.get(0).getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0))); Assert.assertThrows(IOException.class, () -> { - this._quotaManager.checkQuota(dags.get(1).getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0))); }); } @@ -115,20 +116,20 @@ public void testUserAndFlowGroupQuotaMultipleUsersAdd() throws Exception { dag3.getNodes().get(0).getValue().setCurrentAttempts(1); dag4.getNodes().get(0).getValue().setCurrentAttempts(1); - this._quotaManager.checkQuota(dag1.getNodes().get(0)); - this._quotaManager.checkQuota(dag2.getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dag1.getNodes().get(0))); + this._quotaManager.checkQuota(Collections.singleton(dag2.getNodes().get(0))); // Should fail due to user quota Assert.assertThrows(IOException.class, () -> { - this._quotaManager.checkQuota(dag3.getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0))); }); // Should fail due to flowgroup quota Assert.assertThrows(IOException.class, () -> { - this._quotaManager.checkQuota(dag4.getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0))); }); // should pass due to quota being released this._quotaManager.releaseQuota(dag2.getNodes().get(0)); - this._quotaManager.checkQuota(dag3.getNodes().get(0)); - this._quotaManager.checkQuota(dag4.getNodes().get(0)); + this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0))); + this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0))); } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java index 4bdd4559741..a8931738d48 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java @@ -19,6 +19,8 @@ import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -36,6 +38,7 @@ public class MysqlUserQuotaManagerTest { private static final String TABLE = "quotas"; private static final String PROXY_USER = "abora"; private MysqlUserQuotaManager quotaManager; + public static int INCREMENTS = 1000; @BeforeClass public void setUp() throws Exception { @@ -52,35 +55,63 @@ public void setUp() throws Exception { } @Test + public void testRunningDagStore() throws Exception { + String dagId = DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 1).getNodes().get(0)).toString(); + Connection connection = this.quotaManager.quotaStore.dataSource.getConnection(); + Assert.assertFalse(this.quotaManager.containsDagId(dagId)); + this.quotaManager.addDagId(connection, dagId); + connection.commit(); + Assert.assertTrue(this.quotaManager.containsDagId(dagId)); + Assert.assertTrue(this.quotaManager.removeDagId(connection, dagId)); + connection.commit(); + Assert.assertFalse(this.quotaManager.containsDagId(dagId)); + Assert.assertFalse(this.quotaManager.removeDagId(connection, dagId)); + connection.commit(); + connection.close(); + } + + @Test public void testIncreaseCount() throws Exception { - int prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + Connection connection = this.quotaManager.quotaStore.dataSource.getConnection(); + int prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + connection.commit(); Assert.assertEquals(prevCount, 0); - prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + connection.commit(); Assert.assertEquals(prevCount, 1); Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 2); - prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + connection.commit(); Assert.assertEquals(prevCount, 0); - prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + connection.commit(); Assert.assertEquals(prevCount, 1); + connection.close(); } @Test(dependsOnMethods = "testIncreaseCount") public void testDecreaseCount() throws Exception { - this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + Connection connection = this.quotaManager.quotaStore.dataSource.getConnection(); + this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + connection.commit(); Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 1); - this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + connection.commit(); Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0); - this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + connection.commit(); Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0); - this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + connection.commit(); Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1); - this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT); + connection.commit(); // on count reduced to zero, the row should get deleted and the get call should return -1 instead of 0. Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1); } @@ -95,14 +126,15 @@ public ChangeCountRunnable(boolean increaseOrDecrease) { @Override public void run() { int i = 0; - while (i++ < 1000) { - try { + while (i++ < INCREMENTS) { + try (Connection connection = MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection();) { if (increaseOrDecrease) { - MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); } else { - MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); + MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT); } - } catch (IOException e) { + connection.commit(); + } catch (IOException | SQLException e) { Assert.fail("Thread got an exception.", e); } } @@ -111,14 +143,13 @@ public void run() { @Test(dependsOnMethods = "testDecreaseCount") public void testConcurrentChanges() throws IOException, InterruptedException { - Runnable increaseCountRunnable = new ChangeCountRunnable(true); - Runnable decreaseCountRunnable = new ChangeCountRunnable(false); - Thread thread1 = new Thread(increaseCountRunnable); - Thread thread2 = new Thread(increaseCountRunnable); - Thread thread3 = new Thread(increaseCountRunnable); - Thread thread4 = new Thread(decreaseCountRunnable); - Thread thread5 = new Thread(decreaseCountRunnable); - Thread thread6 = new Thread(decreaseCountRunnable); + int numOfThreads = 3; + Thread thread1 = new Thread(new ChangeCountRunnable(true)); + Thread thread2 = new Thread(new ChangeCountRunnable(true)); + Thread thread3 = new Thread(new ChangeCountRunnable(true)); + Thread thread4 = new Thread(new ChangeCountRunnable(false)); + Thread thread5 = new Thread(new ChangeCountRunnable(false)); + Thread thread6 = new Thread(new ChangeCountRunnable(false)); thread1.start(); thread2.start(); @@ -126,7 +157,8 @@ public void testConcurrentChanges() throws IOException, InterruptedException { thread1.join(); thread2.join(); thread3.join(); - Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 3000); + Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), + INCREMENTS * 3); thread4.start(); thread5.start(); thread6.start();