From a18fe162082450e9938e23a237d43c2c97b7816a Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:22:56 -0700 Subject: [PATCH 1/7] address comments --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 6cdc8b7741a..73fcd05a119 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,6 +149,8 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { this.closer.register((Closeable)httpClient); + } else { + log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 4fff7c52d4999c534d394f795f81c7bc8ed55bc1 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 25 Mar 2022 10:53:41 -0700 Subject: [PATCH 2/7] use connectionmanager when httpclient is not closeable --- .../source/extractor/extract/restapi/RestApiConnector.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java index 73fcd05a119..6cdc8b7741a 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiConnector.java @@ -149,8 +149,6 @@ protected HttpClient getHttpClient() { .createClient(); if (httpClient instanceof Closeable) { 
this.closer.register((Closeable)httpClient); - } else { - log.warn("httpClient is not closable, we will not be able to handle the resources close, please make sure the implementation handle it correctly"); } } return this.httpClient; From 3fa7fba756c0e6a3b3a5398ebc8859945e4b52b0 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Fri, 27 Jan 2023 16:17:51 -0800 Subject: [PATCH 3/7] [GOBBLIN-1773] Fix bug in quota manager of gobblinservice where we increase quota twice for run-immediately flow --- .../modules/flow/BaseFlowToJobSpecCompiler.java | 3 ++- .../modules/orchestration/Orchestrator.java | 14 ++++++++++++++ .../scheduler/GobblinServiceJobScheduler.java | 5 +++-- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 5b06f1eb472..c571690bc79 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -203,8 +203,9 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { } if (FlowCatalog.isCompileSuccessful(response) && this.userQuotaManager.isPresent() && !flowSpec.isExplain() && - (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) { + flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { try { + // We only check quota for adhoc flow, since we don't have the execution id for run-immediately flow userQuotaManager.get().checkQuota(dag.getStartNodes()); } catch (IOException e) { throw new RuntimeException(e); diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 9ba8abd387d..54c6fe47ed4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.gobblin.util.PropertiesUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +103,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { @Setter private FlowStatusGenerator flowStatusGenerator; + private UserQuotaManager quotaManager; + + private final ClassAliasResolver aliasResolver; private Map flowGauges = Maps.newHashMap(); @@ -150,6 +155,8 @@ public Orchestrator(Config config, Optional topologyCatalog, Op } this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED); + quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, + ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config); } @Inject @@ -247,6 +254,13 @@ public void orchestrate(Spec spec) throws Exception { + "concurrent executions are disabled for this flow.", flowGroup, flowName); conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED); Instrumented.markMeter(this.skippedFlowsMeter); + if (!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + // For ad-hoc flow, we might already increase quota, we need to decrease here + Dag jobExecutionPlanDag = specCompiler.compileFlow(spec); + for(Dag.DagNode 
dagNode : jobExecutionPlanDag.getStartNodes()) { + quotaManager.releaseQuota(dagNode); + } + } // Send FLOW_FAILED event Map flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 3f079106252..5222b84e9bb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -330,8 +330,9 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { // Check quota limits against run immediately flows or adhoc flows before saving the schedule // In warm standby mode, this quota check will happen on restli API layer when we accept the flow - if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) { - // This block should be reachable only for the first execution for the adhoc flows (flows that either do not have a schedule or have runImmediately=true. 
+ if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + // This block should be reachable only for the execution for the adhoc flows + // For flow that has scheduler but run-immediately set to be true, we won't check teh quota as we will use a different execution id later if (quotaManager.isPresent()) { // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager try { From acd20fcb48588855902af83919e5a1cc7636fbbc Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Mon, 6 Feb 2023 17:12:15 -0800 Subject: [PATCH 4/7] change typo --- .../gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index c571690bc79..528b4d387ab 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -203,7 +203,7 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { } if (FlowCatalog.isCompileSuccessful(response) && this.userQuotaManager.isPresent() && !flowSpec.isExplain() && - flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + !flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { try { // We only check quota for adhoc flow, since we don't have the execution id for run-immediately flow userQuotaManager.get().checkQuota(dag.getStartNodes()); From d5e22eaf713d6c6bd7fb323dd4f865011bfcdd42 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Tue, 7 Feb 2023 11:36:50 -0800 Subject: [PATCH 5/7] fix unit test --- .../gobblin/runtime/spec_catalog/FlowCatalogTest.java | 8 +++++--- 
.../service/modules/orchestration/Orchestrator.java | 1 - .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java index f79d8501a65..3e0bb1bb18c 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java @@ -111,10 +111,10 @@ public static FlowSpec initFlowSpec(String specStore, URI uri){ * Create FLowSpec with specified URI and SpecStore location. */ public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){ - return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty()); + return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty(), false); } - public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) { + public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs, boolean isAdhoc) { Properties properties = new Properties(); properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName); properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); @@ -122,7 +122,9 @@ public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, properties.put("job.group", flowGroup); properties.put("specStore.fs.dir", specStore); properties.put("specExecInstance.capabilities", "source:destination"); - properties.put("job.schedule", "0 0 0 ? * * 2050"); + if (!isAdhoc) { + properties.put("job.schedule", "0 0 0 ? 
* * 2050"); + } Config defaults = ConfigUtils.propertiesToConfig(properties); Config config = additionalConfigs.withFallback(defaults); SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 54c6fe47ed4..7dbc55011b8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 107892243f8..1c67f33e0d5 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -317,9 +317,9 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { serviceLauncher.start(); FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1", - ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true"))); + ConfigFactory.empty(), true); FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1", - 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true"))); + ConfigFactory.empty(), true); Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class); SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class); From b9e6fab16c2f0bfe95cef06712de2277d28edbf3 Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Tue, 7 Feb 2023 13:55:18 -0800 Subject: [PATCH 6/7] fix the dead lock issue and use shared data source --- .../dag_action_store/MysqlDagActionStore.java | 6 ++++-- .../orchestration/MysqlUserQuotaManager.java | 18 +++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java index caa5ddda73c..ce2d1b92380 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java @@ -30,8 +30,9 @@ import javax.sql.DataSource; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; import org.apache.gobblin.runtime.api.DagActionStore; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ConfigUtils; @@ -67,7 +68,8 @@ public MysqlDagActionStore(Config config) throws IOException { this.tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE); - this.dataSource = MysqlStateStore.newDataSource(config); + this.dataSource = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker());; try (Connection connection = dataSource.getConnection(); 
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java index 8f5c57e49e0..cdbce8e5e03 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java @@ -37,9 +37,10 @@ import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.exception.QuotaExceededException; -import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; import org.apache.gobblin.runtime.metrics.RuntimeMetrics; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.Dag; @@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager { @Inject public MysqlUserQuotaManager(Config config) throws IOException { super(config); + log.info("Going to initialize mysqlUserQuotaManager"); Config quotaStoreConfig; if (config.hasPath(CONFIG_PREFIX)) { quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config); @@ -225,10 +227,6 @@ public boolean releaseQuota(Dag.DagNode dagNode) throws IOExce decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT); } - String flowGroup = - ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, ""); - decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); - String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode); try { for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) { @@ -239,6 +237,10 @@ public boolean releaseQuota(Dag.DagNode dagNode) throws IOExce log.error("Failed to release quota for requester list " + serializedRequesters, e); return false; } + + String flowGroup = + ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, ""); + decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT); connection.commit(); } catch (SQLException ex) { throw new IOException(ex); @@ -265,7 +267,8 @@ protected MysqlQuotaStore createQuotaStore(Config config) throws IOException { String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY, ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE); - DataSource dataSource = MysqlStateStore.newDataSource(config); + DataSource dataSource = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker()); return new MysqlQuotaStore(dataSource, quotaStoreTableName); } @@ -274,7 +277,8 @@ protected RunningDagIdsStore createRunningDagStore(Config config) throws IOExcep String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY, ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE); - DataSource dataSource = MysqlStateStore.newDataSource(config); + DataSource dataSource = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker());; return new RunningDagIdsStore(dataSource, quotaStoreTableName); } From e38658538b75bc876e21cce4e4e09dd7a96b6fcd Mon Sep 17 00:00:00 2001 From: Zihan Li Date: Wed, 8 Feb 2023 14:09:27 -0800 Subject: [PATCH 7/7] address comments --- .../java/org/apache/gobblin/metastore/MysqlStateStore.java | 2 +- .../service/modules/scheduler/GobblinServiceJobScheduler.java | 4 ++-- 2 files changed, 3 insertions(+), 3 
deletions(-) diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java index c2bc1e68a67..85d5cbc5b9e 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java @@ -201,7 +201,7 @@ protected String getCreateJobStateTableTemplate() { * @param config the properties used for datasource instantiation * @return */ - public static DataSource newDataSource(Config config) { + static DataSource newDataSource(Config config) { HikariDataSource dataSource = new HikariDataSource(); PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config)); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 5222b84e9bb..9c365e54fbc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -328,11 +328,11 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(response); } - // Check quota limits against run immediately flows or adhoc flows before saving the schedule + // Check quota limits against adhoc flows before saving the schedule // In warm standby mode, this quota check will happen on restli API layer when we accept the flow if (!this.warmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { // This block should be reachable only for the execution for the adhoc flows - // For flow that has scheduler but run-immediately set to be true, we won't check teh quota as we will use a different execution id 
later + // For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later if (quotaManager.isPresent()) { // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager try {