From 07c036214b33c99fb5db7b8f7472d4e15edbf6cb Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 7 Jun 2023 16:12:08 -0700 Subject: [PATCH] Add a field in datasource for current index name Signed-off-by: Heemin Kim --- .../ip2geo/jobscheduler/Datasource.java | 47 +++++++++------- .../jobscheduler/DatasourceUpdateService.java | 18 +++--- .../geospatial/ip2geo/Ip2GeoTestCase.java | 1 + .../ip2geo/jobscheduler/DatasourceTests.java | 56 +++++++++---------- .../DatasourceUpdateServiceTests.java | 1 + 5 files changed, 64 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 0f884c32..dbed0d1c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -89,6 +89,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { */ private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); private static final ParseField STATE_FIELD = new ParseField("state"); + private static final ParseField CURRENT_INDEX_FIELD = new ParseField("current_index"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField DATABASE_FIELD = new ParseField("database"); private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); @@ -150,8 +151,14 @@ public class Datasource implements Writeable, ScheduledJobParameter { */ private DatasourceState state; /** - * @param indices A list of indices having GeoIP data - * @return A list of indices having GeoIP data + * @param currentIndex the current index name having GeoIP data + * @return the current index name having GeoIP data + */ + @Getter(AccessLevel.NONE) + private String currentIndex; + /** + * @param indices A list of indices having GeoIP data including currentIndex + * @return A list 
of indices having GeoIP data including currentIndex */ private List indices; /** @@ -181,9 +188,10 @@ public class Datasource implements Writeable, ScheduledJobParameter { DatasourceTask task = DatasourceTask.valueOf((String) args[6]); String endpoint = (String) args[7]; DatasourceState state = DatasourceState.valueOf((String) args[8]); - List indices = (List) args[9]; - Database database = (Database) args[10]; - UpdateStats updateStats = (UpdateStats) args[11]; + String currentIndex = (String) args[9]; + List indices = (List) args[10]; + Database database = (Database) args[11]; + UpdateStats updateStats = (UpdateStats) args[12]; Datasource parameter = new Datasource( name, lastUpdateTime, @@ -194,6 +202,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { task, endpoint, state, + currentIndex, indices, database, updateStats @@ -212,6 +221,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CURRENT_INDEX_FIELD); PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD); @@ -233,6 +243,7 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri DatasourceTask.ALL, endpoint, DatasourceState.CREATING, + null, new ArrayList<>(), new Database(), new UpdateStats() @@ -249,6 +260,7 @@ public Datasource(final StreamInput in) throws IOException { task = DatasourceTask.valueOf(in.readString()); endpoint = in.readString(); state = DatasourceState.valueOf(in.readString()); + 
currentIndex = in.readOptionalString(); indices = in.readStringList(); database = new Database(in); updateStats = new UpdateStats(in); @@ -265,6 +277,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeString(task.name()); out.writeString(endpoint); out.writeString(state.name()); + out.writeOptionalString(currentIndex); out.writeStringCollection(indices); database.writeTo(out); updateStats.writeTo(out); @@ -292,6 +305,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(TASK_FIELD.getPreferredName(), task.name()); builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint); builder.field(STATE_FIELD.getPreferredName(), state.name()); + if (currentIndex != null) { + builder.field(CURRENT_INDEX_FIELD.getPreferredName(), currentIndex); + } builder.field(INDICES_FIELD.getPreferredName(), indices); builder.field(DATABASE_FIELD.getPreferredName(), database); builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats); @@ -358,25 +374,18 @@ public String currentIndexName() { return null; } - if (database.updatedAt == null) { - return null; - } - - return indexNameFor(database.updatedAt.toEpochMilli()); + return currentIndex; } /** - * Index name for a given manifest + * New index name for a datasource with current time as suffix + * + * This method is not thread safe. It could generate the same index name depending on what Instant.now() returns.
* - * @param manifest manifest - * @return Index name for a given manifest + * @return New index name for a datasource */ - public String indexNameFor(final DatasourceManifest manifest) { - return indexNameFor(manifest.getUpdatedAt()); - } - - private String indexNameFor(final long suffix) { - return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix); + public String newIndexName() { + return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, Instant.now().toEpochMilli()); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 9dc94570..52bae69b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -68,7 +68,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable } Instant startTime = Instant.now(); - String indexName = setupIndex(manifest, datasource); + String indexName = setupIndex(datasource); String[] header; List fieldsToStore; try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) { @@ -86,7 +86,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable } Instant endTime = Instant.now(); - updateDatasourceAsSucceeded(datasource, manifest, fieldsToStore, startTime, endTime); + updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); } /** @@ -199,16 +199,16 @@ private CSVRecord validateHeader(CSVRecord header) { * * @param manifest the manifest * @param datasource the datasource - * @return - * @throws IOException */ private void updateDatasourceAsSucceeded( + final String newIndexName, final Datasource datasource, final DatasourceManifest manifest, final List fields, final Instant startTime, final 
Instant endTime - ) throws IOException { + ) { + datasource.setCurrentIndex(newIndexName); datasource.setDatabase(manifest, fields); datasource.getUpdateStats().setLastSucceededAt(endTime); datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); @@ -225,13 +225,11 @@ private void updateDatasourceAsSucceeded( /*** * Setup index to add a new geoip data * - * @param manifest the manifest * @param datasource the datasource - * @return - * @throws IOException + * @return new index name */ - private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException { - String indexName = datasource.indexNameFor(manifest); + private String setupIndex(final Datasource datasource) { + String indexName = datasource.newIndexName(); datasource.getIndices().add(indexName); datasourceFacade.updateDatasource(datasource); geoIpDataFacade.createIndexIfNotExists(indexName); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 84609bae..f8b40232 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -208,6 +208,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) { datasource.setSystemSchedule(datasource.getUserSchedule()); datasource.setTask(randomTask()); datasource.setState(randomState()); + datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase() diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java 
b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 1f2210eb..80a43959 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -5,8 +5,6 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; import java.time.Instant; @@ -15,20 +13,23 @@ import java.util.Arrays; import java.util.Locale; +import lombok.SneakyThrows; + import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; -import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; public class DatasourceTests extends Ip2GeoTestCase { - public void testParser() throws Exception { + @SneakyThrows + public void testParser_whenAllValueIsFilled_thenSucceed() { String id = GeospatialTestHelper.randomLowerCaseString(); IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); String endpoint = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource(id, schedule, endpoint); datasource.enable(); + datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); datasource.getDatabase().setFields(Arrays.asList("field1", "field2")); datasource.getDatabase().setProvider("test_provider"); datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); @@ -46,11 +47,25 @@ public void testParser() throws Exception { assertTrue(datasource.equals(anotherDatasource)); } + @SneakyThrows + public void testParser_whenNullForOptionalFields_thenSucceed() { + String id = 
GeospatialTestHelper.randomLowerCaseString(); + IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS); + String endpoint = GeospatialTestHelper.randomLowerCaseString(); + Datasource datasource = new Datasource(id, schedule, endpoint); + Datasource anotherDatasource = Datasource.PARSER.parse( + createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)), + null + ); + assertTrue(datasource.equals(anotherDatasource)); + } + public void testCurrentIndexName_whenNotExpired_thenReturnName() { String id = GeospatialTestHelper.randomLowerCaseString(); Instant now = Instant.now(); Datasource datasource = new Datasource(); datasource.setName(id); + datasource.setCurrentIndex(datasource.newIndexName()); datasource.getDatabase().setProvider("provider"); datasource.getDatabase().setSha256Hash("sha256Hash"); datasource.getDatabase().setUpdatedAt(now); @@ -67,6 +82,7 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() { Instant now = Instant.now(); Datasource datasource = new Datasource(); datasource.setName(id); + datasource.setCurrentIndex(datasource.newIndexName()); datasource.getDatabase().setProvider("provider"); datasource.getDatabase().setSha256Hash("sha256Hash"); datasource.getDatabase().setUpdatedAt(now); @@ -78,33 +94,13 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() { assertNull(datasource.currentIndexName()); } - public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() { - String id = GeospatialTestHelper.randomLowerCaseString(); - Datasource datasource = new Datasource(); - datasource.setName(id); - datasource.getDatabase().setProvider("provider"); - datasource.getDatabase().setSha256Hash("sha256Hash"); - datasource.getDatabase().setUpdatedAt(null); - datasource.getDatabase().setValidForInDays(1l); - datasource.getUpdateStats().setLastSucceededAt(Instant.now()); - datasource.getDatabase().setFields(new ArrayList<>()); - - 
assertFalse(datasource.isExpired()); - assertNull(datasource.currentIndexName()); - } - - public void testGetIndexNameFor() { - long updatedAt = randomPositiveLong(); - DatasourceManifest manifest = mock(DatasourceManifest.class); - when(manifest.getUpdatedAt()).thenReturn(updatedAt); - - String id = GeospatialTestHelper.randomLowerCaseString(); + @SneakyThrows + public void testNewIndexName_whenCalledTwice_thenTheyShouldBeDifferent() { Datasource datasource = new Datasource(); - datasource.setName(id); - assertEquals( - String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, updatedAt), - datasource.indexNameFor(manifest) - ); + String firstIndexName = datasource.newIndexName(); + Thread.sleep(1); + String secondIndexName = datasource.newIndexName(); + assertNotEquals(firstIndexName, secondIndexName); } public void testResetDatabase_whenCalled_thenNullifySomeFields() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 8dcfbd7a..f650fc98 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -190,6 +190,7 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { String lingeringIndex = indexPrefix + now.minusMillis(2).toEpochMilli(); Datasource datasource = new Datasource(); datasource.setName(datasourceName); + datasource.setCurrentIndex(currentIndex); datasource.getIndices().add(currentIndex); datasource.getIndices().add(oldIndex); datasource.getIndices().add(lingeringIndex);