From b505ca3e2d02337f46f4ab9ff56f92b32ff4c206 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Fri, 16 Jun 2023 16:23:41 -0700 Subject: [PATCH] Remove IP2Geo processor validation (#336) Cannot query an index to get data to validate the IP2Geo processor. Will add validation when we decide to store some of the data in cluster state metadata. Signed-off-by: Heemin Kim --- .../ip2geo/processor/Ip2GeoProcessor.java | 72 +++++-------------- .../ip2geo/processor/Ip2GeoProcessorIT.java | 16 ----- .../processor/Ip2GeoProcessorTests.java | 40 ++--------- 3 files changed, 23 insertions(+), 105 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 5c5f3280..f63630a5 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -4,8 +4,6 @@ */ package org.opensearch.geospatial.ip2geo.processor; -import static org.opensearch.cluster.service.ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME; -import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; import static org.opensearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.opensearch.ingest.ConfigurationUtils.readOptionalList; import static org.opensearch.ingest.ConfigurationUtils.readStringProperty; @@ -29,6 +27,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.IngestService; @@ -154,7 +153,12 @@ protected void executeInternal( @Override public void onResponse(final Datasource datasource) { if (datasource == null) { - 
handler.accept(null, new IllegalStateException("datasource does not exist")); + handler.accept(null, new IllegalStateException("datasource is not available")); + return; + } + + if (DatasourceState.AVAILABLE.equals(datasource.getState()) == false) { + handler.accept(null, new IllegalStateException("datasource is not in an available state")); return; } @@ -174,6 +178,10 @@ public void onResponse(final Datasource datasource) { @Override public void onFailure(final Exception e) { + if (e instanceof IndexNotFoundException) { + handler.accept(null, new IllegalStateException("datasource is not available")); + return; + } handler.accept(null, e); } }); @@ -241,8 +249,8 @@ protected void executeInternal( datasourceFacade.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { - if (datasource == null) { - handler.accept(null, new IllegalStateException("datasource does not exist")); + if (datasource == null || DatasourceState.AVAILABLE.equals(datasource.getState()) == false) { + handler.accept(null, new IllegalStateException("datasource is not available")); return; } @@ -262,6 +270,10 @@ public void onResponse(final Datasource datasource) { @Override public void onFailure(final Exception e) { + if (e instanceof IndexNotFoundException) { + handler.accept(null, new IllegalStateException("datasource is not available")); + return; + } handler.accept(null, e); } }); @@ -319,15 +331,8 @@ public Factory(final IngestService ingestService, final DatasourceFacade datasou } /** - * When a user create a processor, this method is called twice. Once to validate the new processor and another - * to apply cluster state change after the processor is added. - * - * The second call is made by ClusterApplierService. Therefore, we cannot access cluster state in the call. - * That means, we cannot even query an index inside the call. 
- * - * Because the processor is validated in the first call, we skip the validation in the second call. - * - * @see org.opensearch.cluster.service.ClusterApplierService#state() + * Blocking requests cannot be made within this method because it is executed in a transport thread. + * This means that validation using data in an index won't work. */ @Override public Ip2GeoProcessor create( @@ -342,11 +347,6 @@ public Ip2GeoProcessor create( List propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false); - // Skip validation for the call by cluster applier service - if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false) { - validate(processorTag, datasourceName, propertyNames); - } - return new Ip2GeoProcessor( processorTag, description, @@ -360,39 +360,5 @@ public Ip2GeoProcessor create( geoIpDataFacade ); } - - private void validate(final String processorTag, final String datasourceName, final List propertyNames) throws IOException { - Datasource datasource = datasourceFacade.getDatasource(datasourceName); - - if (datasource == null) { - throw newConfigurationException(TYPE, processorTag, "datasource", "datasource [" + datasourceName + "] doesn't exist"); - } - - if (DatasourceState.AVAILABLE.equals(datasource.getState()) == false) { - throw newConfigurationException( - TYPE, - processorTag, - "datasource", - "datasource [" + datasourceName + "] is not in an available state" - ); - } - - if (propertyNames == null) { - return; - } - - // Validate properties are valid. If not add all available properties. 
- final Set availableProperties = new HashSet<>(datasource.getDatabase().getFields()); - for (String fieldName : propertyNames) { - if (availableProperties.contains(fieldName) == false) { - throw newConfigurationException( - TYPE, - processorTag, - "properties", - "property [" + fieldName + "] is not available in the datasource [" + datasourceName + "]" - ); - } - } - } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java index aeface41..a99570ce 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -35,21 +34,6 @@ public class Ip2GeoProcessorIT extends GeospatialRestTestCase { private static final String IP = "ip"; private static final String SOURCE = "_source"; - @SneakyThrows - public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() { - String pipelineName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); - - // Run - ResponseException exception = expectThrows( - ResponseException.class, - () -> createIp2GeoProcessorPipeline(pipelineName, Collections.emptyMap()) - ); - - // Verify - assertTrue(exception.getMessage().contains("doesn't exist")); - assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); - } - @SneakyThrows public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { Ip2GeoDataServer.start(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index c3dbf765..fcf3dcb3 100644 --- 
a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -25,7 +25,6 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.common.Randomness; import org.opensearch.geospatial.GeospatialTestHelper; @@ -46,40 +45,6 @@ public void init() { factory = new Ip2GeoProcessor.Factory(ingestService, datasourceFacade, geoIpDataFacade); } - public void testCreateWithNoDatasource() { - Map config = new HashMap<>(); - config.put("field", "ip"); - config.put(CONFIG_DATASOURCE_KEY, "no_datasource"); - OpenSearchException exception = expectThrows( - OpenSearchException.class, - () -> factory.create( - Collections.emptyMap(), - GeospatialTestHelper.randomLowerCaseString(), - GeospatialTestHelper.randomLowerCaseString(), - config - ) - ); - assertTrue(exception.getDetailedMessage().contains("doesn't exist")); - } - - public void testCreateWithInvalidDatasourceState() { - Datasource datasource = new Datasource(); - datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setState(randomStateExcept(DatasourceState.AVAILABLE)); - OpenSearchException exception = expectThrows(OpenSearchException.class, () -> createProcessor(datasource, Collections.emptyMap())); - assertTrue(exception.getDetailedMessage().contains("available state")); - } - - public void testCreateIp2GeoProcessor_whenInvalidProperties_thenException() { - Map config = new HashMap<>(); - config.put("properties", Arrays.asList(SUPPORTED_FIELDS.get(0), "invalid_property")); - OpenSearchException exception = expectThrows( - OpenSearchException.class, - () -> createProcessor(GeospatialTestHelper.randomLowerCaseString(), config) - ); - assertTrue(exception.getDetailedMessage().contains("property")); - } - public void testExecuteWithNoIpAndIgnoreMissing() throws Exception { 
String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Map config = new HashMap<>(); @@ -125,13 +90,14 @@ public void testExecute_whenNonStringValue_thenException() throws Exception { public void testExecuteWithNullDatasource() throws Exception { BiConsumer handler = (doc, e) -> { assertNull(doc); - assertTrue(e.getMessage().contains("datasource does not exist")); + assertTrue(e.getMessage().contains("datasource is not available")); }; getActionListener(Collections.emptyMap(), handler).onResponse(null); } public void testExecuteWithExpiredDatasource() throws Exception { Datasource datasource = mock(Datasource.class); + when(datasource.getState()).thenReturn(DatasourceState.AVAILABLE); when(datasource.isExpired()).thenReturn(true); BiConsumer handler = (doc, e) -> { assertEquals("ip2geo_data_expired", doc.getFieldValue(DEFAULT_TARGET_FIELD + ".error", String.class)); @@ -174,6 +140,7 @@ public void testExecuteInternal_whenSingleIp_thenGetDatasourceIsCalled() { verify(datasourceFacade).getDatasource(eq(datasourceName), captor.capture()); Datasource datasource = mock(Datasource.class); when(datasource.isExpired()).thenReturn(false); + when(datasource.getState()).thenReturn(DatasourceState.AVAILABLE); when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); captor.getValue().onResponse(datasource); @@ -301,6 +268,7 @@ public void testExecuteInternal_whenMultiIps_thenGetDatasourceIsCalled() { verify(datasourceFacade).getDatasource(eq(datasourceName), captor.capture()); Datasource datasource = mock(Datasource.class); when(datasource.isExpired()).thenReturn(false); + when(datasource.getState()).thenReturn(DatasourceState.AVAILABLE); when(datasource.currentIndexName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); captor.getValue().onResponse(datasource);