Skip to content

Commit

Permalink
Run update/delete request in a new thread (#337)
Browse files Browse the repository at this point in the history
This is not to block transport thread

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored Jun 16, 2023
1 parent e082a6f commit 63e3a47
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
Expand All @@ -39,6 +40,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;
private final ThreadPool threadPool;

/**
* Constructor
Expand All @@ -56,14 +58,16 @@ public DeleteDatasourceTransportAction(
final IngestService ingestService,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade,
final Ip2GeoProcessorFacade ip2GeoProcessorFacade
final Ip2GeoProcessorFacade ip2GeoProcessorFacade,
final ThreadPool threadPool
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
this.threadPool = threadPool;
}

/**
Expand All @@ -83,9 +87,17 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request,
return;
}
try {
deleteDatasource(request.getName());
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
deleteDatasource(request.getName());
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
}
});
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
Expand All @@ -42,6 +43,7 @@ public class UpdateDatasourceTransportAction extends HandledTransportAction<Upda
private final Ip2GeoLockService lockService;
private final DatasourceFacade datasourceFacade;
private final DatasourceUpdateService datasourceUpdateService;
private final ThreadPool threadPool;

/**
* Constructor
Expand All @@ -58,12 +60,14 @@ public UpdateDatasourceTransportAction(
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final DatasourceFacade datasourceFacade,
final DatasourceUpdateService datasourceUpdateService
final DatasourceUpdateService datasourceUpdateService,
final ThreadPool threadPool
) {
super(UpdateDatasourceAction.NAME, transportService, actionFilters, UpdateDatasourceRequest::new);
this.lockService = lockService;
this.datasourceUpdateService = datasourceUpdateService;
this.datasourceFacade = datasourceFacade;
this.threadPool = threadPool;
}

/**
Expand All @@ -82,16 +86,23 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
);
return;
}

try {
Datasource datasource = datasourceFacade.getDatasource(request.getName());
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}
validate(request, datasource);
updateIfChanged(request, datasource);
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
Datasource datasource = datasourceFacade.getDatasource(request.getName());
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}
validate(request, datasource);
updateIfChanged(request, datasource);
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
}
});
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public void init() {
ingestService,
datasourceFacade,
geoIpDataFacade,
ip2GeoProcessorFacade
ip2GeoProcessorFacade,
threadPool
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void init() {
actionFilters,
ip2GeoLockService,
datasourceFacade,
datasourceUpdateService
datasourceUpdateService,
threadPool
);
}

Expand Down

0 comments on commit 63e3a47

Please sign in to comment.