Skip to content

Commit

Permalink
Changed class name and package (opensearch-project#341)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent 3a37a45 commit 19195d4
Show file tree
Hide file tree
Showing 27 changed files with 269 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
Expand All @@ -37,9 +37,9 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
private static final long LOCK_DURATION_IN_SECONDS = 300l;
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoProcessorDao ip2GeoProcessorDao;
private final ThreadPool threadPool;

/**
Expand All @@ -48,25 +48,25 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
* @param actionFilters the action filters
* @param lockService the lock service
* @param ingestService the ingest service
* @param datasourceFacade the datasource facade
* @param datasourceDao the datasource facade
*/
@Inject
public DeleteDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade,
final Ip2GeoProcessorFacade ip2GeoProcessorFacade,
final DatasourceDao datasourceDao,
final GeoIpDataDao geoIpDataDao,
final Ip2GeoProcessorDao ip2GeoProcessorDao,
final ThreadPool threadPool
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.ip2GeoProcessorDao = ip2GeoProcessorDao;
this.threadPool = threadPool;
}

Expand Down Expand Up @@ -107,32 +107,32 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request,

@VisibleForTesting
protected void deleteDatasource(final String datasourceName) throws IOException {
Datasource datasource = datasourceFacade.getDatasource(datasourceName);
Datasource datasource = datasourceDao.getDatasource(datasourceName);
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}

setDatasourceStateAsDeleting(datasource);
geoIpDataFacade.deleteIp2GeoDataIndex(datasource.getIndices());
datasourceFacade.deleteDatasource(datasource);
geoIpDataDao.deleteIp2GeoDataIndex(datasource.getIndices());
datasourceDao.deleteDatasource(datasource);
}

