From a217af07df2fb941e5ba0b8df2e8b9079a558727 Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 31 May 2022 01:07:03 -0700 Subject: [PATCH 01/14] Add e2e tests and set http response code for quota exceeded --- .../FlowConfigV2ResourceLocalHandler.java | 14 ++- .../runtime/spec_catalog/FlowCatalog.java | 14 ++- .../runtime/spec_catalog/FlowCatalogTest.java | 18 ++-- .../core/GobblinServiceGuiceModule.java | 3 + .../modules/core/GobblinServiceManager.java | 5 ++ .../orchestration/QuotaExceededException.java | 27 ++++++ .../orchestration/UserQuotaManager.java | 24 +++-- .../scheduler/GobblinServiceJobScheduler.java | 25 +++++- .../service/GobblinServiceManagerTest.java | 23 ++++- .../orchestration/UserQuotaManagerTest.java | 2 - .../GobblinServiceJobSchedulerTest.java | 90 +++++++++++++++++-- 11 files changed, 210 insertions(+), 35 deletions(-) create mode 100644 gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index b7249c1a246..79fa00b86fd 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -39,6 +39,9 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; + + @Slf4j public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler { @@ -60,9 +63,8 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL } log.info(createLog); FlowSpec flowSpec = createFlowSpecForConfig(flowConfig); - FlowStatusId flowStatusId = new FlowStatusId() - .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)) - .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY)); + FlowStatusId flowStatusId = + new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)).setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY)); if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))); } else { @@ -77,6 +79,8 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL } Map responseMap = this.flowCatalog.put(flowSpec, triggerListener); + // Values is either true, false, or an exception class + message + AddSpecResponse response = responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")); HttpStatus httpStatus; if (flowConfig.hasExplain() && flowConfig.isExplain()) { @@ -88,8 +92,10 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : ""); flowConfig.setProperties(props); httpStatus = HttpStatus.S_200_OK; - } else if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) { + } else if (Boolean.parseBoolean(response.getValue())) { httpStatus = HttpStatus.S_201_CREATED; + } else if (response.getValue().contains(QuotaExceededException.class.getSimpleName())) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, response.getValue()); } else { throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec)); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index c9c17b02739..93e43d6176e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -29,6 +29,7 @@ import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -360,8 +361,12 @@ public Map put(Spec spec, boolean triggerListener) { responseMap.put(entry.getKey().getName(), entry.getValue().getResult()); } } + AddSpecResponse schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null)); - if (isCompileSuccessful(responseMap)) { + if (isCompileSuccessful(schedulerResponse.getValue())) { + if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) { + responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue())); + } synchronized (syncObject) { try { if (!flowSpec.isExplain()) { @@ -384,13 +389,6 @@ public Map put(Spec spec, boolean triggerListener) { return responseMap; } - public static boolean isCompileSuccessful(Map responseMap) { - // If we cannot get the response from the scheduler, assume that the flow failed compilation - AddSpecResponse addSpecResponse = responseMap.getOrDefault( - ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null)); - return isCompileSuccessful(addSpecResponse.getValue()); - } - public static boolean isCompileSuccessful(String dag) { return dag != null && !dag.contains(ConfigException.class.getSimpleName()); } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java index 319a0f384a3..46b13203617 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java @@ -21,6 +21,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import java.io.File; import java.net.URI; import java.util.Collection; @@ -105,19 +106,24 @@ public static FlowSpec initFlowSpec(String specStore, URI uri){ return initFlowSpec(specStore, uri, "flowName"); } - /** - * Create FLowSpec with specified URI and SpecStore location. - */ + /** + * Create FLowSpec with specified URI and SpecStore location. + */ public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){ + return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty()); + } + + public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) { Properties properties = new Properties(); properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName); + properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); properties.put("job.name", flowName); - properties.put("job.group", flowName); + properties.put("job.group", flowGroup); properties.put("specStore.fs.dir", specStore); properties.put("specExecInstance.capabilities", "source:destination"); properties.put("job.schedule", "0 0 0 ? * * 2050"); - Config config = ConfigUtils.propertiesToConfig(properties); - + Config defaults = ConfigUtils.propertiesToConfig(properties); + Config config = additionalConfigs.withFallback(defaults); SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config); FlowSpec.Builder flowSpecBuilder = null; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index 1a1bce80409..efb03339c46 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -19,6 +19,7 @@ import java.util.Objects; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,6 +195,8 @@ public void configure(Binder binder) { binder.bind(Orchestrator.class); binder.bind(SchedulerService.class); binder.bind(GobblinServiceJobScheduler.class); + OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class); + binder.bind(UserQuotaManager.class); } if (serviceConfig.isGitConfigMonitorEnabled()) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index c7fc958d01f..d35eef64b90 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -31,6 +31,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.ObjectUtils; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -197,6 +198,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri @Inject protected ServiceDatabaseManager databaseManager; + @Inject(optional=true) + @Getter + protected Optional quotaManager; + protected Optional helixLeaderGauges; @Inject(optional = true) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java new file mode 100644 index 00000000000..946a7a4d200 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; + + +public class QuotaExceededException extends IOException { + + public QuotaExceededException(String message) { + super(QuotaExceededException.class.getSimpleName() + ": " + message); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java index c49cdfc7f21..eaa99b306fd 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.orchestration; import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import com.typesafe.config.Config; import java.io.IOException; import java.util.HashSet; @@ -25,6 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.service.RequesterService; @@ -40,6 +42,7 @@ * is exceeded, then the execution will fail without running on the underlying executor */ @Slf4j +@Singleton public class UserQuotaManager { public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; @@ -54,7 +57,8 @@ public class UserQuotaManager { Map runningDagIds = new ConcurrentHashMap<>(); private final int defaultQuota; - UserQuotaManager(Config config) { + @Inject + public UserQuotaManager(Config config) { this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA); ImmutableMap.Builder userMapBuilder = ImmutableMap.builder(); ImmutableMap.Builder flowGroupMapBuilder = ImmutableMap.builder(); @@ -72,9 +76,9 @@ public class UserQuotaManager { /** * Checks if the dagNode exceeds the statically configured user quota for both the proxy user, requester user, and flowGroup - * @throws IOException if the quota is exceeded, and logs a statement + * @throws QuotaExceededException if the quota is exceeded, and logs a statement */ - public void checkQuota(Dag.DagNode dagNode, boolean onInit) throws IOException { + public void checkQuota(Dag.DagNode dagNode, boolean onInit) throws QuotaExceededException { // Dag is already being tracked, no need to double increment for retries and multihop flows if (isDagCurrentlyRunning(dagNode)) { return; @@ -103,8 +107,16 @@ public void checkQuota(Dag.DagNode dagNode, boolean onInit) th boolean requesterCheck = true; if (serializedRequesters != null) { - List uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream() - .map(ServiceRequester::getName).distinct().collect(Collectors.toList()); + List uniqueRequesters; + try { + uniqueRequesters = RequesterService.deserialize(serializedRequesters) + .stream() + .map(ServiceRequester::getName) + .distinct() + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException("Could not process requesters due to ", e); + } for (String requester : uniqueRequesters) { int userQuotaIncrement = incrementJobCountAndCheckQuota( DagManagerUtils.getUserQuotaKey(requester, dagNode), requesterToJobCount, dagNode, getQuotaForUser(requester)); @@ -135,7 +147,7 @@ public void checkQuota(Dag.DagNode dagNode, boolean onInit) th decrementQuotaUsage(flowGroupToJobCount, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode)); decrementQuotaUsageForUsers(usersQuotaIncrement); runningDagIds.remove(DagManagerUtils.generateDagId(dagNode)); - throw new IOException(requesterMessage.toString()); + throw new QuotaExceededException(requesterMessage.toString()); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index ffd2f15a497..659b2b74b34 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -54,6 +54,8 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; +import org.apache.gobblin.service.modules.orchestration.QuotaExceededException; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.utils.InjectionNames; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; @@ -91,6 +93,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata protected final Optional flowCatalog; protected final Optional helixManager; protected final Orchestrator orchestrator; + protected final Optional quotaManager; @Getter protected final Map scheduledFlowSpecs; @Getter @@ -119,7 +122,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata @Inject public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config, Optional helixManager, Optional flowCatalog, Optional topologyCatalog, - Orchestrator orchestrator, SchedulerService schedulerService, Optional log) throws Exception { + Orchestrator orchestrator, SchedulerService schedulerService, Optional quotaManager, Optional log) throws Exception { super(ConfigUtils.configToProperties(config), schedulerService); _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); @@ -130,14 +133,15 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser this.scheduledFlowSpecs = Maps.newHashMap(); this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED) && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED); + this.quotaManager = quotaManager; } public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator, Optional helixManager, - Optional flowCatalog, Optional topologyCatalog, Optional dagManager, + Optional flowCatalog, Optional topologyCatalog, Optional dagManager, Optional quotaManager, SchedulerService schedulerService, Optional log) throws Exception { this(serviceName, config, helixManager, flowCatalog, topologyCatalog, - new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, log); + new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log); } public synchronized void setActive(boolean isActive) { @@ -336,10 +340,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { } if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec); + try { + if (quotaManager.isPresent()) { + quotaManager.get().checkQuota(dag.getNodes().get(0), false); + } + } catch (QuotaExceededException e) { + return new AddSpecResponse<>(e.getMessage()); + } this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null)); } } else { _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec); + try { + if (quotaManager.isPresent()) { + quotaManager.get().checkQuota(dag.getNodes().get(0), false); + } + } catch (QuotaExceededException e) { + _log.info(e.toString()); + return new AddSpecResponse<>(e.toString()); + } this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null)); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java index 2572fda66b3..13e607d1981 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java @@ -27,6 +27,8 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; +import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.hadoop.fs.Path; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jgit.api.Git; @@ -162,7 +164,7 @@ public void setup() throws Exception { serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false); serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, MockedSpecCompiler.class.getCanonicalName()); - + serviceCoreProperties.put(UserQuotaManager.PER_USER_QUOTA, "testUser:1"); transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000"); // Create a bare repository @@ -308,6 +310,25 @@ public void testRunOnceJob() throws Exception { } @Test (dependsOnMethods = "testRunOnceJob") + public void testRunQuotaExceeds() throws Exception { + Map props = flowProperties; + props.put(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, "testUser"); + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props)); + + this.flowConfigClient.createFlowConfig(flowConfig); + + FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props)); + + try { + this.flowConfigClient.createFlowConfig(flowConfig2); + } catch (RestLiResponseException e) { + Assert.assertEquals(e.getStatus(), HttpStatus.SERVICE_UNAVAILABLE_503); + } + } + + @Test (dependsOnMethods = "testRunQuotaExceeds") public void testExplainJob() throws Exception { FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME)) .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java index 98ebc6bca1f..9fc9438dbb5 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java @@ -52,8 +52,6 @@ public void testExceedsQuotaOnStartup() throws Exception { this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true); // Should not be throwing the exception this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true); - - // TODO: add verification when adding a public method for getting the current count and quota per user } @Test diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index a9323971bf9..6cd1a75053b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -21,26 +21,38 @@ import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import java.io.File; import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flow.MockedSpecCompiler; import org.apache.gobblin.service.modules.flow.SpecCompiler; +import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest; +import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.testing.AssertWithBackoff; import org.apache.gobblin.util.ConfigUtils; @@ -48,6 +60,7 @@ import org.mockito.invocation.Invocation; import org.mockito.stubbing.Answer; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*; @@ -61,6 +74,11 @@ public class GobblinServiceJobSchedulerTest { private static final String TEST_SCHEDULE = "0 1/0 * ? * *"; private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template"; + private Config quotaConfig; + @BeforeClass + public void setUp() { + this.quotaConfig = ConfigFactory.empty().withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1")); + } /** * Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually) */ @@ -90,10 +108,11 @@ public void testJobSchedulerInit() throws Exception { Assert.assertEquals(flowCatalog.getSpecs().size(), 2); Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class); + UserQuotaManager quotaManager = new UserQuotaManager(quotaConfig); // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null); + ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null); SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class); Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler); @@ -179,7 +198,7 @@ public void testJobSchedulerInitWithFailedSpec() throws Exception { // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null); + ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null); SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class); Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler); @@ -242,7 +261,7 @@ public void testJobSchedulerUnschedule() throws Exception { SchedulerService schedulerService = new SchedulerService(new Properties()); // Mock a GaaS scheduler. TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler", - ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService ); + ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService ); schedulerService.startAsync().awaitRunning(); scheduler.startUp(); @@ -283,14 +302,60 @@ public boolean apply(Void input) { Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0); } + @Test + public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { + File specDir = Files.createTempDir(); + + Properties properties = new Properties(); + properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath()); + FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties)); + ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest"); + + + serviceLauncher.addService(flowCatalog); + serviceLauncher.start(); + + FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1", + ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true"))); + FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1", + ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true"))); + + Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class); + SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class); + when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler); + Dag mockDag0 = this.buildDag(flowSpec0.getConfig(), "0"); + Dag mockDag1 = this.buildDag(flowSpec1.getConfig(), "1"); + when(mockSpecCompiler.compileFlow(flowSpec0)).thenReturn(mockDag0); + when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1); + + SchedulerService schedulerService = new SchedulerService(new Properties()); + // Mock a GaaS scheduler. + GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler", + ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent()); + + schedulerService.startAsync().awaitRunning(); + scheduler.startUp(); + scheduler.setActive(true); + + scheduler.onAddSpec(flowSpec0); //Ignore the response for this request + AddSpecResponse response1 = scheduler.onAddSpec(flowSpec1); + + Assert.assertEquals(response1.getValue(), "QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); + // set scheduler to be inactive and unschedule flows + scheduler.setActive(false); + + Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0); + Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0); + } + class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler { public boolean isCompilerHealthy = false; private boolean hasScheduler = false; public TestGobblinServiceJobScheduler(String serviceName, Config config, - Optional flowCatalog, Optional topologyCatalog, Orchestrator orchestrator, + Optional flowCatalog, Optional topologyCatalog, Orchestrator orchestrator, Optional quotaManager, SchedulerService schedulerService) throws Exception { - super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, Optional.absent()); + super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent()); if (schedulerService != null) { hasScheduler = true; } @@ -318,4 +383,19 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse(addedSpec.getDescription()); } } + + Dag buildDag(Config additionalConfig, String id) throws URISyntaxException { + Config config = ConfigFactory.empty(). + withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); + + config = additionalConfig.withFallback(config); + List jobExecutionPlans = new ArrayList<>(); + JobSpec js = JobSpec.builder("test_job_" + id).withVersion(id).withConfig(config). + withTemplate(new URI("job_" + id)).build(); + SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("jobExecutor")); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); + jobExecutionPlan.setCurrentAttempts(1); + jobExecutionPlans.add(jobExecutionPlan); + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } } \ No newline at end of file From b64ea2e3536ffe000626d6654c745f0f63ae695b Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 31 May 2022 12:55:16 -0700 Subject: [PATCH 02/14] cleanup --- .../runtime/spec_catalog/FlowCatalog.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 93e43d6176e..906d121978d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -364,17 +364,18 @@ public Map put(Spec spec, boolean triggerListener) { AddSpecResponse schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null)); if (isCompileSuccessful(schedulerResponse.getValue())) { - if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) { - responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue())); - } synchronized (syncObject) { try { - if (!flowSpec.isExplain()) { - long startTime = System.currentTimeMillis(); - specStore.addSpec(spec); - metrics.updatePutSpecTime(startTime); + if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) { + responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue())); + } else { + if (!flowSpec.isExplain()) { + long startTime = System.currentTimeMillis(); + specStore.addSpec(spec); + metrics.updatePutSpecTime(startTime); + } + responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true")); } - responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true")); } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e); } finally { From 503b654eca733394d162a8c4d612fa8dd2883a0b Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 31 May 2022 14:17:01 -0700 Subject: [PATCH 03/14] Fix checkstyle test --- .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 6cd1a75053b..07a7677ca64 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.api.FlowSpec; From 14cb9708f5a9a7d1d7c9162e5d611d5b064e0e42 Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 31 May 2022 14:48:03 -0700 Subject: [PATCH 04/14] Improve guard against schedule change if quota is exceeded --- .../orchestration/QuotaExceededException.java | 2 +- .../scheduler/GobblinServiceJobScheduler.java | 27 +++++++++---------- .../GobblinServiceJobSchedulerTest.java | 7 ++--- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java index 946a7a4d200..68da12418aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java @@ -22,6 +22,6 @@ public class QuotaExceededException extends IOException { public QuotaExceededException(String message) { - super(QuotaExceededException.class.getSimpleName() + ": " + message); + super(message); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 659b2b74b34..e1805a0a289 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -326,6 +326,18 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(response); } + // Check quota limits against run immediately flows or adhoc flows before saving the schedule + if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { + try { + if (quotaManager.isPresent()) { + quotaManager.get().checkQuota(dag.getNodes().get(0), false); + } + } catch (QuotaExceededException e) { + _log.info(e.toString()); + return new AddSpecResponse<>(e.toString()); + } + } + // todo : we should probably not schedule a flow if it is a runOnce flow this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec); @@ -340,25 +352,10 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { } if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec); - try { - if (quotaManager.isPresent()) { - quotaManager.get().checkQuota(dag.getNodes().get(0), false); - } - } catch (QuotaExceededException e) { - return new AddSpecResponse<>(e.getMessage()); - } this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null)); } } else { _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec); - try { - if (quotaManager.isPresent()) { - quotaManager.get().checkQuota(dag.getNodes().get(0), false); - } - } catch (QuotaExceededException e) { - _log.info(e.toString()); - return new AddSpecResponse<>(e.toString()); - } this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null)); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 07a7677ca64..d908a4bb785 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -338,13 +338,14 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { scheduler.onAddSpec(flowSpec0); //Ignore the response for this request AddSpecResponse response1 = scheduler.onAddSpec(flowSpec1); + Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); - Assert.assertEquals(response1.getValue(), "QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); + Assert.assertEquals(response1.getValue(), "org.apache.gobblin.service.modules.orchestration.QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); + // Second flow should not be added to scheduled flows since it was rejected + Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // set scheduler to be inactive and unschedule flows scheduler.setActive(false); - Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0); - Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0); } class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler { From bf11a575e3bd821836932359b7acc2d0ec70a908 Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 1 Jun 2022 23:23:25 -0700 Subject: [PATCH 05/14] Fix bug relating to exception propagation and scheduler not checking quota due to current attempt number --- .../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 2 +- .../org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 2 +- .../service/modules/orchestration/UserQuotaManager.java | 3 ++- .../service/modules/scheduler/GobblinServiceJobScheduler.java | 2 +- .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 3 +-- .../org/apache/gobblin/exception}/QuotaExceededException.java | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) rename {gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration => gobblin-utility/src/main/java/org/apache/gobblin/exception}/QuotaExceededException.java (94%) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 79fa00b86fd..40efd5582f4 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -36,10 +36,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; @Slf4j diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 906d121978d..248ee405bec 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -29,7 +29,7 @@ import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.gobblin.exception.QuotaExceededException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java index eaa99b306fd..3e354001953 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java @@ -29,6 +29,7 @@ import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.service.RequesterService; import org.apache.gobblin.service.ServiceRequester; import org.apache.gobblin.service.modules.flowgraph.Dag; @@ -160,7 +161,7 @@ public void checkQuota(Dag.DagNode dagNode, boolean onInit) th */ private int incrementJobCountAndCheckQuota(String key, Map quotaMap, Dag.DagNode dagNode, int quotaForKey) { // Only increment job count for first attempt, since job is considered running between retries - if (dagNode.getValue().getCurrentAttempts() != 1) { + if (dagNode.getValue().getCurrentAttempts() > 1) { return 0; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index e1805a0a289..bebc9411ebc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.MetricContext; @@ -54,7 +55,6 @@ import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; -import org.apache.gobblin.service.modules.orchestration.QuotaExceededException; import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.utils.InjectionNames; diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index d908a4bb785..e8b51fda826 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -340,7 +340,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { AddSpecResponse response1 = scheduler.onAddSpec(flowSpec1); Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); - Assert.assertEquals(response1.getValue(), "org.apache.gobblin.service.modules.orchestration.QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); + Assert.assertEquals(response1.getValue(), "org.apache.gobblin.exception.QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); // Second flow should not be added to scheduled flows since it was rejected Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // set scheduler to be inactive and unschedule flows @@ -394,7 +394,6 @@ Dag buildDag(Config additionalConfig, String id) throws URISyn withTemplate(new URI("job_" + id)).build(); SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("jobExecutor")); JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); - jobExecutionPlan.setCurrentAttempts(1); jobExecutionPlans.add(jobExecutionPlan); return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java similarity index 94% rename from gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java rename to gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java index 68da12418aa..6ab63d150ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/QuotaExceededException.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gobblin.service.modules.orchestration; +package org.apache.gobblin.exception; import java.io.IOException; From 268bcaab5a4c7a903609aebfdcb30222050345a5 Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 7 Jun 2022 16:54:55 -0700 Subject: [PATCH 06/14] Address review comments --- .../org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 2 ++ .../service/modules/scheduler/GobblinServiceJobScheduler.java | 1 + .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 3 ++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 248ee405bec..a2015d691e2 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -363,9 +363,11 @@ public Map put(Spec spec, boolean triggerListener) { } AddSpecResponse schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null)); + // Check that the flow configuration is valid and matches to a corresponding edge if (isCompileSuccessful(schedulerResponse.getValue())) { synchronized (syncObject) { try { + // Even if the flow is valid, reject the flow if there are not enough resources available to run it if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) { responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue())); } else { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index bebc9411ebc..42dabf89b9d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -330,6 +330,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { try { if (quotaManager.isPresent()) { + // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager quotaManager.get().checkQuota(dag.getNodes().get(0), false); } } catch (QuotaExceededException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index e8b51fda826..5c67723677e 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Properties; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; @@ -340,7 +341,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { AddSpecResponse response1 = scheduler.onAddSpec(flowSpec1); Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); - Assert.assertEquals(response1.getValue(), "org.apache.gobblin.exception.QuotaExceededException: Quota exceeded for flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n"); + Assert.assertTrue(response1.getValue().contains(QuotaExceededException.class.getSimpleName())); // Second flow should not be added to scheduled flows since it was rejected Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // set scheduler to be inactive and unschedule flows From 28f1f4f938fa1553a709543a4d8c4250c2ddccc7 Mon Sep 17 00:00:00 2001 From: William Lo Date: Sun, 26 Jun 2022 19:10:11 -0700 Subject: [PATCH 07/14] Refactor based on review feedback --- .../FlowConfigResourceLocalHandler.java | 18 ++++++++-- .../FlowConfigV2ResourceLocalHandler.java | 17 +++++++--- .../runtime/api/MutableSpecCatalog.java | 2 +- .../runtime/api/SpecCatalogListener.java | 2 +- .../runtime/spec_catalog/FlowCatalog.java | 28 ++++++++-------- .../runtime/spec_catalog/FlowCatalogTest.java | 33 +++++++++++++++++-- .../modules/core/GitConfigMonitor.java | 2 +- .../orchestration/DagManagerUtils.java | 17 ++++++++++ .../orchestration/UserQuotaManager.java | 14 ++------ .../scheduler/GobblinServiceJobScheduler.java | 11 +++---- .../orchestration/OrchestratorTest.java | 2 +- .../GobblinServiceJobSchedulerTest.java | 11 +++---- .../util/callbacks/CallbackResult.java | 3 +- 13 files changed, 106 insertions(+), 54 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index 8430acf1ed8..58195a94b39 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -42,6 +42,7 @@ import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.ContextAwareMeter; import org.apache.gobblin.metrics.MetricContext; @@ -133,7 +134,13 @@ public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerLis if (!flowConfig.hasSchedule() && this.flowCatalog.exists(flowSpec.getUri())) { return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT); } else { - this.flowCatalog.put(flowSpec, triggerListener); + try { + this.flowCatalog.put(flowSpec, triggerListener); + } catch (Throwable e) { + if (e instanceof QuotaExceededException) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); + } + } return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); } } @@ -168,8 +175,13 @@ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boo originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE); flowConfig = originalFlowConfig; } - - this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener); + try { + this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener); + } catch (Throwable e) { + if (e instanceof QuotaExceededException) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); + } + } return new UpdateResponse(HttpStatus.S_200_OK); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 40efd5582f4..50ea62567d3 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringEscapeUtils; @@ -64,7 +65,8 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL log.info(createLog); FlowSpec flowSpec = createFlowSpecForConfig(flowConfig); FlowStatusId flowStatusId = - new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)).setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY)); + new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)) + .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY)); if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))); } else { @@ -78,8 +80,15 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); } - Map responseMap = this.flowCatalog.put(flowSpec, triggerListener); - // Values is either true, false, or an exception class + message + Map responseMap = new HashMap<>(); + try { + responseMap = this.flowCatalog.put(flowSpec, triggerListener); + } catch (Throwable e) { + // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings + if (e instanceof QuotaExceededException) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); + } + } AddSpecResponse response = responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")); HttpStatus httpStatus; @@ -94,8 +103,6 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL httpStatus = HttpStatus.S_200_OK; } else if (Boolean.parseBoolean(response.getValue())) { httpStatus = HttpStatus.S_201_CREATED; - } else if (response.getValue().contains(QuotaExceededException.class.getSimpleName())) { - throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, response.getValue()); } else { throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec)); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java index 3c1573a2ea6..e6c81f3aacf 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java @@ -48,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog { * on adding a {@link Spec} to the {@link SpecCatalog}. The key for each entry is the name of the {@link SpecCatalogListener} * and the value is the result of the the action taken by the listener returned as an instance of {@link AddSpecResponse}. * */ - Map put(Spec spec); + Map put(Spec spec) throws Throwable; /** * Removes an existing {@link Spec} with the given URI. diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java index 67f2e39ca96..c09784e1951 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java @@ -48,7 +48,7 @@ public AddSpecCallback(Spec addedSpec) { _addedSpec = addedSpec; } - @Override public AddSpecResponse apply(SpecCatalogListener listener) { + public AddSpecResponse apply(SpecCatalogListener listener) { return listener.onAddSpec(_addedSpec); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index a2015d691e2..f459c9588e4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -29,7 +29,6 @@ import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.gobblin.exception.QuotaExceededException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -343,7 +342,7 @@ public Spec getSpecWrapper(URI uri) { * @param triggerListener True if listeners should be notified. * @return a map of listeners and their {@link AddSpecResponse}s */ - public Map put(Spec spec, boolean triggerListener) { + public Map put(Spec spec, boolean triggerListener) throws Throwable { Map responseMap = new HashMap<>(); FlowSpec flowSpec = (FlowSpec) spec; Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); @@ -356,10 +355,16 @@ public Map put(Spec spec, boolean triggerListener) { if (triggerListener) { AddSpecResponse> response = this.listeners.onAddSpec(flowSpec); - // If flow fails compilation, the result will have a non-empty string with the error for (Map.Entry> entry : response.getValue().getSuccesses().entrySet()) { responseMap.put(entry.getKey().getName(), entry.getValue().getResult()); } + // If flow fails compilation, the result will have a non-empty string with the error + if (response.getValue().getFailures().size() > 0) { + for (Map.Entry> entry : response.getValue().getFailures().entrySet()) { + throw entry.getValue().getError().getCause(); + } + return responseMap; + } } AddSpecResponse schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null)); @@ -367,17 +372,12 @@ public Map put(Spec spec, boolean triggerListener) { if (isCompileSuccessful(schedulerResponse.getValue())) { synchronized (syncObject) { try { - // Even if the flow is valid, reject the flow if there are not enough resources available to run it - if (schedulerResponse.getValue().contains(QuotaExceededException.class.getSimpleName())) { - responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>(schedulerResponse.getValue())); - } else { - if (!flowSpec.isExplain()) { - long startTime = System.currentTimeMillis(); - specStore.addSpec(spec); - metrics.updatePutSpecTime(startTime); - } - responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true")); + if (!flowSpec.isExplain()) { + long startTime = System.currentTimeMillis(); + specStore.addSpec(spec); + metrics.updatePutSpecTime(startTime); } + responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true")); } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e); } finally { @@ -397,7 +397,7 @@ public static boolean isCompileSuccessful(String dag) { } @Override - public Map put(Spec spec) { + public Map put(Spec spec) throws Throwable { return put(spec, true); } diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java index 46b13203617..f79d8501a65 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java @@ -29,6 +29,7 @@ import java.util.Properties; import org.apache.commons.io.FileUtils; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; @@ -147,7 +148,7 @@ public void cleanUp() throws Exception { } @Test - public void createFlowSpec() { + public void createFlowSpec() throws Throwable { // List Current Specs Collection specs = flowCatalog.getSpecs(); logger.info("[Before Create] Number of specs: " + specs.size()); @@ -205,7 +206,7 @@ public void deleteFlowSpec() throws SpecNotFoundException { } @Test (dependsOnMethods = "deleteFlowSpec") - public void testRejectBadFlow() { + public void testRejectBadFlow() throws Throwable { Collection specs = flowCatalog.getSpecs(); logger.info("[Before Create] Number of specs: " + specs.size()); int i=0; @@ -229,7 +230,7 @@ public void testRejectBadFlow() { } @Test (dependsOnMethods = "testRejectBadFlow") - public void testRejectMissingListener() { + public void testRejectMissingListener() throws Throwable { flowCatalog.removeListener(this.mockListener); Collection specs = flowCatalog.getSpecs(); logger.info("[Before Create] Number of specs: " + specs.size()); @@ -250,6 +251,32 @@ public void testRejectMissingListener() { Assert.assertEquals(flowCatalog.getSize(), 0); } + @Test (dependsOnMethods = "testRejectMissingListener") + public void testRejectQuotaExceededFlow() { + Collection specs = flowCatalog.getSpecs(); + logger.info("[Before Create] Number of specs: " + specs.size()); + int i=0; + for (Spec spec : specs) { + FlowSpec flowSpec = (FlowSpec) spec; + logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec)); + } + Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition"); + + // Create and add Spec + FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow"); + + // Assume that spec is rejected + when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException(new QuotaExceededException("error"))); + try { + Map response = this.flowCatalog.put(badSpec); + } catch (Throwable e) { + Assert.assertTrue(e instanceof QuotaExceededException); + } + // Spec should be rejected from being stored + specs = flowCatalog.getSpecs(); + Assert.assertEquals(specs.size(), 0); + } + public static URI computeFlowSpecURI() { // Make sure this is relative URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java index 5aa0b438efe..101d9f46d4a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -108,7 +108,7 @@ public void addChange(DiffEntry change) { .withVersion(SPEC_VERSION) .withDescription(SPEC_DESCRIPTION) .build()); - } catch (IOException e) { + } catch (Throwable e) { log.warn("Could not load config file: " + configFilePath); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java index a472a37cb29..c4fd520e9bb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; +import java.io.IOException; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -32,6 +33,7 @@ import com.google.common.collect.Lists; import com.typesafe.config.Config; +import java.util.stream.Collectors; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.TimingEvent; @@ -41,6 +43,7 @@ import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.RequesterService; +import org.apache.gobblin.service.ServiceRequester; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption; @@ -340,4 +343,18 @@ static void emitFlowEvent(Optional eventSubmitter, Dag getDistinctUniqueRequesters(String serializedRequesters) { + List uniqueRequesters; + try { + uniqueRequesters = RequesterService.deserialize(serializedRequesters) + .stream() + .map(ServiceRequester::getName) + .distinct() + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException("Could not process requesters due to ", e); + } + return uniqueRequesters; + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java index 3e354001953..36b4d271804 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -108,16 +107,7 @@ public void checkQuota(Dag.DagNode dagNode, boolean onInit) th boolean requesterCheck = true; if (serializedRequesters != null) { - List uniqueRequesters; - try { - uniqueRequesters = RequesterService.deserialize(serializedRequesters) - .stream() - .map(ServiceRequester::getName) - .distinct() - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException("Could not process requesters due to ", e); - } + List uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters); for (String requester : uniqueRequesters) { int userQuotaIncrement = incrementJobCountAndCheckQuota( DagManagerUtils.getUserQuotaKey(requester, dagNode), requesterToJobCount, dagNode, getQuotaForUser(requester)); @@ -161,6 +151,8 @@ public void checkQuota(Dag.DagNode dagNode, boolean onInit) th */ private int incrementJobCountAndCheckQuota(String key, Map quotaMap, Dag.DagNode dagNode, int quotaForKey) { // Only increment job count for first attempt, since job is considered running between retries + // Include the scenario where currentAttempts is 0 (when checked by the scheduler) + // but it will not double increment due to first ensuring that the dag is not already incremented if (dagNode.getValue().getCurrentAttempts() > 1) { return 0; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 42dabf89b9d..d00365f13b7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -328,14 +328,13 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { // Check quota limits against run immediately flows or adhoc flows before saving the schedule if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) { - try { - if (quotaManager.isPresent()) { - // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager + if (quotaManager.isPresent()) { + // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager + try { quotaManager.get().checkQuota(dag.getNodes().get(0), false); + } catch (QuotaExceededException e) { + throw new RuntimeException(e); } - } catch (QuotaExceededException e) { - _log.info(e.toString()); - return new AddSpecResponse<>(e.toString()); } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 22208498d27..c25c91b593c 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -226,7 +226,7 @@ public void createTopologySpec() { } @Test (dependsOnMethods = "createTopologySpec") - public void createFlowSpec() throws Exception { + public void createFlowSpec() throws Throwable { // Since only 1 Topology with 1 SpecProducer has been added in previous test // .. it should be available and responsible for our new FlowSpec IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 5c67723677e..45242752ef0 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -83,7 +83,7 @@ public void setUp() { * Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually) */ @Test - public void testJobSchedulerInit() throws Exception { + public void testJobSchedulerInit() throws Throwable { // Mock a FlowCatalog. File specDir = Files.createTempDir(); @@ -164,7 +164,7 @@ public void testDisableFlowRunImmediatelyOnStart() * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs */ @Test - public void testJobSchedulerInitWithFailedSpec() throws Exception { + public void testJobSchedulerInitWithFailedSpec() throws Throwable { // Mock a FlowCatalog. File specDir = Files.createTempDir(); @@ -228,7 +228,7 @@ public boolean apply(Void input) { * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs */ @Test - public void testJobSchedulerUnschedule() throws Exception { + public void testJobSchedulerUnschedule() throws Throwable { // Mock a FlowCatalog. File specDir = Files.createTempDir(); @@ -338,10 +338,9 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { scheduler.setActive(true); scheduler.onAddSpec(flowSpec0); //Ignore the response for this request - AddSpecResponse response1 = scheduler.onAddSpec(flowSpec1); - Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); + Assert.assertThrows(QuotaExceededException.class, () -> scheduler.onAddSpec(flowSpec1)); - Assert.assertTrue(response1.getValue().contains(QuotaExceededException.class.getSimpleName())); + Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // Second flow should not be added to scheduled flows since it was rejected Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // set scheduler to be inactive and unschedule flows diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java index 0132a824949..8c1f751a321 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java @@ -16,7 +16,6 @@ */ package org.apache.gobblin.util.callbacks; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import com.google.common.base.Preconditions; @@ -64,7 +63,7 @@ public static CallbackResult createFromFuture(Future execFuture) R res = execFuture.get(); return createSuccessful(res); } - catch (ExecutionException e) { + catch (Exception e) { if (execFuture.isCancelled()) { return createCancelled(); } From 7b8cd35b23d39124b4125f260754a756484c5305 Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 27 Jun 2022 12:09:25 -0700 Subject: [PATCH 08/14] Fix test --- .../modules/scheduler/GobblinServiceJobSchedulerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java index 45242752ef0..0750392eee3 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Properties; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.exception.QuotaExceededException; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; @@ -338,7 +337,7 @@ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception { scheduler.setActive(true); scheduler.onAddSpec(flowSpec0); //Ignore the response for this request - Assert.assertThrows(QuotaExceededException.class, () -> scheduler.onAddSpec(flowSpec1)); + Assert.assertThrows(RuntimeException.class, () -> scheduler.onAddSpec(flowSpec1)); Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1); // Second flow should not be added to scheduled flows since it was rejected From 4b99a47c76b033d40f70b6360440e75e322f3ee4 Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 29 Jun 2022 07:44:50 -0700 Subject: [PATCH 09/14] Cleanup around handling responses from callbacks in GaaS API --- .../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 2 +- .../org/apache/gobblin/runtime/api/SpecCatalogListener.java | 1 + .../gobblin/service/modules/orchestration/DagManagerUtils.java | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 50ea62567d3..8ec10f860ee 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -101,7 +101,7 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : ""); flowConfig.setProperties(props); httpStatus = HttpStatus.S_200_OK; - } else if (Boolean.parseBoolean(response.getValue())) { + } else if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) { httpStatus = HttpStatus.S_201_CREATED; } else { throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, getErrorMessage(flowSpec)); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java index c09784e1951..6bf61a1e6ee 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java @@ -48,6 +48,7 @@ public AddSpecCallback(Spec addedSpec) { _addedSpec = addedSpec; } + @Override public AddSpecResponse apply(SpecCatalogListener listener) { return listener.onAddSpec(_addedSpec); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java index c4fd520e9bb..9d59a90857e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java @@ -352,9 +352,9 @@ static List getDistinctUniqueRequesters(String serializedRequesters) { .map(ServiceRequester::getName) .distinct() .collect(Collectors.toList()); + return uniqueRequesters; } catch (IOException e) { throw new RuntimeException("Could not process requesters due to ", e); } - return uniqueRequesters; } } From 59838f067655f978295bf8a9eb9837f4aceafc91 Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 29 Jun 2022 10:56:56 -0700 Subject: [PATCH 10/14] Fix checkstyle --- .../apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 8ec10f860ee..c69778ee392 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -89,7 +89,6 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } } - AddSpecResponse response = responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")); HttpStatus httpStatus; if (flowConfig.hasExplain() && flowConfig.isExplain()) { From 1aaf86a34baf4a5a8f0d69e68a321ea783da5e43 Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 11 Jul 2022 17:44:35 -0700 Subject: [PATCH 11/14] catch quotaexceededexception instead of checking type explicitly --- .../service/FlowConfigResourceLocalHandler.java | 12 ++++++------ .../service/FlowConfigV2ResourceLocalHandler.java | 7 +++---- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index 58195a94b39..b7a8ca04e09 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -136,10 +136,10 @@ public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerLis } else { try { this.flowCatalog.put(flowSpec, triggerListener); + } catch (QuotaExceededException e) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } catch (Throwable e) { - if (e instanceof QuotaExceededException) { - throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } + // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings } return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); } @@ -177,10 +177,10 @@ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boo } try { this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener); + } catch (QuotaExceededException e) { + throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } catch (Throwable e) { - if (e instanceof QuotaExceededException) { - throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } + // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings } return new UpdateResponse(HttpStatus.S_200_OK); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index c69778ee392..c8d5cc4e0bc 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -83,11 +83,10 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL Map responseMap = new HashMap<>(); try { responseMap = this.flowCatalog.put(flowSpec, triggerListener); - } catch (Throwable e) { - // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings - if (e instanceof QuotaExceededException) { + } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } + } catch (Throwable e) { + // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings } HttpStatus httpStatus; From d7e92ea8ac13168858632ca46250d6e87338cffb Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 20 Jul 2022 16:59:15 -0700 Subject: [PATCH 12/14] Log other errors and throw 500 --- .../gobblin/service/FlowConfigResourceLocalHandler.java | 4 ++++ .../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index b7a8ca04e09..7e91c9d1afd 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -140,6 +140,8 @@ public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerLis throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings + log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); + throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); } return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); } @@ -181,6 +183,8 @@ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boo throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings + log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e); + throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); } return new UpdateResponse(HttpStatus.S_200_OK); } diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index c8d5cc4e0bc..2557c81f8ce 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -85,8 +85,10 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL responseMap = this.flowCatalog.put(flowSpec, triggerListener); } catch (QuotaExceededException e) { throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage()); - } catch (Throwable e) { + } catch (Throwable e) { // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings + log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e); + throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage()); } HttpStatus httpStatus; From 9b421e544e1117cea77a8de6a14bb29cd2115f60 Mon Sep 17 00:00:00 2001 From: William Lo Date: Thu, 21 Jul 2022 16:52:50 -0700 Subject: [PATCH 13/14] Fix checkstyle dead store --- .../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 2557c81f8ce..248c851ac0d 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -80,7 +80,7 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); } - Map responseMap = new HashMap<>(); + Map responseMap; try { responseMap = this.flowCatalog.put(flowSpec, triggerListener); } catch (QuotaExceededException e) { From 4b801236e9abde2166fdf8440b14dd7e8cb7f120 Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 22 Jul 2022 10:31:36 -0700 Subject: [PATCH 14/14] Fix checkstyle again --- .../apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java index 248c851ac0d..d2bb55743c5 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java @@ -18,7 +18,6 @@ import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringEscapeUtils;