From 3745683e379dd3609ee422ab0b2e867317227728 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Tue, 13 Jun 2023 13:47:24 -0700 Subject: [PATCH] Run update/delete request in a new thread This is not to block transport thread Signed-off-by: Heemin Kim --- .../DeleteDatasourceTransportAction.java | 20 +++++++++--- .../UpdateDatasourceTransportAction.java | 31 +++++++++++++------ .../DeleteDatasourceTransportActionTests.java | 3 +- .../UpdateDatasourceTransportActionTests.java | 3 +- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index a4706834..f9a13f9e 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -26,6 +26,7 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; /** @@ -39,6 +40,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction { + try { + deleteDatasource(request.getName()); + lockService.releaseLock(lock); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + } + }); } catch (Exception e) { lockService.releaseLock(lock); listener.onFailure(e); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index ffdb349d..bc247b70 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -31,6 +31,7 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; /** @@ -42,6 +43,7 @@ public class UpdateDatasourceTransportAction extends HandledTransportAction { + try { + Datasource datasource = datasourceFacade.getDatasource(request.getName()); + if (datasource == null) { + throw new ResourceNotFoundException("no such datasource exist"); + } + validate(request, datasource); + updateIfChanged(request, datasource); + lockService.releaseLock(lock); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + } + }); } catch (Exception e) { lockService.releaseLock(lock); listener.onFailure(e); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index fa2afef8..ce5c91d4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -48,7 +48,8 @@ public void init() { ingestService, datasourceFacade, geoIpDataFacade, - ip2GeoProcessorFacade + ip2GeoProcessorFacade, + threadPool ); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index 3df610e8..32272f62 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -45,7 +45,8 @@ public void init() { actionFilters, ip2GeoLockService, datasourceFacade, - datasourceUpdateService + datasourceUpdateService, + threadPool ); }