diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index ac81e563..2a37eac3 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -307,7 +307,7 @@ public void disable() { * @return Current index name of a datasource */ public String currentIndexName() { - return indexNameFor(database.updatedAt.toEpochMilli()); + return isExpired() ? null : indexNameFor(database.updatedAt.toEpochMilli()); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 7c4f8120..fcb32b56 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -153,11 +153,19 @@ protected void executeInternal( datasourceFacade.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { - if (handleInvalidDatasource(ingestDocument, datasource, handler)) { + if (datasource == null) { + handler.accept(null, new IllegalStateException("datasource does not exist")); return; } - geoIpDataFacade.getGeoIpData(datasource.currentIndexName(), ip, new ActionListener<>() { + String indexName = datasource.currentIndexName(); + if (indexName == null) { + ingestDocument.setFieldValue(targetField, DATA_EXPIRED); + handler.accept(ingestDocument, null); + return; + } + + geoIpDataFacade.getGeoIpData(indexName, ip, new ActionListener<>() { @Override public void onResponse(final Map ipToGeoData) { handleSingleIp(ip, ipToGeoData, ingestDocument, handler); @@ -229,12 +237,20 @@ protected void executeInternal( datasourceFacade.getDatasource(datasourceName, new ActionListener<>() { @Override public void onResponse(final Datasource datasource) { - if (handleInvalidDatasource(ingestDocument, datasource, handler)) { + if (datasource == null) { + handler.accept(null, new IllegalStateException("datasource does not exist")); + return; + } + + String indexName = datasource.currentIndexName(); + if (indexName == null) { + ingestDocument.setFieldValue(targetField, DATA_EXPIRED); + handler.accept(ingestDocument, null); return; } geoIpDataFacade.getGeoIpData( - datasource.currentIndexName(), + indexName, ipList.iterator(), clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE), clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES), @@ -298,25 +314,6 @@ public void onFailure(final Exception e) { }; } - @VisibleForTesting - protected boolean handleInvalidDatasource( - final IngestDocument ingestDocument, - final Datasource datasource, - final BiConsumer handler - ) { - if (datasource == null) { - handler.accept(null, new IllegalStateException("datasource does not exist")); - return true; - } - - if (datasource.isExpired()) { - ingestDocument.setFieldValue(targetField, DATA_EXPIRED); - handler.accept(ingestDocument, null); - return true; - } - return false; - } - @Override public String getType() { return TYPE; diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index b8be8068..49b7961f 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -46,7 +46,7 @@ public void testParser() throws Exception { assertTrue(datasource.equals(anotherDatasource)); } - public void testCurrentIndexName() { + public void testCurrentIndexName_whenNotExpired_thenReturnName() { String id = GeospatialTestHelper.randomLowerCaseString(); Instant now = Instant.now(); Datasource datasource = new Datasource(); @@ -54,14 +54,30 @@ public void testCurrentIndexName() { datasource.getDatabase().setProvider("provider"); datasource.getDatabase().setSha256Hash("sha256Hash"); datasource.getDatabase().setUpdatedAt(now); - datasource.getDatabase().setValidForInDays(10l); datasource.getDatabase().setFields(new ArrayList<>()); + assertEquals( String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, now.toEpochMilli()), datasource.currentIndexName() ); } + public void testCurrentIndexName_whenExpired_thenReturnNull() { + String id = GeospatialTestHelper.randomLowerCaseString(); + Instant now = Instant.now(); + Datasource datasource = new Datasource(); + datasource.setName(id); + datasource.getDatabase().setProvider("provider"); + datasource.getDatabase().setSha256Hash("sha256Hash"); + datasource.getDatabase().setUpdatedAt(now); + datasource.getDatabase().setValidForInDays(1l); + datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(25, ChronoUnit.HOURS)); + datasource.getDatabase().setFields(new ArrayList<>()); + + assertTrue(datasource.isExpired()); + assertNull(datasource.currentIndexName()); + } + public void testGetIndexNameFor() { long updatedAt = randomPositiveLong(); DatasourceManifest manifest = mock(DatasourceManifest.class);