Skip to content

Commit

Permalink
fix the dead lock issue and use shared data source
Browse files Browse the repository at this point in the history
  • Loading branch information
Zihan Li committed Feb 7, 2023
1 parent d5e22ea commit b9e6fab
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@

import javax.sql.DataSource;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -67,7 +68,8 @@ public MysqlDagActionStore(Config config) throws IOException {
this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);

this.dataSource = MysqlStateStore.newDataSource(config);
this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
Expand All @@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
log.info("Going to initialize mysqlUserQuotaManager");
Config quotaStoreConfig;
if (config.hasPath(CONFIG_PREFIX)) {
quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
Expand Down Expand Up @@ -225,10 +227,6 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);

String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
try {
for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
Expand All @@ -239,6 +237,10 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
log.error("Failed to release quota for requester list " + serializedRequesters, e);
return false;
}

String flowGroup =
ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
connection.commit();
} catch (SQLException ex) {
throw new IOException(ex);
Expand All @@ -265,7 +267,8 @@ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());

return new MysqlQuotaStore(dataSource, quotaStoreTableName);
}
Expand All @@ -274,7 +277,8 @@ protected RunningDagIdsStore createRunningDagStore(Config config) throws IOExcep
String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);

DataSource dataSource = MysqlStateStore.newDataSource(config);
DataSource dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());;

return new RunningDagIdsStore(dataSource, quotaStoreTableName);
}
Expand Down

0 comments on commit b9e6fab

Please sign in to comment.