Skip to content

Commit

Permalink
Admission Controller Module Transport Interceptor Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Aug 22, 2023
1 parent 784a473 commit 06a4ff4
Show file tree
Hide file tree
Showing 19 changed files with 789 additions and 0 deletions.
14 changes: 14 additions & 0 deletions modules/throttling/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apply plugin: 'opensearch.java-rest-test'

opensearchplugin {
description 'Plugin intercepting requests and throttle based on resource consumption'
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
}

dependencies {
api project(path: ':modules:reindex')
}

testClusters.all {
module ':modules:reindex'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.List;

/**
* This plugin is used to register handlers to intercept both rest and transport requests.
*/
public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin {

/**
* Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
* transport (inter-node) requests. This must not return <code>null</code>
*
* @param namedWriteableRegistry registry of all named writeables registered
* @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional
* headers in the interceptors
* @return
*/
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
List<TransportInterceptor> interceptors = new ArrayList<>(0);
interceptors.add(new AdmissionControllerTransportInterceptor());
return interceptors;
}
}
39 changes: 39 additions & 0 deletions modules/throttling/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

grant {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";

// needed to find the classloader to load allowlisted classes from
permission java.lang.RuntimePermission "getClassLoader";
};
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -233,6 +234,8 @@ public void apply(Settings value, Settings current, Settings previous) {
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
AdmissionControllerSettings.ADMISSION_CONTROLLER_URI_LIST_SETTING,
AdmissionControllerSettings.ADMISSION_CONTROLLER_MODE,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -904,6 +905,12 @@ protected Node(
transportService.getTaskManager()
);

final AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService(
settings,
clusterService,
threadPool
);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
Expand Down Expand Up @@ -1094,6 +1101,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControllerService.class).toInstance(admissionControllerService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down Expand Up @@ -1263,6 +1271,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(AdmissionControllerService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1418,6 +1427,7 @@ private Node stop() {
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(AdmissionControllerService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down Expand Up @@ -1481,6 +1491,8 @@ public synchronized void close() throws IOException {
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("admission_controller"));
toClose.add(injector.getInstance(AdmissionControllerService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.admissioncontroller;

import java.util.Locale;

/**
* Defines the AdmissionControllerMode
*/
public enum AdmissionControllerMode {
/**
* AdmissionController is completely disabled.
*/
DISABLED("disabled"),

/**
* AdmissionController only monitors the rejection criteria for the requests.
*/
MONITOR("monitor_only"),

/**
* AdmissionController monitors and rejects tasks that exceed resource usage thresholds.
*/
ENFORCED("enforced"),

/**
* AdmissionController monitors and rejects only X percent of the requests that exceed resource usage thresholds.
*/
HALF_OPEN("half_open");

private final String mode;

/**
* @param mode update mode of the admission controller
*/
AdmissionControllerMode(String mode) {
this.mode = mode;
}

/**
*
* @return mode of the admission controller
*/
private String getMode() {
return this.mode;
}

/**
*
* @param name is the mode of the current
* @return Enum of AdmissionControllerMode based on the mode
*/
public static AdmissionControllerMode fromName(String name) {
switch (name.toLowerCase(Locale.ROOT)) {
case "disabled":
return DISABLED;
case "monitor_only":
return MONITOR;
case "enforced":
return ENFORCED;
case "half_open":
return HALF_OPEN;
}

throw new IllegalArgumentException("Invalid AdmissionControllerMode: " + name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.admissioncontroller;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController;
import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
*/
public class AdmissionControllerService extends AbstractLifecycleComponent {
private final ThreadPool threadPool;
private final AdmissionControllerSettings admissionControllerSettings;
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;

private static AdmissionControllerService admissionControllerService = null;

public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage";

/**
*
* @param settings Immutable settings instance
* @param clusterService ClusterService Instance
* @param threadPool ThreadPool Instance
*/
private AdmissionControllerService(Settings settings, ClusterService clusterService, ThreadPool threadPool) {
this.threadPool = threadPool;
this.admissionControllerSettings = new AdmissionControllerSettings(clusterService, settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
}

/**
* Initialise and Register all the admissionControllers
*/
private void initialise() {
// Initialise different type of admission controllers
AdmissionControllerState ioBasedAdmissionControllerState = new AdmissionControllerState(
IO_BASED_ADMISSION_CONTROLLER,
AdmissionControllerMode.MONITOR,
this.admissionControllerSettings.getDefaultUriList()
);
registerAdmissionController(ioBasedAdmissionControllerState);
}

/**
* Handler to trigger registered admissionController
*/
public void applyAdmissionController() {
if (this.isAdmissionControllerEnabled()) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.acquire(); });
}
}

/**
*
*/
@Override
protected void doStart() {
this.initialise();
}

/**
*
* @return singleton admissionControllerService Instance
*/
public static AdmissionControllerService getInstance() {
return admissionControllerService;
}

/**
*
* @param settings Immutable settings instance
* @param clusterService ClusterService Instance
* @param threadPool ThreadPool Instance
* @return singleton admissionControllerService Instance
*/
public static synchronized AdmissionControllerService newAdmissionControllerService(
Settings settings,
ClusterService clusterService,
ThreadPool threadPool
) {
if (admissionControllerService == null) {
admissionControllerService = new AdmissionControllerService(settings, clusterService, threadPool);
}
return admissionControllerService;
}

@Override
protected void doStop() {}

/**
* @throws IOException
*/
@Override
protected void doClose() throws IOException {}

/**
*
* @return true if the admissionController Feature is enabled
*/
public Boolean isAdmissionControllerEnabled() {
return this.admissionControllerSettings.isAdmissionControllerEnabled();
}

/**
*
* @param admissionControllerState admissionControllerState to register into the service.
*/
private void registerAdmissionController(AdmissionControllerState admissionControllerState) {
if (!this.ADMISSION_CONTROLLERS.containsKey(admissionControllerState.getControllerName())) {
AdmissionController admissionController = controllerFactory(admissionControllerState);
if (admissionController != null) {
this.ADMISSION_CONTROLLERS.put(admissionControllerState.getControllerName(), admissionController);
}
}
}

/**
*
* @param admissionControllerState admissionControllerState to create instance
* @return AdmissionController Instance
*/
private static AdmissionController controllerFactory(AdmissionControllerState admissionControllerState) {
switch (admissionControllerState.getControllerName()) {
case IO_BASED_ADMISSION_CONTROLLER:
return new IOBasedAdmissionController(admissionControllerState);
default:
return null;
}
}
}
Loading

0 comments on commit 06a4ff4

Please sign in to comment.