Skip to content

Commit

Permalink
Add circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 18, 2023
1 parent 0d65260 commit 3998e7d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.geospatial.ip2geo.action;

import java.util.Collections;
import java.util.List;

import org.opensearch.OpenSearchException;
Expand All @@ -14,6 +15,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -66,6 +68,10 @@ public void onResponse(final List<Datasource> datasources) {

@Override
public void onFailure(final Exception e) {
    // Anything other than a missing index is a genuine failure and is
    // propagated to the caller unchanged.
    final boolean indexMissing = e instanceof IndexNotFoundException;
    if (indexMissing == false) {
        listener.onFailure(e);
        return;
    }
    // A missing index is answered with an empty datasource list instead
    // of an error.
    listener.onResponse(new GetDatasourceResponse(Collections.emptyList()));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -51,8 +54,10 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
Expand Down Expand Up @@ -187,7 +192,7 @@ protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest,
}

/**
* Create a document in json string format to ingest in datasource database index
* Create a document to ingest in datasource database index
*
* It assumes the first field as ip_range. The rest is added under data field.
*
Expand All @@ -204,31 +209,23 @@ protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest,
* @param fields a list of field name
* @param values a list of values
* @return Document in json string format
* @throws IOException the exception
*/
public String createDocument(final String[] fields, final String[] values) {
public XContentBuilder createDocument(final String[] fields, final String[] values) throws IOException {
if (fields.length != values.length) {
throw new OpenSearchException("header[{}] and record[{}] length does not match", fields, values);
}
StringBuilder sb = new StringBuilder();
sb.append("{\"");
sb.append(IP_RANGE_FIELD_NAME);
sb.append("\":\"");
sb.append(values[0]);
sb.append("\",\"");
sb.append(DATA_FIELD_NAME);
sb.append("\":{");
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.field(IP_RANGE_FIELD_NAME, values[0]);
builder.startObject(DATA_FIELD_NAME);
for (int i = 1; i < fields.length; i++) {
if (i != 1) {
sb.append(",");
}
sb.append("\"");
sb.append(fields[i]);
sb.append("\":\"");
sb.append(values[i]);
sb.append("\"");
builder.field(fields[i], values[i]);
}
sb.append("}}");
return sb.toString();
builder.endObject();
builder.endObject();
builder.close();
return builder;
}

/**
Expand Down Expand Up @@ -368,14 +365,20 @@ public void putGeoIpData(
@NonNull final Iterator<CSVRecord> iterator,
final int bulkSize,
@NonNull final Runnable renewLock
) {
) throws IOException {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest();
Queue<DocWriteRequest> requests = new LinkedList<>();
for (int i = 0; i < bulkSize; i++) {
requests.add(Requests.indexRequest(indexName));
}
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
bulkRequest.add(request);
XContentBuilder document = createDocument(fields, record.values());
IndexRequest indexRequest = (IndexRequest) requests.poll();
indexRequest.source(document);
indexRequest.id(record.get(0));
bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
Expand All @@ -385,6 +388,7 @@ public void putGeoIpData(
response.buildFailureMessage()
);
}
requests.addAll(bulkRequest.requests());
bulkRequest.requests().clear();
}
renewLock.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ public class Ip2GeoSettings {
Setting.Property.Dynamic
);

/**
* Minimum heap memory size required to start datasource update
*
* The value is expressed in bytes (see the "_in_byte" suffix of the key).
* Default is 300 * 1024 * 1024 bytes (300 MiB) and the lowest accepted value is 0.
* Declared with node scope and as a dynamic setting.
*/
public static final Setting<Long> MIN_HEAP_SIZE_PER_DATASOURCE_UPDATE = Setting.longSetting(
"plugins.geospatial.ip2geo.processor.min_heap_size_per_datasource_update_in_byte",
300 * 1024 * 1024,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Return all settings of Ip2Geo feature
* @return a list of all settings for Ip2Geo feature
Expand All @@ -102,7 +113,8 @@ public static final List<Setting<?>> settings() {
TIMEOUT,
INDEXING_BULK_SIZE,
MAX_BUNDLE_SIZE,
MAX_CONCURRENT_SEARCHES
MAX_CONCURRENT_SEARCHES,
MIN_HEAP_SIZE_PER_DATASOURCE_UPDATE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.plugin.GeospatialPlugin;

@Log4j2
public class DatasourceUpdateService {
Expand Down Expand Up @@ -71,6 +72,11 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
String indexName = setupIndex(manifest, datasource);
String[] header;
List<String> fieldsToStore;
long minMemoryRequired = clusterSettings.get(Ip2GeoSettings.MIN_HEAP_SIZE_PER_DATASOURCE_UPDATE);
GeospatialPlugin.circuitBreaker.addEstimateBytesAndMaybeBreak(
minMemoryRequired,
"not enough memory is available to process geoip data"
);
try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) {
CSVRecord headerLine = reader.iterator().next();
header = validateHeader(headerLine).values();
Expand All @@ -89,6 +95,8 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE),
renewLock
);
} finally {
GeospatialPlugin.circuitBreaker.addWithoutBreaking(-minMemoryRequired);
}

Instant endTime = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -66,8 +68,10 @@
import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.indices.breaker.BreakerSettings;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.CircuitBreakerPlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -85,7 +89,35 @@
* Entry point for Geospatial features. It provides additional Processors, Actions
* to interact with Cluster.
*/
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
public class GeospatialPlugin extends Plugin
implements
IngestPlugin,
ActionPlugin,
MapperPlugin,
SearchPlugin,
SystemIndexPlugin,
CircuitBreakerPlugin {
/** Name given to the plugin's circuit breaker when its BreakerSettings are built. */
private static final String CIRCUIT_BREAKER_NAME = "geospatial";
/** Memory limit of the geospatial circuit breaker; defaults to "100%". Declared dynamic with node scope. */
private static final Setting<ByteSizeValue> GEOSPATIAL_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.memorySizeSetting(
"plugins.geospatial.breaker.limit",
"100%",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/** Overhead factor of the geospatial circuit breaker; defaults to 1.0 with a lower bound of 0.0. */
private static final Setting<Double> GEOSPATIAL_CIRCUIT_BREAKER_OVERHEAD_SETTING = Setting.doubleSetting(
"plugins.geospatial.breaker.overhead",
1.0d,
0.0d,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/** Type of the geospatial circuit breaker, parsed by CircuitBreaker.Type::parseValue; defaults to "memory". Not dynamic. */
private static final Setting<CircuitBreaker.Type> GEOSPATIAL_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>(
"plugins.geospatial.breaker.type",
"memory",
CircuitBreaker.Type::parseValue,
Setting.Property.NodeScope
);
/**
* Circuit breaker shared through a static field so that code outside the plugin class can reach it.
* It has no initializer, so it is null until setCircuitBreaker assigns it.
* NOTE(review): public, mutable and static; tests overwrite it directly with a mock.
*/
public static CircuitBreaker circuitBreaker;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
Expand Down Expand Up @@ -116,7 +148,12 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public List<Setting<?>> getSettings() {
return Ip2GeoSettings.settings();
List<Setting<?>> settings = new ArrayList<>();
settings.add(GEOSPATIAL_CIRCUIT_BREAKER_LIMIT_SETTING);
settings.add(GEOSPATIAL_CIRCUIT_BREAKER_OVERHEAD_SETTING);
settings.add(GEOSPATIAL_CIRCUIT_BREAKER_TYPE_SETTING);
settings.addAll(Ip2GeoSettings.settings());
return settings;
}

@Override
Expand Down Expand Up @@ -230,4 +267,20 @@ public List<AggregationSpec> getAggregations() {

return List.of(geoHexGridSpec);
}

/**
 * Builds the breaker settings for the geospatial circuit breaker.
 *
 * @param settings settings from which the limit, overhead and type are read
 * @return breaker settings named after {@code CIRCUIT_BREAKER_NAME} with transient durability
 */
@Override
public BreakerSettings getCircuitBreaker(final Settings settings) {
    final long limitInBytes = GEOSPATIAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes();
    final double overhead = GEOSPATIAL_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings);
    final CircuitBreaker.Type breakerType = GEOSPATIAL_CIRCUIT_BREAKER_TYPE_SETTING.get(settings);
    return new BreakerSettings(CIRCUIT_BREAKER_NAME, limitInBytes, overhead, breakerType, CircuitBreaker.Durability.TRANSIENT);
}

/**
 * Stores the circuit breaker handed to this plugin in the static
 * {@code circuitBreaker} field, where other classes read it.
 *
 * @param circuitBreaker the circuit breaker to publish
 */
@Override
public void setCircuitBreaker(final CircuitBreaker circuitBreaker) {
    // The target is a static field: qualify it with the class name rather than
    // `this`, which would wrongly suggest per-instance state.
    GeospatialPlugin.circuitBreaker = circuitBreaker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.geospatial.ip2geo;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -38,6 +39,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
Expand All @@ -52,6 +54,7 @@
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.geospatial.plugin.GeospatialPlugin;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.jobscheduler.spi.LockModel;
Expand Down Expand Up @@ -101,6 +104,7 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {

@Before
public void prepareIp2GeoTestCase() {
GeospatialPlugin.circuitBreaker = mock(CircuitBreaker.class);
openMocks = MockitoAnnotations.openMocks(this);
settings = Settings.EMPTY;
client = new NoOpNodeClient(this.getTestName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.geospatial.GeospatialTestHelper;
Expand Down Expand Up @@ -103,12 +104,13 @@ public void testCreateIndexIfNotExistsWithoutExistingIndex() {
verifyingGeoIpDataFacade.createIndexIfNotExists(index);
}

@SneakyThrows
public void testCreateDocument() {
String[] names = { "ip", "country", "city" };
String[] values = { "1.0.0.0/25", "USA", "Seattle" };
assertEquals(
"{\"_cidr\":\"1.0.0.0/25\",\"_data\":{\"country\":\"USA\",\"city\":\"Seattle\"}}",
noOpsGeoIpDataFacade.createDocument(names, values)
Strings.toString(noOpsGeoIpDataFacade.createDocument(names, values))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
Expand All @@ -33,6 +35,7 @@
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.plugin.GeospatialPlugin;

@SuppressForbidden(reason = "unit test")
public class DatasourceUpdateServiceTests extends Ip2GeoTestCase {
Expand Down Expand Up @@ -124,6 +127,7 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() {
datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class));

// Verify
verify(GeospatialPlugin.circuitBreaker).addEstimateBytesAndMaybeBreak(anyLong(), anyString());
assertEquals(manifest.getProvider(), datasource.getDatabase().getProvider());
assertEquals(manifest.getSha256Hash(), datasource.getDatabase().getSha256Hash());
assertEquals(Instant.ofEpochMilli(manifest.getUpdatedAt()), datasource.getDatabase().getUpdatedAt());
Expand Down

0 comments on commit 3998e7d

Please sign in to comment.