Skip to content

Commit

Permalink
Return null index name for expired data (#322)
Browse files Browse the repository at this point in the history
Return null index name for expired data so that it can be deleted
by clean up process. Clean up process exclude current index from deleting.
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored May 25, 2023
1 parent 617f355 commit e63ce86
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void disable() {
* @return Current index name of a datasource
*/
public String currentIndexName() {
return indexNameFor(database.updatedAt.toEpochMilli());
return isExpired() ? null : indexNameFor(database.updatedAt.toEpochMilli());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,19 @@ protected void executeInternal(
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
if (handleInvalidDatasource(ingestDocument, datasource, handler)) {
if (datasource == null) {
handler.accept(null, new IllegalStateException("datasource does not exist"));
return;
}

geoIpDataFacade.getGeoIpData(datasource.currentIndexName(), ip, new ActionListener<>() {
String indexName = datasource.currentIndexName();
if (indexName == null) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
return;
}

geoIpDataFacade.getGeoIpData(indexName, ip, new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
handleSingleIp(ip, ipToGeoData, ingestDocument, handler);
Expand Down Expand Up @@ -229,12 +237,20 @@ protected void executeInternal(
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
if (handleInvalidDatasource(ingestDocument, datasource, handler)) {
if (datasource == null) {
handler.accept(null, new IllegalStateException("datasource does not exist"));
return;
}

String indexName = datasource.currentIndexName();
if (indexName == null) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
return;
}

geoIpDataFacade.getGeoIpData(
datasource.currentIndexName(),
indexName,
ipList.iterator(),
clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE),
clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES),
Expand Down Expand Up @@ -298,25 +314,6 @@ public void onFailure(final Exception e) {
};
}

@VisibleForTesting
protected boolean handleInvalidDatasource(
final IngestDocument ingestDocument,
final Datasource datasource,
final BiConsumer<IngestDocument, Exception> handler
) {
if (datasource == null) {
handler.accept(null, new IllegalStateException("datasource does not exist"));
return true;
}

if (datasource.isExpired()) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
return true;
}
return false;
}

@Override
public String getType() {
return TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,38 @@ public void testParser() throws Exception {
assertTrue(datasource.equals(anotherDatasource));
}

public void testCurrentIndexName() {
public void testCurrentIndexName_whenNotExpired_thenReturnName() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
datasource.getDatabase().setValidForInDays(10l);
datasource.getDatabase().setFields(new ArrayList<>());

assertEquals(
String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, id, now.toEpochMilli()),
datasource.currentIndexName()
);
}

public void testCurrentIndexName_whenExpired_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Instant now = Instant.now();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(now);
datasource.getDatabase().setValidForInDays(1l);
datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(25, ChronoUnit.HOURS));
datasource.getDatabase().setFields(new ArrayList<>());

assertTrue(datasource.isExpired());
assertNull(datasource.currentIndexName());
}

public void testGetIndexNameFor() {
long updatedAt = randomPositiveLong();
DatasourceManifest manifest = mock(DatasourceManifest.class);
Expand Down

0 comments on commit e63ce86

Please sign in to comment.