Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1773] Fix bugs in quota manager #3636

Merged
merged 7 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected String getCreateJobStateTableTemplate() {
* @param config the properties used for datasource instantiation
* @return
*/
public static DataSource newDataSource(Config config) {
static DataSource newDataSource(Config config) {
HikariDataSource dataSource = new HikariDataSource();
PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@

import javax.sql.DataSource;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -67,7 +68,8 @@ public MysqlDagActionStore(Config config) throws IOException {
this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);

this.dataSource = MysqlStateStore.newDataSource(config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we deem this function unsafe to use because it does not read from the shared threadpool, would it be appropriate to make MysqlStateStore.newDataSource package-private?

this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ public static FlowSpec initFlowSpec(String specStore, URI uri){
* Create FlowSpec with specified URI and SpecStore location.
*/
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty(), false);
}

public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) {
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs, boolean isAdhoc) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.put("job.name", flowName);
properties.put("job.group", flowGroup);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
properties.put("job.schedule", "0 0 0 ? * * 2050");
if (!isAdhoc) {
properties.put("job.schedule", "0 0 0 ? * * 2050");
}
Config defaults = ConfigUtils.propertiesToConfig(properties);
Config config = additionalConfigs.withFallback(defaults);
SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
}

if (FlowCatalog.isCompileSuccessful(response) && this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
try {
// We only check quota for adhoc flow, since we don't have the execution id for run-immediately flow
userQuotaManager.get().checkQuota(dag.getStartNodes());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
Expand All @@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
log.info("Going to initialize mysqlUserQuotaManager");
Config quotaStoreConfig;
if (config.hasPath(CONFIG_PREFIX)) {
quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
Expand Down Expand Up @@ -225,10 +227,6 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);

String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
try {
for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
Expand All @@ -239,6 +237,10 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
log.error("Failed to release quota for requester list " + serializedRequesters, e);
return false;
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
connection.commit();
} catch (SQLException ex) {
throw new IOException(ex);
Expand All @@ -265,7 +267,8 @@ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());

return new MysqlQuotaStore(dataSource, quotaStoreTableName);
}
Expand All @@ -274,7 +277,8 @@ protected RunningDagIdsStore createRunningDagStore(Config config) throws IOExcep
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;

return new RunningDagIdsStore(dataSource, quotaStoreTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,6 +102,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
@Setter
private FlowStatusGenerator flowStatusGenerator;

private UserQuotaManager quotaManager;


private final ClassAliasResolver<SpecCompiler> aliasResolver;

private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
Expand Down Expand Up @@ -150,6 +154,8 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
}

@Inject
Expand Down Expand Up @@ -247,6 +253,13 @@ public void orchestrate(Spec spec) throws Exception {
+ "concurrent executions are disabled for this flow.", flowGroup, flowName);
conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
Instrumented.markMeter(this.skippedFlowsMeter);
if (!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can come in a later PR, but we should have a static function in the FlowSpec class to check whether a FlowSpec is adhoc or not.

// For an ad-hoc flow, we might have already increased the quota, so we need to decrease it here
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
}

// Send FLOW_FAILED event
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,11 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(response);
}

// Check quota limits against run immediately flows or adhoc flows before saving the schedule
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
Copy link
Contributor

@umustafi umustafi Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, FlowConfigV2ResourceLocalHandler and FlowCatalog are not doing a quota check when adding the spec. Where do we do the check in the API layer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In warm standby mode, the scheduler is not the listener when we add a flow spec to the flowCatalog. But the compiler is the listener, as defined here. And we check the quota here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I did not notice the difference in warm standby mode.

if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
// This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true.
if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// This block should be reachable only for the execution for the adhoc flows
// For a flow that has a schedule but has run-immediately set to true, we won't check the quota, as we will use a different execution id later
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
serviceLauncher.start();

FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
ConfigFactory.empty(), true);
FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
ConfigFactory.empty(), true);

Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
Expand Down