Skip to content

Commit

Permalink
Add a field in datasource for current index name (opensearch-project#333
Browse files Browse the repository at this point in the history
)

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent 21c4522 commit 3183b17
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
@Getter
@Setter
@EqualsAndHashCode
@EqualsAndHashCode(callSuper = false)
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources");
private static final ParseField FIELD_NAME_NAME = new ParseField("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -34,8 +34,8 @@
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
@EqualsAndHashCode(callSuper = false)
public class PutDatasourceRequest extends ActionRequest {
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -32,7 +32,7 @@
@Setter
@Log4j2
@EqualsAndHashCode(callSuper = false)
public class UpdateDatasourceRequest extends AcknowledgedRequest<UpdateDatasourceRequest> {
public class UpdateDatasourceRequest extends ActionRequest {
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField STATE_FIELD = new ParseField("state");
private static final ParseField CURRENT_INDEX_FIELD = new ParseField("current_index");
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField DATABASE_FIELD = new ParseField("database");
private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats");
Expand Down Expand Up @@ -150,8 +151,14 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private DatasourceState state;
/**
* @param indices A list of indices having GeoIP data
* @return A list of indices having GeoIP data
* @param currentIndex the current index name having GeoIP data
* @return the current index name having GeoIP data
*/
@Getter(AccessLevel.NONE)
private String currentIndex;
/**
* @param indices A list of indices having GeoIP data including currentIndex
* @return A list of indices having GeoIP data including currentIndex
*/
private List<String> indices;
/**
Expand Down Expand Up @@ -181,9 +188,10 @@ public class Datasource implements Writeable, ScheduledJobParameter {
DatasourceTask task = DatasourceTask.valueOf((String) args[6]);
String endpoint = (String) args[7];
DatasourceState state = DatasourceState.valueOf((String) args[8]);
List<String> indices = (List<String>) args[9];
Database database = (Database) args[10];
UpdateStats updateStats = (UpdateStats) args[11];
String currentIndex = (String) args[9];
List<String> indices = (List<String>) args[10];
Database database = (Database) args[11];
UpdateStats updateStats = (UpdateStats) args[12];
Datasource parameter = new Datasource(
name,
lastUpdateTime,
Expand All @@ -194,6 +202,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
task,
endpoint,
state,
currentIndex,
indices,
database,
updateStats
Expand All @@ -212,6 +221,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CURRENT_INDEX_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD);
Expand All @@ -233,6 +243,7 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri
DatasourceTask.ALL,
endpoint,
DatasourceState.CREATING,
null,
new ArrayList<>(),
new Database(),
new UpdateStats()
Expand All @@ -249,6 +260,7 @@ public Datasource(final StreamInput in) throws IOException {
task = DatasourceTask.valueOf(in.readString());
endpoint = in.readString();
state = DatasourceState.valueOf(in.readString());
currentIndex = in.readOptionalString();
indices = in.readStringList();
database = new Database(in);
updateStats = new UpdateStats(in);
Expand All @@ -265,6 +277,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeString(task.name());
out.writeString(endpoint);
out.writeString(state.name());
out.writeOptionalString(currentIndex);
out.writeStringCollection(indices);
database.writeTo(out);
updateStats.writeTo(out);
Expand Down Expand Up @@ -292,6 +305,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(TASK_FIELD.getPreferredName(), task.name());
builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint);
builder.field(STATE_FIELD.getPreferredName(), state.name());
if (currentIndex != null) {
builder.field(CURRENT_INDEX_FIELD.getPreferredName(), currentIndex);
}
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.field(DATABASE_FIELD.getPreferredName(), database);
builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats);
Expand Down Expand Up @@ -358,25 +374,17 @@ public String currentIndexName() {
return null;
}

if (database.updatedAt == null) {
return null;
}

return indexNameFor(database.updatedAt.toEpochMilli());
return currentIndex;
}

/**
* Index name for a given manifest
* Index name for a datasource with given suffix
*
* @param manifest manifest
* @return Index name for a given manifest
* @param suffix the suffix of a index name
* @return index name for a datasource with given suffix
*/
public String indexNameFor(final DatasourceManifest manifest) {
return indexNameFor(manifest.getUpdatedAt());
}

private String indexNameFor(final long suffix) {
return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
public String newIndexName(final String suffix) {
return String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
}

Instant startTime = Instant.now();
String indexName = setupIndex(manifest, datasource);
String indexName = setupIndex(datasource);
String[] header;
List<String> fieldsToStore;
try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) {
Expand All @@ -86,7 +87,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
}

Instant endTime = Instant.now();
updateDatasourceAsSucceeded(datasource, manifest, fieldsToStore, startTime, endTime);
updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime);
}

/**
Expand Down Expand Up @@ -199,16 +200,16 @@ private CSVRecord validateHeader(CSVRecord header) {
*
* @param manifest the manifest
* @param datasource the datasource
* @return
* @throws IOException
*/
private void updateDatasourceAsSucceeded(
final String newIndexName,
final Datasource datasource,
final DatasourceManifest manifest,
final List<String> fields,
final Instant startTime,
final Instant endTime
) throws IOException {
) {
datasource.setCurrentIndex(newIndexName);
datasource.setDatabase(manifest, fields);
datasource.getUpdateStats().setLastSucceededAt(endTime);
datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
Expand All @@ -225,13 +226,11 @@ private void updateDatasourceAsSucceeded(
/***
* Setup index to add a new geoip data
*
* @param manifest the manifest
* @param datasource the datasource
* @return
* @throws IOException
* @return new index name
*/
private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException {
String indexName = datasource.indexNameFor(manifest);
private String setupIndex(final Datasource datasource) {
String indexName = datasource.newIndexName(UUID.randomUUID().toString());
datasource.getIndices().add(indexName);
datasourceFacade.updateDatasource(datasource);
geoIpDataFacade.createIndexIfNotExists(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) {
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.geospatial.ip2geo.jobscheduler;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.time.Instant;
Expand All @@ -15,20 +13,23 @@
import java.util.Arrays;
import java.util.Locale;

import lombok.SneakyThrows;

import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

public class DatasourceTests extends Ip2GeoTestCase {

public void testParser() throws Exception {
@SneakyThrows
public void testParser_whenAllValueIsFilled_thenSucceed() {
String id = GeospatialTestHelper.randomLowerCaseString();
IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS);
String endpoint = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource(id, schedule, endpoint);
datasource.enable();
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.getDatabase().setFields(Arrays.asList("field1", "field2"));
datasource.getDatabase().setProvider("test_provider");
datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS));
Expand All @@ -46,27 +47,39 @@ public void testParser() throws Exception {
assertTrue(datasource.equals(anotherDatasource));
}

@SneakyThrows
public void testParser_whenNullForOptionalFields_thenSucceed() {
String id = GeospatialTestHelper.randomLowerCaseString();
IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS);
String endpoint = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource(id, schedule, endpoint);
Datasource anotherDatasource = Datasource.PARSER.parse(
createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)),
null
);
assertTrue(datasource.equals(anotherDatasource));
}

public void testCurrentIndexName_whenNotExpired_thenReturnName() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
datasource.getDatabase().setFields(new ArrayList<>());

assertEquals(
String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, now.toEpochMilli()),
datasource.currentIndexName()
);
assertNotNull(datasource.currentIndexName());
}

public void testCurrentIndexName_whenExpired_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
Expand All @@ -78,33 +91,13 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() {
assertNull(datasource.currentIndexName());
}

public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(null);
datasource.getDatabase().setValidForInDays(1l);
datasource.getUpdateStats().setLastSucceededAt(Instant.now());
datasource.getDatabase().setFields(new ArrayList<>());

assertFalse(datasource.isExpired());
assertNull(datasource.currentIndexName());
}

public void testGetIndexNameFor() {
long updatedAt = randomPositiveLong();
DatasourceManifest manifest = mock(DatasourceManifest.class);
when(manifest.getUpdatedAt()).thenReturn(updatedAt);

String id = GeospatialTestHelper.randomLowerCaseString();
@SneakyThrows
public void testNewIndexName_whenCalled_thenReturnedExpectedValue() {
String name = GeospatialTestHelper.randomLowerCaseString();
String suffix = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource();
datasource.setName(id);
assertEquals(
String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, updatedAt),
datasource.indexNameFor(manifest)
);
datasource.setName(name);
assertEquals(String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix), datasource.newIndexName(suffix));
}

public void testResetDatabase_whenCalled_thenNullifySomeFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() {
String lingeringIndex = indexPrefix + now.minusMillis(2).toEpochMilli();
Datasource datasource = new Datasource();
datasource.setName(datasourceName);
datasource.setCurrentIndex(currentIndex);
datasource.getIndices().add(currentIndex);
datasource.getIndices().add(oldIndex);
datasource.getIndices().add(lingeringIndex);
Expand Down

0 comments on commit 3183b17

Please sign in to comment.