Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a field in datasource for current index name #333

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
@Getter
@Setter
@EqualsAndHashCode
@EqualsAndHashCode(callSuper = false)
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources");
private static final ParseField FIELD_NAME_NAME = new ParseField("name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -34,8 +34,8 @@
@Getter
@Setter
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
@EqualsAndHashCode(callSuper = false)
public class PutDatasourceRequest extends ActionRequest {
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -32,7 +32,7 @@
@Setter
@Log4j2
@EqualsAndHashCode(callSuper = false)
public class UpdateDatasourceRequest extends AcknowledgedRequest<UpdateDatasourceRequest> {
public class UpdateDatasourceRequest extends ActionRequest {
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField STATE_FIELD = new ParseField("state");
private static final ParseField CURRENT_INDEX_FIELD = new ParseField("current_index");
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField DATABASE_FIELD = new ParseField("database");
private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats");
Expand Down Expand Up @@ -150,8 +151,14 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private DatasourceState state;
/**
* @param indices A list of indices having GeoIP data
* @return A list of indices having GeoIP data
* @param currentIndex the current index name having GeoIP data
* @return the current index name having GeoIP data
*/
@Getter(AccessLevel.NONE)
private String currentIndex;
/**
* @param indices A list of indices having GeoIP data including currentIndex
* @return A list of indices having GeoIP data including currentIndex
*/
private List<String> indices;
/**
Expand Down Expand Up @@ -181,9 +188,10 @@ public class Datasource implements Writeable, ScheduledJobParameter {
DatasourceTask task = DatasourceTask.valueOf((String) args[6]);
String endpoint = (String) args[7];
DatasourceState state = DatasourceState.valueOf((String) args[8]);
List<String> indices = (List<String>) args[9];
Database database = (Database) args[10];
UpdateStats updateStats = (UpdateStats) args[11];
String currentIndex = (String) args[9];
List<String> indices = (List<String>) args[10];
Database database = (Database) args[11];
UpdateStats updateStats = (UpdateStats) args[12];
Datasource parameter = new Datasource(
name,
lastUpdateTime,
Expand All @@ -194,6 +202,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
task,
endpoint,
state,
currentIndex,
indices,
database,
updateStats
Expand All @@ -212,6 +221,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CURRENT_INDEX_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD);
Expand All @@ -233,6 +243,7 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri
DatasourceTask.ALL,
endpoint,
DatasourceState.CREATING,
null,
new ArrayList<>(),
new Database(),
new UpdateStats()
Expand All @@ -249,6 +260,7 @@ public Datasource(final StreamInput in) throws IOException {
task = DatasourceTask.valueOf(in.readString());
endpoint = in.readString();
state = DatasourceState.valueOf(in.readString());
currentIndex = in.readOptionalString();
indices = in.readStringList();
database = new Database(in);
updateStats = new UpdateStats(in);
Expand All @@ -265,6 +277,7 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeString(task.name());
out.writeString(endpoint);
out.writeString(state.name());
out.writeOptionalString(currentIndex);
out.writeStringCollection(indices);
database.writeTo(out);
updateStats.writeTo(out);
Expand Down Expand Up @@ -292,6 +305,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(TASK_FIELD.getPreferredName(), task.name());
builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint);
builder.field(STATE_FIELD.getPreferredName(), state.name());
if (currentIndex != null) {
builder.field(CURRENT_INDEX_FIELD.getPreferredName(), currentIndex);
}
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.field(DATABASE_FIELD.getPreferredName(), database);
builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats);
Expand Down Expand Up @@ -358,25 +374,17 @@ public String currentIndexName() {
return null;
}

if (database.updatedAt == null) {
return null;
}

return indexNameFor(database.updatedAt.toEpochMilli());
return currentIndex;
}

/**
* Index name for a given manifest
* Index name for a datasource with given suffix
*
* @param manifest manifest
* @return Index name for a given manifest
* @param suffix the suffix of a index name
* @return index name for a datasource with given suffix
*/
public String indexNameFor(final DatasourceManifest manifest) {
return indexNameFor(manifest.getUpdatedAt());
}

private String indexNameFor(final long suffix) {
return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
public String newIndexName(final String suffix) {
return String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
}

Instant startTime = Instant.now();
String indexName = setupIndex(manifest, datasource);
String indexName = setupIndex(datasource);
String[] header;
List<String> fieldsToStore;
try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) {
Expand All @@ -86,7 +87,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
}

Instant endTime = Instant.now();
updateDatasourceAsSucceeded(datasource, manifest, fieldsToStore, startTime, endTime);
updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime);
}

/**
Expand Down Expand Up @@ -199,16 +200,16 @@ private CSVRecord validateHeader(CSVRecord header) {
*
* @param manifest the manifest
* @param datasource the datasource
* @return
* @throws IOException
*/
private void updateDatasourceAsSucceeded(
final String newIndexName,
final Datasource datasource,
final DatasourceManifest manifest,
final List<String> fields,
final Instant startTime,
final Instant endTime
) throws IOException {
) {
datasource.setCurrentIndex(newIndexName);
datasource.setDatabase(manifest, fields);
datasource.getUpdateStats().setLastSucceededAt(endTime);
datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
Expand All @@ -225,13 +226,11 @@ private void updateDatasourceAsSucceeded(
/***
* Setup index to add a new geoip data
*
* @param manifest the manifest
* @param datasource the datasource
* @return
* @throws IOException
* @return new index name
*/
private String setupIndex(final DatasourceManifest manifest, final Datasource datasource) throws IOException {
String indexName = datasource.indexNameFor(manifest);
private String setupIndex(final Datasource datasource) {
String indexName = datasource.newIndexName(UUID.randomUUID().toString());
datasource.getIndices().add(indexName);
datasourceFacade.updateDatasource(datasource);
geoIpDataFacade.createIndexIfNotExists(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) {
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.geospatial.ip2geo.jobscheduler;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.time.Instant;
Expand All @@ -15,20 +13,23 @@
import java.util.Arrays;
import java.util.Locale;

import lombok.SneakyThrows;

import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

public class DatasourceTests extends Ip2GeoTestCase {

public void testParser() throws Exception {
@SneakyThrows
public void testParser_whenAllValueIsFilled_thenSucceed() {
String id = GeospatialTestHelper.randomLowerCaseString();
IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS);
String endpoint = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource(id, schedule, endpoint);
datasource.enable();
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.getDatabase().setFields(Arrays.asList("field1", "field2"));
datasource.getDatabase().setProvider("test_provider");
datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS));
Expand All @@ -46,27 +47,39 @@ public void testParser() throws Exception {
assertTrue(datasource.equals(anotherDatasource));
}

@SneakyThrows
public void testParser_whenNullForOptionalFields_thenSucceed() {
String id = GeospatialTestHelper.randomLowerCaseString();
IntervalSchedule schedule = new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS);
String endpoint = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource(id, schedule, endpoint);
Datasource anotherDatasource = Datasource.PARSER.parse(
createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)),
null
);
assertTrue(datasource.equals(anotherDatasource));
}

public void testCurrentIndexName_whenNotExpired_thenReturnName() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
datasource.getDatabase().setFields(new ArrayList<>());

assertEquals(
String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, now.toEpochMilli()),
datasource.currentIndexName()
);
assertNotNull(datasource.currentIndexName());
}

public void testCurrentIndexName_whenExpired_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.setCurrentIndex(datasource.newIndexName(GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
Expand All @@ -78,33 +91,13 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() {
assertNull(datasource.currentIndexName());
}

public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(null);
datasource.getDatabase().setValidForInDays(1l);
datasource.getUpdateStats().setLastSucceededAt(Instant.now());
datasource.getDatabase().setFields(new ArrayList<>());

assertFalse(datasource.isExpired());
assertNull(datasource.currentIndexName());
}

public void testGetIndexNameFor() {
long updatedAt = randomPositiveLong();
DatasourceManifest manifest = mock(DatasourceManifest.class);
when(manifest.getUpdatedAt()).thenReturn(updatedAt);

String id = GeospatialTestHelper.randomLowerCaseString();
@SneakyThrows
public void testNewIndexName_whenCalled_thenReturnedExpectedValue() {
String name = GeospatialTestHelper.randomLowerCaseString();
String suffix = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource();
datasource.setName(id);
assertEquals(
String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, updatedAt),
datasource.indexNameFor(manifest)
);
datasource.setName(name);
assertEquals(String.format(Locale.ROOT, "%s.%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix), datasource.newIndexName(suffix));
}

public void testResetDatabase_whenCalled_thenNullifySomeFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() {
String lingeringIndex = indexPrefix + now.minusMillis(2).toEpochMilli();
Datasource datasource = new Datasource();
datasource.setName(datasourceName);
datasource.setCurrentIndex(currentIndex);
datasource.getIndices().add(currentIndex);
datasource.getIndices().add(oldIndex);
datasource.getIndices().add(lingeringIndex);
Expand Down