Skip to content

Commit

Permalink
feat: Implemented circuit breaker (#71)
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Peter <[email protected]>
  • Loading branch information
JohannesDaniel authored Nov 26, 2024
1 parent b84bc5e commit 45e743f
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/main/java/com/o19s/es/ltr/LtrQueryParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.o19s.es.ltr;

import ciir.umass.edu.learning.RankerFactory;
import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import com.o19s.es.explore.ExplorerQueryBuilder;
import com.o19s.es.ltr.action.AddFeaturesToSetAction;
import com.o19s.es.ltr.action.CachesStatsAction;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.index.analysis.PreConfiguredTokenFilter;
import org.opensearch.index.analysis.PreConfiguredTokenizer;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -121,6 +123,8 @@
import static java.util.Collections.unmodifiableMap;

public class LtrQueryParserPlugin extends Plugin implements SearchPlugin, ScriptPlugin, ActionPlugin, AnalysisPlugin {
public static final String LTR_BASE_URI = "/_plugins/_ltr";
public static final String LTR_LEGACY_BASE_URI = "/_opendistro/_ltr";
private final LtrRankerParserFactory parserFactory;
private final Caches caches;

Expand Down Expand Up @@ -256,7 +260,10 @@ public Collection<Object> createComponents(Client client,
}
}
});
return asList(caches, parserFactory, getStats(client, clusterService, indexNameExpressionResolver));
final JvmService jvmService = new JvmService(environment.settings());
final LTRCircuitBreakerService ltrCircuitBreakerService = new LTRCircuitBreakerService(jvmService).init();

return asList(caches, parserFactory, ltrCircuitBreakerService, getStats(client, clusterService, indexNameExpressionResolver));
}

