-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added a cache to store datasource metadata (#338)
Signed-off-by: Heemin Kim <[email protected]>
- Loading branch information
Showing
10 changed files
with
495 additions
and
158 deletions.
There are no files selected for viewing
120 changes: 120 additions & 0 deletions
120
src/main/java/org/opensearch/geospatial/ip2geo/cache/Ip2GeoCache.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.cache; | ||
|
||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
import lombok.Getter; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; | ||
import org.opensearch.geospatial.ip2geo.common.DatasourceState; | ||
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; | ||
import org.opensearch.index.engine.Engine; | ||
import org.opensearch.index.shard.IndexingOperationListener; | ||
import org.opensearch.index.shard.ShardId; | ||
|
||
@Log4j2 | ||
public class Ip2GeoCache implements IndexingOperationListener { | ||
private final DatasourceFacade datasourceFacade; | ||
private Map<String, DatasourceMetadata> data; | ||
|
||
public Ip2GeoCache(final DatasourceFacade datasourceFacade) { | ||
this.datasourceFacade = datasourceFacade; | ||
} | ||
|
||
public String getIndexName(final String datasourceName) { | ||
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName(); | ||
} | ||
|
||
public boolean isExpired(final String datasourceName) { | ||
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now()); | ||
} | ||
|
||
public boolean has(final String datasourceName) { | ||
return getData().containsKey(datasourceName); | ||
} | ||
|
||
public DatasourceState getState(final String datasourceName) { | ||
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState(); | ||
} | ||
|
||
private Map<String, DatasourceMetadata> getData() { | ||
if (data != null) { | ||
return data; | ||
} | ||
synchronized (this) { | ||
if (data != null) { | ||
return data; | ||
} | ||
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>(); | ||
datasourceFacade.getAllDatasources() | ||
.stream() | ||
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource))); | ||
data = tempData; | ||
return data; | ||
} | ||
} | ||
|
||
private void put(final Datasource datasource) { | ||
DatasourceMetadata metadata = new DatasourceMetadata(datasource); | ||
getData().put(datasource.getName(), metadata); | ||
} | ||
|
||
private void remove(final String datasourceName) { | ||
getData().remove(datasourceName); | ||
} | ||
|
||
@Override | ||
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) { | ||
if (Engine.Result.Type.FAILURE.equals(result.getResultType())) { | ||
return; | ||
} | ||
|
||
try { | ||
XContentParser parser = XContentType.JSON.xContent() | ||
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, index.source().utf8ToString()); | ||
parser.nextToken(); | ||
Datasource datasource = Datasource.PARSER.parse(parser, null); | ||
put(datasource); | ||
} catch (IOException e) { | ||
log.error("IOException occurred updating datasource metadata for datasource {} ", index.id(), e); | ||
} | ||
} | ||
|
||
@Override | ||
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) { | ||
if (result.getResultType().equals(Engine.Result.Type.FAILURE)) { | ||
return; | ||
} | ||
remove(delete.id()); | ||
} | ||
|
||
@Getter | ||
private static class DatasourceMetadata { | ||
private static DatasourceMetadata EMPTY_METADATA = new DatasourceMetadata(); | ||
private String indexName; | ||
private Instant expirationDate; | ||
private DatasourceState state; | ||
|
||
private DatasourceMetadata() { | ||
expirationDate = Instant.MIN; | ||
} | ||
|
||
public DatasourceMetadata(final Datasource datasource) { | ||
this.indexName = datasource.currentIndexName(); | ||
this.expirationDate = datasource.expirationDay(); | ||
this.state = datasource.getState(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.