Skip to content

Commit

Permalink
Reduce lock duration and renew the lock during update
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 9, 2023
1 parent 91c609b commit 2e1f9bb
Show file tree
Hide file tree
Showing 15 changed files with 393 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -21,9 +24,11 @@
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -36,6 +41,7 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;
private final DatasourceUpdateService datasourceUpdateService;
private final Ip2GeoLockService lockService;

/**
* Default constructor
Expand All @@ -44,50 +50,76 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
* @param threadPool the thread pool
* @param datasourceFacade the datasource facade
* @param datasourceUpdateService the datasource update service
* @param lockService the lock service
*/
@Inject
public PutDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final ThreadPool threadPool,
final DatasourceFacade datasourceFacade,
final DatasourceUpdateService datasourceUpdateService
final DatasourceUpdateService datasourceUpdateService,
final Ip2GeoLockService lockService
) {
super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
this.threadPool = threadPool;
this.datasourceFacade = datasourceFacade;
this.datasourceUpdateService = datasourceUpdateService;
this.lockService = lockService;
}

@Override
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
try {
StepListener<Void> createIndexStep = new StepListener<>();
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception));
} catch (Exception e) {
listener.onFailure(e);
}
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holing a lock on the resource. Try again later"));
return;
}
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}

@VisibleForTesting
protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
throws IOException {
Datasource datasource = Datasource.Builder.build(request);
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, listener));
protected void internalDoExecute(
final PutDatasourceRequest request,
final LockModel lock,
final ActionListener<AcknowledgedResponse> listener
) {
StepListener<Void> createIndexStep = new StepListener<>();
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> {
Datasource datasource = Datasource.Builder.build(request);
datasourceFacade.putDatasource(
datasource,
getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener)
);
}, exception -> listener.onFailure(exception));
}

@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
final Runnable renewLock,
final ActionListener<AcknowledgedResponse> listener
) {
return new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
// This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread
// pool.
threadPool.generic().submit(() -> { createDatasource(datasource); });
threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); });
listener.onResponse(new AcknowledgedResponse(true));
}

Expand All @@ -103,15 +135,15 @@ public void onFailure(final Exception e) {
}