private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
if (ip2GeoProcessorDao.getProcessors(datasource.getName()).isEmpty() == false) {
throw new ResourceInUseException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
datasource.setState(DatasourceState.DELETING);
datasourceFacade.updateDatasource(datasource);
datasourceDao.updateDatasource(datasource);

// Check again as processor might just have been created.
// If it fails to update the state back to the previous state, the new processor
// will fail to convert an ip to a geo data.
// In such case, user have to delete the processor and delete this datasource again.
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
if (ip2GeoProcessorDao.getProcessors(datasource.getName()).isEmpty() == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
datasourceDao.updateDatasource(datasource);
throw new ResourceInUseException("datasource is being used by one of processors");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.tasks.Task;
Expand All @@ -24,31 +24,31 @@
* Transport action to get datasource
*/
public class GetDatasourceTransportAction extends HandledTransportAction<GetDatasourceRequest, GetDatasourceResponse> {
private final DatasourceFacade datasourceFacade;
private final DatasourceDao datasourceDao;

/**
* Default constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param datasourceFacade the datasource facade
* @param datasourceDao the datasource facade
*/
@Inject
public GetDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final DatasourceFacade datasourceFacade
final DatasourceDao datasourceDao
) {
super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new);
this.datasourceFacade = datasourceFacade;
this.datasourceDao = datasourceDao;
}

@Override
protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener<GetDatasourceResponse> listener) {
if (shouldGetAllDatasource(request)) {
// We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine.
datasourceFacade.getAllDatasources(newActionListener(listener));
datasourceDao.getAllDatasources(newActionListener(listener));
} else {
datasourceFacade.getDatasources(request.getNames(), newActionListener(listener));
datasourceDao.getDatasources(request.getNames(), newActionListener(listener));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.index.engine.VersionConflictEngineException;
Expand All @@ -39,7 +39,7 @@
@Log4j2
public class PutDatasourceTransportAction extends HandledTransportAction<PutDatasourceRequest, AcknowledgedResponse> {
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;
private final DatasourceDao datasourceDao;
private final DatasourceUpdateService datasourceUpdateService;
private final Ip2GeoLockService lockService;

Expand All @@ -48,7 +48,7 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
* @param transportService the transport service
* @param actionFilters the action filters
* @param threadPool the thread pool
* @param datasourceFacade the datasource facade
* @param datasourceDao the datasource facade
* @param datasourceUpdateService the datasource update service
* @param lockService the lock service
*/
Expand All @@ -57,13 +57,13 @@ public PutDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final ThreadPool threadPool,
final DatasourceFacade datasourceFacade,
final DatasourceDao datasourceDao,
final DatasourceUpdateService datasourceUpdateService,
final Ip2GeoLockService lockService
) {
super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
this.threadPool = threadPool;
this.datasourceFacade = datasourceFacade;
this.datasourceDao = datasourceDao;
this.datasourceUpdateService = datasourceUpdateService;
this.lockService = lockService;
}
Expand Down Expand Up @@ -97,10 +97,10 @@ protected void internalDoExecute(
final ActionListener<AcknowledgedResponse> listener
) {
StepListener<Void> createIndexStep = new StepListener<>();
datasourceFacade.createIndexIfNotExists(createIndexStep);
datasourceDao.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> {
Datasource datasource = Datasource.Builder.build(request);
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener));
datasourceDao.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener));
}, exception -> {
lockService.releaseLock(lock);
listener.onFailure(exception);
Expand Down Expand Up @@ -165,7 +165,7 @@ private void markDatasourceAsCreateFailed(final Datasource datasource) {
datasource.getUpdateStats().setLastFailedAt(Instant.now());
datasource.setState(DatasourceState.CREATE_FAILED);
try {
datasourceFacade.updateDatasource(datasource);
datasourceDao.updateDatasource(datasource);
} catch (Exception e) {
log.error("Failed to mark datasource state as CREATE_FAILED for {}", datasource.getName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
Expand All @@ -41,7 +41,7 @@
public class UpdateDatasourceTransportAction extends HandledTransportAction<UpdateDatasourceRequest, AcknowledgedResponse> {
private static final long LOCK_DURATION_IN_SECONDS = 300l;
private final Ip2GeoLockService lockService;
private final DatasourceFacade datasourceFacade;
private final DatasourceDao datasourceDao;
private final DatasourceUpdateService datasourceUpdateService;
private final ThreadPool threadPool;

Expand All @@ -51,22 +51,22 @@ public class UpdateDatasourceTransportAction extends HandledTransportAction<Upda
* @param transportService the transport service
* @param actionFilters the action filters
* @param lockService the lock service
* @param datasourceFacade the datasource facade
* @param datasourceDao the datasource facade
* @param datasourceUpdateService the datasource update service
*/
@Inject
public UpdateDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final DatasourceFacade datasourceFacade,
final DatasourceDao datasourceDao,
final DatasourceUpdateService datasourceUpdateService,
final ThreadPool threadPool
) {
super(UpdateDatasourceAction.NAME, transportService, actionFilters, UpdateDatasourceRequest::new);
this.lockService = lockService;
this.datasourceUpdateService = datasourceUpdateService;
this.datasourceFacade = datasourceFacade;
this.datasourceDao = datasourceDao;
this.threadPool = threadPool;
}

Expand All @@ -90,7 +90,7 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
Datasource datasource = datasourceFacade.getDatasource(request.getName());
Datasource datasource = datasourceDao.getDatasource(request.getName());
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}
Expand Down Expand Up @@ -124,7 +124,7 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
}

if (isChanged) {
datasourceFacade.updateDatasource(datasource);
datasourceDao.updateDatasource(datasource);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;

@Log4j2
public class Ip2GeoCache implements IndexingOperationListener {
private final DatasourceFacade datasourceFacade;
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;

public Ip2GeoCache(final DatasourceFacade datasourceFacade) {
this.datasourceFacade = datasourceFacade;
public Ip2GeoCache(final DatasourceDao datasourceDao) {
this.datasourceDao = datasourceDao;
}

public String getIndexName(final String datasourceName) {
Expand Down Expand Up @@ -58,7 +58,7 @@ private Map<String, DatasourceMetadata> getData() {
return data;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
datasourceFacade.getAllDatasources()
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
data = tempData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;
package org.opensearch.geospatial.ip2geo.dao;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -47,6 +47,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.shared.StashedThreadContext;
Expand All @@ -56,16 +57,16 @@
import org.opensearch.search.SearchHit;

/**
* Facade class for datasource
* Data access object for datasource
*/
@Log4j2
public class DatasourceFacade {
public class DatasourceDao {
private static final Integer MAX_SIZE = 1000;
private final Client client;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;

public DatasourceFacade(final Client client, final ClusterService clusterService) {
public DatasourceDao(final Client client, final ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
Expand Down Expand Up @@ -103,7 +104,7 @@ public void onFailure(final Exception e) {

private String getIndexMapping() {
try {
try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
return reader.lines().map(String::trim).collect(Collectors.joining());
}
Expand Down
Loading

0 comments on commit 19195d4

Please sign in to comment.