
[GOBBLIN-1656] Return an HTTP status 503 on GaaS when quota is exceeded for user or flowgroup #3516

Merged: 14 commits, Jul 25, 2022
@@ -42,6 +42,7 @@

import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
@@ -133,7 +134,15 @@ public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerLis
if (!flowConfig.hasSchedule() && this.flowCatalog.exists(flowSpec.getUri())) {
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT);
} else {
this.flowCatalog.put(flowSpec, triggerListener);
try {
this.flowCatalog.put(flowSpec, triggerListener);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
Contributor:
I realize this is just a request handler, so it's not hugely consequential to catch all exceptions, but please at least log whatever you find here, so it's not a silent/squelched failure we have a hard time tracking down some future day.

log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
}
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED);
}
}
@@ -168,8 +177,15 @@ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boo
originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
flowConfig = originalFlowConfig;
}

this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
try {
this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
Contributor:
same here

Contributor:
likely not major, but I'm unclear specifically what this TODO is suggesting ought to change in the future.

Contributor Author:
I can make a ticket for it. Right now the flow compilation process is really descriptive about what is missing, but users would only see a generic HTTP 400 error with "Path does not exist" when compilation fails. I think if we utilized exceptions rather than that compilation check it would be easier to differentiate and reason about.
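For reference, a sketch of what that ticket could propose (the class name FlowCompilationException is hypothetical, not an existing Gobblin type):

    // Hypothetical follow-up to the TODO: surface compiler failures as a typed
    // exception instead of string-matching the compiled DAG in the handler.
    public class FlowCompilationException extends RuntimeException {
      public FlowCompilationException(String flowGroup, String flowName, String reason) {
        super(String.format("Flow %s.%s failed compilation: %s", flowGroup, flowName, reason));
      }
    }

    // The handler's catch chain could then map each failure mode to a distinct status:
    //   catch (QuotaExceededException e)   -> 503 SERVICE_UNAVAILABLE
    //   catch (FlowCompilationException e) -> 400 BAD_REQUEST carrying the compiler's message
    //   catch (Throwable e)                -> 500 INTERNAL_SERVER_ERROR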

log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
}
return new UpdateResponse(HttpStatus.S_200_OK);
}

@@ -36,9 +36,12 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;


@Slf4j
public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler {

@@ -60,9 +63,9 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL
}
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
FlowStatusId flowStatusId = new FlowStatusId()
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
FlowStatusId flowStatusId =
new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
} else {
@@ -76,7 +79,16 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL
"FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
}

Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
Map<String, AddSpecResponse> responseMap;
try {
responseMap = this.flowCatalog.put(flowSpec, triggerListener);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
}
HttpStatus httpStatus;

if (flowConfig.hasExplain() && flowConfig.isExplain()) {
@@ -48,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
* on adding a {@link Spec} to the {@link SpecCatalog}. The key for each entry is the name of the {@link SpecCatalogListener}
* and the value is the result of the action taken by the listener returned as an instance of {@link AddSpecResponse}.
* */
Map<String, AddSpecResponse> put(Spec spec);
Map<String, AddSpecResponse> put(Spec spec) throws Throwable;
Contributor:
the only newly added exception it seems could be thrown here is QuotaExceededException... if correct, why not make that the type mentioned in the throws clause? why go all the way down to Throwable?

Contributor Author (@Will-Lo, Jun 29, 2022):
It's a bit convoluted: the QuotaExceededException gets thrown in a callback and is caught by CallbackResult, which stores any exception thrown as a Throwable. I wanted to leverage that by accessing it through getCause(). Classes that rely on the CallbackResult responses would have to throw the generic Throwable class unless I cast the exception back into a QuotaExceededException, but I want to avoid doing that so compilation errors can be thrown in this fashion as well in the future.

In particular in this segment:

      if (response.getValue().getFailures().size() > 0) {
        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
          throw entry.getValue().getError().getCause();
        }
        return responseMap;
      }
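A sketch of the narrower alternative being discussed here, under the assumption (true today) that QuotaExceededException is the only failure tunneled through the callback; the surrounding Gobblin types are the existing ones, but the unwrap-and-rethrow logic is illustrative only:

      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry
          : response.getValue().getFailures().entrySet()) {
        Throwable cause = entry.getValue().getError().getCause();
        if (cause instanceof QuotaExceededException) {
          throw (QuotaExceededException) cause;  // keeps a specific, checked throws clause
        }
        throw new RuntimeException(cause);       // anything else stays unchecked
      }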


/**
* Removes an existing {@link Spec} with the given URI.
@@ -48,7 +48,8 @@ public AddSpecCallback(Spec addedSpec) {
_addedSpec = addedSpec;
}

@Override public AddSpecResponse apply(SpecCatalogListener listener) {
@Override
public AddSpecResponse apply(SpecCatalogListener listener) {
Contributor:
curious what changed to lead you to remove @Override... are you just omitting what is optional or are you somehow actually no longer making an override? (I recommend @Override to catch errors.)

Contributor Author:
Ah, sorry, that was a mistake; I was changing a lot of function definitions while trying to figure out the errors and this slipped through.

return listener.onAddSpec(_addedSpec);
}
}
@@ -342,7 +342,7 @@ public Spec getSpecWrapper(URI uri) {
* @param triggerListener True if listeners should be notified.
* @return a map of listeners and their {@link AddSpecResponse}s
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) throws Throwable {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
@@ -355,13 +355,21 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {

if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
// If flow fails compilation, the result will have a non-empty string with the error
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
}
// If flow fails compilation, the result will have a non-empty string with the error
if (response.getValue().getFailures().size() > 0) {
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
throw entry.getValue().getError().getCause();
}
return responseMap;
}
}
AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));

if (isCompileSuccessful(responseMap)) {
// Check that the flow configuration is valid and matches to a corresponding edge
if (isCompileSuccessful(schedulerResponse.getValue())) {
Contributor:
When it exceeds the quota, why is the compile marked as successful? It's a little confusing to read the code.

Contributor Author:
Maybe there's better terminology here. How I interpret compilation is that the flow configuration can compile (source, destination, and any required parameters exist). A flow can pass the compilation step but still fail a resource validation check, which doesn't mean that the flow was improperly compiled or that the inputs were incorrect; it's more that the user already has too many flows in the system.

Contributor:
Can we add comments to explain this? Just to make the code easier to read.

Contributor:
a large part of this is the tunneling I describe above, but if you actually do need additional info to support isExplain/hasExplain, you could insert it into (or wrap around) the QuotaExceededException
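A sketch of that suggestion: carry the extra context in a wrapper around the tunneled exception. The subclass and its fields below are hypothetical; only the single-argument QuotaExceededException constructor used elsewhere in this change is assumed.

    // Hypothetical subclass that tunnels flow context alongside the quota failure,
    // so a handler answering an isExplain/hasExplain request has what it needs.
    public class QuotaExceededWithContextException extends QuotaExceededException {
      private final String flowGroup;
      private final String flowName;

      public QuotaExceededWithContextException(String message, String flowGroup, String flowName) {
        super(message);
        this.flowGroup = flowGroup;
        this.flowName = flowName;
      }

      public String getFlowGroup() { return flowGroup; }
      public String getFlowName() { return flowName; }
    }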

synchronized (syncObject) {
try {
if (!flowSpec.isExplain()) {
Expand All @@ -384,19 +392,12 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
return responseMap;
}

public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
// If we cannot get the response from the scheduler, assume that the flow failed compilation
AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
return isCompileSuccessful(addSpecResponse.getValue());
}

public static boolean isCompileSuccessful(String dag) {
return dag != null && !dag.contains(ConfigException.class.getSimpleName());
}

@Override
public Map<String, AddSpecResponse> put(Spec spec) {
public Map<String, AddSpecResponse> put(Spec spec) throws Throwable {
return put(spec, true);
}

@@ -21,13 +21,15 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -105,19 +107,24 @@ public static FlowSpec initFlowSpec(String specStore, URI uri){
return initFlowSpec(specStore, uri, "flowName");
}

/**
* Create FlowSpec with specified URI and SpecStore location.
*/
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
}

public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.put("job.name", flowName);
properties.put("job.group", flowName);
properties.put("job.group", flowGroup);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
properties.put("job.schedule", "0 0 0 ? * * 2050");
Config config = ConfigUtils.propertiesToConfig(properties);

Config defaults = ConfigUtils.propertiesToConfig(properties);
Config config = additionalConfigs.withFallback(defaults);
SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);

FlowSpec.Builder flowSpecBuilder = null;
Expand All @@ -141,7 +148,7 @@ public void cleanUp() throws Exception {
}

@Test
public void createFlowSpec() {
public void createFlowSpec() throws Throwable {
// List Current Specs
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
@@ -199,7 +206,7 @@ public void deleteFlowSpec() throws SpecNotFoundException {
}

@Test (dependsOnMethods = "deleteFlowSpec")
public void testRejectBadFlow() {
public void testRejectBadFlow() throws Throwable {
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
int i=0;
@@ -223,7 +230,7 @@ public void testRejectBadFlow() {
}

@Test (dependsOnMethods = "testRejectBadFlow")
public void testRejectMissingListener() {
public void testRejectMissingListener() throws Throwable {
flowCatalog.removeListener(this.mockListener);
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
@@ -244,6 +251,32 @@ public void testRejectMissingListener() {
Assert.assertEquals(flowCatalog.getSize(), 0);
}

@Test (dependsOnMethods = "testRejectMissingListener")
public void testRejectQuotaExceededFlow() {
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
int i=0;
for (Spec spec : specs) {
FlowSpec flowSpec = (FlowSpec) spec;
logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");

// Create and add Spec
FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");

// Assume that spec is rejected
when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException(new QuotaExceededException("error")));
try {
Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
} catch (Throwable e) {
Assert.assertTrue(e instanceof QuotaExceededException);
}
// Spec should be rejected from being stored
specs = flowCatalog.getSpecs();
Assert.assertEquals(specs.size(), 0);
}
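For comparison, a hedged variant of this test that also fails when no exception is thrown, using TestNG's expectedExceptions; the method name is illustrative and the mock setup mirrors the test above:

    @Test (dependsOnMethods = "testRejectMissingListener", expectedExceptions = QuotaExceededException.class)
    public void testRejectQuotaExceededFlowStrict() throws Throwable {
      when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException(new QuotaExceededException("error")));
      // put() should rethrow the tunneled QuotaExceededException; if it does not,
      // TestNG fails the test instead of passing silently.
      this.flowCatalog.put(initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "quotaFlow"));
    }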

public static URI computeFlowSpecURI() {
// Make sure this is relative
URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
@@ -108,7 +108,7 @@ public void addChange(DiffEntry change) {
.withVersion(SPEC_VERSION)
.withDescription(SPEC_DESCRIPTION)
.build());
} catch (IOException e) {
} catch (Throwable e) {
log.warn("Could not load config file: " + configFilePath);
}
}
@@ -19,6 +19,7 @@

import java.util.Objects;

import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,6 +195,8 @@ public void configure(Binder binder) {
binder.bind(Orchestrator.class);
binder.bind(SchedulerService.class);
binder.bind(GobblinServiceJobScheduler.class);
OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class);
binder.bind(UserQuotaManager.class);
}

if (serviceConfig.isGitConfigMonitorEnabled()) {
@@ -31,6 +31,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -197,6 +198,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
@Inject
protected ServiceDatabaseManager databaseManager;

@Inject(optional=true)
@Getter
protected Optional<UserQuotaManager> quotaManager;

protected Optional<HelixLeaderState> helixLeaderGauges;

@Inject(optional = true)
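The two pieces above work together: OptionalBinder.newOptionalBinder registers an optional slot for UserQuotaManager, and the @Inject(optional = true) field of type Optional<UserQuotaManager> lets GobblinServiceManager start whether or not a quota manager is bound. A minimal standalone sketch of that Guice pattern, assuming Guice 4.x on Java 8 (where OptionalBinder also binds java.util.Optional) and using placeholder names (QuotaService, ServiceModule, Handler) rather than Gobblin classes:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Inject;
    import com.google.inject.multibindings.OptionalBinder;
    import java.util.Optional;

    public class OptionalBindingSketch {
      interface QuotaService { boolean allows(String user); }

      static class ServiceModule extends AbstractModule {
        private final boolean quotasEnabled;
        ServiceModule(boolean quotasEnabled) { this.quotasEnabled = quotasEnabled; }

        @Override protected void configure() {
          // Always declare the optional slot; only populate it when quotas are enabled.
          OptionalBinder<QuotaService> slot = OptionalBinder.newOptionalBinder(binder(), QuotaService.class);
          if (quotasEnabled) {
            slot.setBinding().toInstance(user -> true);  // stand-in implementation
          }
        }
      }

      static class Handler {
        // Empty when no quota manager was bound, so callers can skip the quota check.
        @Inject Optional<QuotaService> quota;
      }

      public static void main(String[] args) {
        Handler handler = Guice.createInjector(new ServiceModule(false)).getInstance(Handler.class);
        System.out.println("Quota manager present: " + handler.quota.isPresent());
      }
    }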
@@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -32,6 +33,7 @@
import com.google.common.collect.Lists;
import com.typesafe.config.Config;

import java.util.stream.Collectors;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -41,6 +43,7 @@
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -340,4 +343,18 @@ static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, Dag<JobExecut
eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
}
}

static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
List<String> uniqueRequesters;
try {
uniqueRequesters = RequesterService.deserialize(serializedRequesters)
Contributor:
no biggie, but you could place the return statement within the try

.stream()
.map(ServiceRequester::getName)
.distinct()
.collect(Collectors.toList());
return uniqueRequesters;
} catch (IOException e) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
}
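The refactor the reviewer suggests just moves the return inside the try, dropping the local variable; the behavior is unchanged:

    static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
      try {
        // Returning directly from the try removes the need for the local variable.
        return RequesterService.deserialize(serializedRequesters)
            .stream()
            .map(ServiceRequester::getName)
            .distinct()
            .collect(Collectors.toList());
      } catch (IOException e) {
        throw new RuntimeException("Could not process requesters due to ", e);
      }
    }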