@VisibleForTesting
protected void createDatasource(final Datasource datasource) {
protected void createDatasource(final Datasource datasource, final Runnable renewLock) {
if (DatasourceState.CREATING.equals(datasource.getState()) == false) {
log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.CREATING, datasource.getState());
markDatasourceAsCreateFailed(datasource);
return;
}

try {
datasourceUpdateService.updateOrCreateGeoIpData(datasource);
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
} catch (Exception e) {
log.error("Failed to create datasource for {}", datasource.getName(), e);
markDatasourceAsCreateFailed(datasource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,15 @@ public void onFailure(final Exception e) {
* @param fields Field name matching with data in CSVRecord in order
* @param iterator GeoIP data to insert
* @param bulkSize Bulk size of data to process
* @param renewLock Runnable to renew lock
*/
public void putGeoIpData(final String indexName, final String[] fields, final Iterator<CSVRecord> iterator, final int bulkSize) {
public void putGeoIpData(
final String indexName,
final String[] fields,
final Iterator<CSVRecord> iterator,
final int bulkSize,
final Runnable renewLock
) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
while (iterator.hasNext()) {
Expand All @@ -352,6 +359,7 @@ public void putGeoIpData(final String indexName, final String[] fields, final It
}
bulkRequest.requests().clear();
}
renewLock.run();
}
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@

import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME;

import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;

/**
* A wrapper of job scheduler's lock service for datasource
*/
public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
private final ClusterService clusterService;
private final Client client;
private final LockService lockService;

/**
Expand All @@ -33,10 +34,8 @@ public class Ip2GeoLockService {
* @param clusterService the cluster service
* @param client the client
*/
@Inject
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
this.clusterService = clusterService;
this.client = client;
this.lockService = new LockService(client, clusterService);
}

Expand Down Expand Up @@ -68,10 +67,9 @@ public void releaseLock(final LockModel lockModel, final ActionListener<Boolean>
* Synchronous method of LockService#renewLock
*
* @param lockModel lock to renew
* @param timeout timeout in milliseconds precise
* @return renewed lock if renew succeed and null otherwise
*/
public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) {
public LockModel renewLock(final LockModel lockModel) {
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.renewLock(lockModel, new ActionListener<>() {
Expand All @@ -89,10 +87,24 @@ public void onFailure(final Exception e) {
});

try {
countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS);
countDownLatch.await(clusterService.getClusterSettings().get(Ip2GeoSettings.TIMEOUT).getSeconds(), TimeUnit.SECONDS);
return lockReference.get();
} catch (InterruptedException e) {
return null;
}
}

public Runnable getRenewLockRunnable(final AtomicReference<LockModel> lockModel) {
return () -> {
LockModel preLock = lockModel.get();
if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) {
return;
}

lockModel.set(renewLock(lockModel.get()));
if (lockModel.get() == null) {
new OpenSearchException("failed to renew a lock [{}]", preLock);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
Expand All @@ -50,7 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
* Prefix of indices having Ip2Geo data
*/
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
private static final long LOCK_DURATION_IN_SECONDS = 60 * 60;
private static final long MAX_JITTER_IN_MINUTES = 5;
private static final long ONE_DAY_IN_HOURS = 24;
private static final long ONE_HOUR_IN_MINUTES = 60;
Expand Down Expand Up @@ -282,7 +282,7 @@ public boolean isEnabled() {

@Override
public Long getLockDurationSeconds() {
return LOCK_DURATION_IN_SECONDS;
return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;

Expand All @@ -16,10 +17,10 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.utils.LockService;

/**
* Datasource update task
Expand Down Expand Up @@ -52,6 +53,7 @@ public static DatasourceRunner getJobRunnerInstance() {
private DatasourceUpdateService datasourceUpdateService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceFacade datasourceFacade;
private Ip2GeoLockService ip2GeoLockService;
private boolean initialized;

private DatasourceRunner() {
Expand All @@ -65,12 +67,14 @@ public void initialize(
final ClusterService clusterService,
final DatasourceUpdateService datasourceUpdateService,
final Ip2GeoExecutor ip2GeoExecutor,
final DatasourceFacade datasourceFacade
final DatasourceFacade datasourceFacade,
final Ip2GeoLockService ip2GeoLockService
) {
this.clusterService = clusterService;
this.datasourceUpdateService = datasourceUpdateService;
this.ip2GeoExecutor = ip2GeoExecutor;
this.datasourceFacade = datasourceFacade;
this.ip2GeoLockService = ip2GeoLockService;
this.initialized = true;
}

Expand All @@ -87,30 +91,27 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC
);
}

ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter, context));
ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter));
}

/**
* Update GeoIP data
*
* Lock is used so that only one of nodes run this task.
* Lock duration is 1 hour to avoid refreshing. This is okay because update interval is 1 day minimum.
*
* @param jobParameter job parameter
* @param context context
*/
@VisibleForTesting
protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter, final JobExecutionContext context) {
final LockService lockService = context.getLockService();
protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) {
return () -> {
lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> {
ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
return;
}
try {
updateDatasource(jobParameter);
updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock)));
} finally {
lockService.release(
ip2GeoLockService.releaseLock(
lock,
ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); })
);
Expand All @@ -120,7 +121,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet
}

@VisibleForTesting
protected void updateDatasource(final ScheduledJobParameter jobParameter) throws IOException {
protected void updateDatasource(final ScheduledJobParameter jobParameter, final Runnable renewLock) throws IOException {
Datasource datasource = datasourceFacade.getDatasource(jobParameter.getName());
/**
* If delete request comes while update task is waiting on a queue for other update tasks to complete,
Expand All @@ -143,7 +144,7 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter) throws

try {
datasourceUpdateService.deleteUnusedIndices(datasource);
datasourceUpdateService.updateOrCreateGeoIpData(datasource);
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
datasourceUpdateService.deleteUnusedIndices(datasource);
} catch (Exception e) {
log.error("Failed to update datasource for {}", datasource.getName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public DatasourceUpdateService(
/**
* Update GeoIp data
*
* @param datasource
* @param datasource the datasource
* @param renewLock runnable to renew lock
* @throws Exception
*/
public void updateOrCreateGeoIpData(final Datasource datasource) throws Exception {
public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable renewLock) throws Exception {
URL url = new URL(datasource.getEndpoint());
DatasourceManifest manifest = DatasourceManifest.Builder.build(url);

Expand All @@ -77,7 +78,13 @@ public void updateOrCreateGeoIpData(final Datasource datasource) throws Exceptio
datasource.getDatabase().getFields().toString()
);
}
geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE));
geoIpDataFacade.putGeoIpData(
indexName,
header,
reader.iterator(),
clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE),
renewLock
);
}

Instant endTime = Instant.now();
Expand Down
Loading

0 comments on commit 2e1f9bb

Please sign in to comment.