Skip to content

Commit

Permalink
[GOBBLIN-1773] Fix bugs in quota manager (#3636)
Browse files Browse the repository at this point in the history
* address comments

* use connectionmanager when httpclient is not cloesable

* [GOBBLIN-1773] Fix bug in quota manager of gobblinservice where we increase quota twice for run-immediately flow

* change typo

* fix unit test

* fix the dead lock issue and use shared data source

* address comments

---------

Co-authored-by: Zihan Li <[email protected]>
  • Loading branch information
ZihanLi58 and Zihan Li authored Feb 9, 2023
1 parent 13faea4 commit 8cb523d
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected String getCreateJobStateTableTemplate() {
* @param config the properties used for datasource instantiation
* @return
*/
public static DataSource newDataSource(Config config) {
static DataSource newDataSource(Config config) {
HikariDataSource dataSource = new HikariDataSource();
PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@

import javax.sql.DataSource;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -67,7 +68,8 @@ public MysqlDagActionStore(Config config) throws IOException {
this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);

this.dataSource = MysqlStateStore.newDataSource(config);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ public static FlowSpec initFlowSpec(String specStore, URI uri){
* Create FLowSpec with specified URI and SpecStore location.
*/
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty(), false);
}

public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) {
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs, boolean isAdhoc) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.put("job.name", flowName);
properties.put("job.group", flowGroup);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
properties.put("job.schedule", "0 0 0 ? * * 2050");
if (!isAdhoc) {
properties.put("job.schedule", "0 0 0 ? * * 2050");
}
Config defaults = ConfigUtils.propertiesToConfig(properties);
Config config = additionalConfigs.withFallback(defaults);
SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
}

if (FlowCatalog.isCompileSuccessful(response) && this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
try {
// We only check quota for adhoc flow, since we don't have the execution id for run-immediately flow
userQuotaManager.get().checkQuota(dag.getStartNodes());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
Expand All @@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
log.info("Going to initialize mysqlUserQuotaManager");
Config quotaStoreConfig;
if (config.hasPath(CONFIG_PREFIX)) {
quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
Expand Down Expand Up @@ -225,10 +227,6 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);

String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
try {
for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
Expand All @@ -239,6 +237,10 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
log.error("Failed to release quota for requester list " + serializedRequesters, e);
return false;
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
connection.commit();
} catch (SQLException ex) {
throw new IOException(ex);
Expand All @@ -265,7 +267,8 @@ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());

return new MysqlQuotaStore(dataSource, quotaStoreTableName);
}
Expand All @@ -274,7 +277,8 @@ protected RunningDagIdsStore createRunningDagStore(Config config) throws IOExcep
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;

return new RunningDagIdsStore(dataSource, quotaStoreTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,6 +102,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
@Setter
private FlowStatusGenerator flowStatusGenerator;

private UserQuotaManager quotaManager;


private final ClassAliasResolver<SpecCompiler> aliasResolver;

private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
Expand Down Expand Up @@ -150,6 +154,8 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
}

@Inject
Expand Down Expand Up @@ -247,6 +253,13 @@ public void orchestrate(Spec spec) throws Exception {
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
Instrumented.markMeter(this.skippedFlowsMeter);
if (!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// For ad-hoc flow, we might already increase quota, we need to decrease here
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
}

// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,11 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(response);
}

// Check quota limits against run immediately flows or adhoc flows before saving the schedule
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
// This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true.
if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// This block should be reachable only for the execution for the adhoc flows
// For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
serviceLauncher.start();

FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
ConfigFactory.empty(), true);
FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
ConfigFactory.empty(), true);

Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
Expand Down

0 comments on commit 8cb523d

Please sign in to comment.