private LTRStats getStats(Client client, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.o19s.es.ltr.action;

import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.exception.LimitExceededException;
import com.o19s.es.ltr.action.AddFeaturesToSetAction.AddFeaturesToSetRequest;
import com.o19s.es.ltr.action.AddFeaturesToSetAction.AddFeaturesToSetResponse;
import com.o19s.es.ltr.action.FeatureStoreAction.FeatureStoreRequest;
Expand Down Expand Up @@ -59,25 +61,32 @@ public class TransportAddFeatureToSetAction extends HandledTransportAction<AddFe
private final TransportSearchAction searchAction;
private final TransportGetAction getAction;
private final TransportFeatureStoreAction featureStoreAction;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

@Inject
public TransportAddFeatureToSetAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportSearchAction searchAction,
TransportGetAction getAction, TransportFeatureStoreAction featureStoreAction) {
TransportGetAction getAction, TransportFeatureStoreAction featureStoreAction,
LTRCircuitBreakerService ltrCircuitBreakerService) {
super(AddFeaturesToSetAction.NAME, transportService, actionFilters, AddFeaturesToSetRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.getAction = getAction;
this.featureStoreAction = featureStoreAction;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
protected void doExecute(Task task, AddFeaturesToSetRequest request, ActionListener<AddFeaturesToSetResponse> listener) {
if (!clusterService.state().routingTable().hasIndex(request.getStore())) {
throw new IllegalArgumentException("Store [" + request.getStore() + "] does not exist, please create it first.");
}
if (this.ltrCircuitBreakerService.isOpen()) {
throw new LimitExceededException("Store [" + request.getStore() + "] adding feature set is not allowed " +
"as memory circuit is broken.");
}
new AsyncAction(task, request, listener, clusterService, searchAction, getAction, featureStoreAction).start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.o19s.es.ltr.action;

import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.exception.LimitExceededException;
import com.o19s.es.ltr.action.CreateModelFromSetAction.CreateModelFromSetRequest;
import com.o19s.es.ltr.action.CreateModelFromSetAction.CreateModelFromSetResponse;
import com.o19s.es.ltr.action.FeatureStoreAction.FeatureStoreRequest;
Expand Down Expand Up @@ -43,24 +45,31 @@ public class TransportCreateModelFromSetAction extends HandledTransportAction<Cr
private final ClusterService clusterService;
private final TransportGetAction getAction;
private final TransportFeatureStoreAction featureStoreAction;
private final LTRCircuitBreakerService ltrCircuitBreakerService;

@Inject
public TransportCreateModelFromSetAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportGetAction getAction,
TransportFeatureStoreAction featureStoreAction) {
TransportFeatureStoreAction featureStoreAction,
LTRCircuitBreakerService ltrCircuitBreakerService) {
super(CreateModelFromSetAction.NAME, transportService, actionFilters, CreateModelFromSetRequest::new);
this.clusterService = clusterService;
this.getAction = getAction;
this.featureStoreAction = featureStoreAction;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
protected void doExecute(Task task, CreateModelFromSetRequest request, ActionListener<CreateModelFromSetResponse> listener) {
if (!clusterService.state().routingTable().hasIndex(request.getStore())) {
throw new IllegalArgumentException("Store [" + request.getStore() + "] does not exist, please create it first.");
}
if (this.ltrCircuitBreakerService.isOpen()) {
throw new LimitExceededException("Store [" + request.getStore() + "] creating model is not allowed " +
"as memory circuit is broken.");
}
GetRequest getRequest = new GetRequest(request.getStore())
.id(StorableElement.generateId(StoredFeatureSet.TYPE, request.getFeatureSetName()));
getRequest.setParentTask(clusterService.localNode().getId(), task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.o19s.es.ltr.action;

import org.opensearch.ltr.breaker.LTRCircuitBreakerService;
import org.opensearch.ltr.exception.LimitExceededException;
import com.o19s.es.ltr.action.ClearCachesAction.ClearCachesNodesRequest;
import com.o19s.es.ltr.action.FeatureStoreAction.FeatureStoreRequest;
import com.o19s.es.ltr.action.FeatureStoreAction.FeatureStoreResponse;
Expand Down Expand Up @@ -55,18 +57,22 @@ public class TransportFeatureStoreAction extends HandledTransportAction<FeatureS
private final TransportClearCachesAction clearCachesAction;
private final Client client;
private final Logger logger = LogManager.getLogger(getClass());
private final LTRCircuitBreakerService ltrCircuitBreakerService;


@Inject
public TransportFeatureStoreAction(TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService, Client client,
LtrRankerParserFactory factory,
TransportClearCachesAction clearCachesAction) {
TransportClearCachesAction clearCachesAction,
LTRCircuitBreakerService ltrCircuitBreakerService) {
super(FeatureStoreAction.NAME, false, transportService, actionFilters, FeatureStoreRequest::new);
this.factory = factory;
this.clusterService = clusterService;
this.clearCachesAction = clearCachesAction;
this.client = client;
this.ltrCircuitBreakerService = ltrCircuitBreakerService;
}

@Override
Expand All @@ -75,6 +81,10 @@ protected void doExecute(Task task, FeatureStoreRequest request, ActionListener<
// To prevent index auto creation
throw new IllegalArgumentException("Store [" + request.getStore() + "] does not exist, please create it first.");
}
if (this.ltrCircuitBreakerService.isOpen()) {
throw new LimitExceededException("Store [" + request.getStore() + "] creating/updating features " +
"through simple CRUD is not allowed as memory circuit is broken.");
}
// some synchronous pre-checks that require the parser factory
precheck(request);
if (request.getValidation() != null) {
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/org/opensearch/ltr/breaker/BreakerName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.breaker;

public enum BreakerName {

MEM("memory"),
CPU("cpu");

private String name;

BreakerName(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
26 changes: 26 additions & 0 deletions src/main/java/org/opensearch/ltr/breaker/CircuitBreaker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.breaker;

/**
* An interface for circuit breaker.
*
* We use circuit breaker to protect a certain system resource like memory, cpu etc.
*/
public interface CircuitBreaker {

boolean isOpen();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.breaker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.monitor.jvm.JvmService;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Class {@code LTRCircuitBreakerService} provide storing, retrieving circuit breakers functions.
*
* This service registers internal system breakers and provide API for users to register their own breakers.
*/
public class LTRCircuitBreakerService {

private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();
private final JvmService jvmService;

private static final Logger logger = LogManager.getLogger(LTRCircuitBreakerService.class);

/**
* Constructor.
*
* @param jvmService jvm info
*/
public LTRCircuitBreakerService(JvmService jvmService) {
this.jvmService = jvmService;
}

public void registerBreaker(String name, CircuitBreaker breaker) {
breakers.putIfAbsent(name, breaker);
}

public void unregisterBreaker(String name) {
if (name == null) {
return;
}

breakers.remove(name);
}

public void clearBreakers() {
breakers.clear();
}

public CircuitBreaker getBreaker(String name) {
return breakers.get(name);
}

/**
* Initialize circuit breaker service.
*
* Register memory breaker by default.
*
* @return LTRCircuitBreakerService
*/
public LTRCircuitBreakerService init() {
// Register memory circuit breaker
registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(this.jvmService));
logger.info("Registered memory breaker.");

return this;
}

public Boolean isOpen() {
for (CircuitBreaker breaker : breakers.values()) {
if (breaker.isOpen()) {
return true;
}
}

return false;
}
}
42 changes: 42 additions & 0 deletions src/main/java/org/opensearch/ltr/breaker/MemoryCircuitBreaker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.breaker;

import org.opensearch.monitor.jvm.JvmService;

/**
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {

private static final short defaultThreshold = 85;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(defaultThreshold);
this.jvmService = jvmService;
}

public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
super(threshold);
this.jvmService = jvmService;
}

@Override
public boolean isOpen() {
return jvmService.stats().getMem().getHeapUsedPercent() > this.getThreshold();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.opensearch.ltr.breaker;

/**
* An abstract class for all breakers with threshold.
* @param <T> data type of threshold
*/
public abstract class ThresholdCircuitBreaker<T> implements CircuitBreaker {

private T threshold;

public ThresholdCircuitBreaker(T threshold) {
this.threshold = threshold;
}

public T getThreshold() {
return threshold;
}

@Override
public abstract boolean isOpen();
}
Loading

0 comments on commit 45e743f

Please sign in to comment.