Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of code #342

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;
package org.opensearch.geospatial.ip2geo.dao;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -18,18 +18,20 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;

/**
* Data access object for Datasource and GeoIP data with added caching layer
*/
@Log4j2
public class Ip2GeoCache implements IndexingOperationListener {
public class Ip2GeoCachedDao implements IndexingOperationListener {
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;

public Ip2GeoCache(final DatasourceDao datasourceDao) {
public Ip2GeoCachedDao(final DatasourceDao datasourceDao) {
this.datasourceDao = datasourceDao;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.opensearch.action.ActionListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -57,7 +57,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final ClusterSettings clusterSettings;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoCache ip2GeoCache;
private final Ip2GeoCachedDao ip2GeoCachedDao;

/**
* Ip2Geo processor type
Expand All @@ -76,7 +76,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param clusterSettings the cluster settings
* @param datasourceDao the datasource facade
* @param geoIpDataDao the geoip data facade
* @param ip2GeoCache the cache
* @param ip2GeoCachedDao the cache
*/
public Ip2GeoProcessor(
final String tag,
Expand All @@ -89,7 +89,7 @@ public Ip2GeoProcessor(
final ClusterSettings clusterSettings,
final DatasourceDao datasourceDao,
final GeoIpDataDao geoIpDataDao,
final Ip2GeoCache ip2GeoCache
final Ip2GeoCachedDao ip2GeoCachedDao
) {
super(tag, description);
this.field = field;
Expand All @@ -100,7 +100,7 @@ public Ip2GeoProcessor(
this.clusterSettings = clusterSettings;
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.ip2GeoCache = ip2GeoCache;
this.ip2GeoCachedDao = ip2GeoCachedDao;
}

/**
Expand Down Expand Up @@ -154,8 +154,8 @@ protected void executeInternal(
final String ip
) {
validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
String indexName = ip2GeoCachedDao.getIndexName(datasourceName);
if (ip2GeoCachedDao.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}
Expand Down Expand Up @@ -209,11 +209,11 @@ private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>
}

private void validateDatasourceIsInAvailableState(final String datasourceName) {
if (ip2GeoCache.has(datasourceName) == false) {
if (ip2GeoCachedDao.has(datasourceName) == false) {
throw new IllegalStateException("datasource does not exist");
}

if (DatasourceState.AVAILABLE.equals(ip2GeoCache.getState(datasourceName)) == false) {
if (DatasourceState.AVAILABLE.equals(ip2GeoCachedDao.getState(datasourceName)) == false) {
throw new IllegalStateException("datasource is not in an available state");
}
}
Expand Down Expand Up @@ -243,8 +243,8 @@ protected void executeInternal(
}

validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
String indexName = ip2GeoCachedDao.getIndexName(datasourceName);
if (ip2GeoCachedDao.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}
Expand Down Expand Up @@ -290,7 +290,7 @@ public static final class Factory implements Processor.Factory {
private final IngestService ingestService;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoCache ip2GeoCache;
private final Ip2GeoCachedDao ip2GeoCachedDao;

/**
* Within this method, blocking request cannot be called because this method is executed in a transport thread.
Expand Down Expand Up @@ -320,7 +320,7 @@ public Ip2GeoProcessor create(
ingestService.getClusterService().getClusterSettings(),
datasourceDao,
geoIpDataDao,
ip2GeoCache
ip2GeoCachedDao
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
import org.opensearch.geospatial.ip2geo.action.RestUpdateDatasourceHandler;
import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceAction;
import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceTransportAction;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
Expand Down Expand Up @@ -94,7 +94,7 @@
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
private Ip2GeoCache ip2GeoCache;
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;

Expand All @@ -107,17 +107,17 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
this.datasourceDao = new DatasourceDao(parameters.client, parameters.ingestService.getClusterService());
this.geoIpDataDao = new GeoIpDataDao(parameters.ingestService.getClusterService(), parameters.client);
this.ip2GeoCache = new Ip2GeoCache(datasourceDao);
this.ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
return MapBuilder.<String, Processor.Factory>newMapBuilder()
.put(FeatureProcessor.TYPE, new FeatureProcessor.Factory())
.put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCache))
.put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao))
.immutableMap();
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (DatasourceExtension.JOB_INDEX_NAME.equals(indexModule.getIndex().getName())) {
indexModule.addIndexOperationListener(ip2GeoCache);
indexModule.addIndexOperationListener(ip2GeoCachedDao);
log.info("Ip2GeoListener started listening to operations on index {}", DatasourceExtension.JOB_INDEX_NAME);
}
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public Collection<Object> createComponents(
ip2GeoExecutor,
geoIpDataDao,
ip2GeoLockService,
ip2GeoCache
ip2GeoCachedDao
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
Expand Down Expand Up @@ -79,7 +79,7 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
@Mock
protected GeoIpDataDao geoIpDataDao;
@Mock
protected Ip2GeoCache ip2GeoCache;
protected Ip2GeoCachedDao ip2GeoCachedDao;
@Mock
protected ClusterState clusterState;
@Mock
Expand Down Expand Up @@ -266,7 +266,7 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
clusterSettings,
datasourceDao,
geoIpDataDao,
ip2GeoCache
ip2GeoCachedDao
);
return ip2GeoProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;
package org.opensearch.geospatial.ip2geo.dao;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -23,20 +23,20 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.ShardId;

public class Ip2GeoCacheTests extends Ip2GeoTestCase {
private Ip2GeoCache ip2GeoCache;
public class Ip2GeoCachedDaoTests extends Ip2GeoTestCase {
private Ip2GeoCachedDao ip2GeoCachedDao;

@Before
public void init() {
ip2GeoCache = new Ip2GeoCache(datasourceDao);
ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
}

public void testGetIndexName_whenCalled_thenReturnIndexName() {
Datasource datasource = randomDatasource();
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
String indexName = ip2GeoCache.getIndexName(datasource.getName());
String indexName = ip2GeoCachedDao.getIndexName(datasource.getName());

// Verify
assertEquals(datasource.currentIndexName(), indexName);
Expand All @@ -49,7 +49,7 @@ public void testIsExpired_whenExpired_thenReturnTrue() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean isExpired = ip2GeoCache.isExpired(datasource.getName());
boolean isExpired = ip2GeoCachedDao.isExpired(datasource.getName());

// Verify
assertTrue(isExpired);
Expand All @@ -62,7 +62,7 @@ public void testIsExpired_whenNotExpired_thenReturnFalse() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean isExpired = ip2GeoCache.isExpired(datasource.getName());
boolean isExpired = ip2GeoCachedDao.isExpired(datasource.getName());

// Verify
assertFalse(isExpired);
Expand All @@ -73,7 +73,7 @@ public void testHas_whenHasDatasource_thenReturnTrue() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean hasDatasource = ip2GeoCache.has(datasource.getName());
boolean hasDatasource = ip2GeoCachedDao.has(datasource.getName());

// Verify
assertTrue(hasDatasource);
Expand All @@ -85,7 +85,7 @@ public void testHas_whenNoDatasource_thenReturnFalse() {

String datasourceName = GeospatialTestHelper.randomLowerCaseString();
// Run
boolean hasDatasource = ip2GeoCache.has(datasourceName);
boolean hasDatasource = ip2GeoCachedDao.has(datasourceName);

// Verify
assertFalse(hasDatasource);
Expand All @@ -96,7 +96,7 @@ public void testGetState_whenCalled_thenReturnState() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
DatasourceState state = ip2GeoCache.getState(datasource.getName());
DatasourceState state = ip2GeoCachedDao.getState(datasource.getName());

// Verify
assertEquals(datasource.getState(), state);
Expand All @@ -115,13 +115,13 @@ public void testPostIndex_whenFailed_thenNoUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);

// Run
ip2GeoCache.postIndex(shardId, index, result);
ip2GeoCachedDao.postIndex(shardId, index, result);

// Verify
assertFalse(ip2GeoCache.has(datasource.getName()));
assertTrue(ip2GeoCache.isExpired(datasource.getName()));
assertNull(ip2GeoCache.getIndexName(datasource.getName()));
assertNull(ip2GeoCache.getState(datasource.getName()));
assertFalse(ip2GeoCachedDao.has(datasource.getName()));
assertTrue(ip2GeoCachedDao.isExpired(datasource.getName()));
assertNull(ip2GeoCachedDao.getIndexName(datasource.getName()));
assertNull(ip2GeoCachedDao.getState(datasource.getName()));
}

@SneakyThrows
Expand All @@ -137,13 +137,13 @@ public void testPostIndex_whenSucceed_thenUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

// Run
ip2GeoCache.postIndex(shardId, index, result);
ip2GeoCachedDao.postIndex(shardId, index, result);

// Verify
assertTrue(ip2GeoCache.has(datasource.getName()));
assertFalse(ip2GeoCache.isExpired(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCache.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCache.getState(datasource.getName()));
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
assertFalse(ip2GeoCachedDao.isExpired(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName()));
}

public void testPostDelete_whenFailed_thenNoUpdate() {
Expand All @@ -156,10 +156,10 @@ public void testPostDelete_whenFailed_thenNoUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);

// Run
ip2GeoCache.postDelete(shardId, index, result);
ip2GeoCachedDao.postDelete(shardId, index, result);

// Verify
assertTrue(ip2GeoCache.has(datasource.getName()));
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
}

public void testPostDelete_whenSucceed_thenUpdate() {
Expand All @@ -173,9 +173,9 @@ public void testPostDelete_whenSucceed_thenUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

// Run
ip2GeoCache.postDelete(shardId, index, result);
ip2GeoCachedDao.postDelete(shardId, index, result);

// Verify
assertFalse(ip2GeoCache.has(datasource.getName()));
assertFalse(ip2GeoCachedDao.has(datasource.getName()));
}
}
Loading