diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java index 6b3c83b7..7bd03316 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -8,6 +8,7 @@ import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +57,38 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener); } + /** + * Synchronous method of #acquireLock + * + * @param datasourceName datasourceName to acquire lock on + * @param lockDurationSeconds the lock duration in seconds + * @return lock model + */ + public Optional acquireLock(final String datasourceName, final Long lockDurationSeconds) { + AtomicReference lockReference = new AtomicReference(); + CountDownLatch countDownLatch = new CountDownLatch(1); + lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() { + @Override + public void onResponse(final LockModel lockModel) { + lockReference.set(lockModel); + countDownLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + lockReference.set(null); + countDownLatch.countDown(); + } + }); + + try { + countDownLatch.await(clusterService.getClusterSettings().get(Ip2GeoSettings.TIMEOUT).getSeconds(), TimeUnit.SECONDS); + return Optional.ofNullable(lockReference.get()); + } catch (InterruptedException e) { + return Optional.empty(); + } + } + /** * Wrapper method of LockService#release * diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index 8d8938c9..b92a58a8 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -8,11 +8,11 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.extern.log4j.Log4j2; -import org.opensearch.action.ActionListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -20,6 +20,7 @@ import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @@ -108,16 +109,23 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC @VisibleForTesting protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) { return () -> { - ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { - if (lock == null) { - return; - } - try { - updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); - } finally { - ip2GeoLockService.releaseLock(lock); - } - }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); })); + Optional lockModel = ip2GeoLockService.acquireLock( + jobParameter.getName(), + Ip2GeoLockService.LOCK_DURATION_IN_SECONDS + ); + if (lockModel.isEmpty()) { + log.error("Failed to update. Another processor is holding a lock for datasource[{}]", jobParameter.getName()); + return; + } + + LockModel lock = lockModel.get(); + try { + updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); + } catch (Exception e) { + log.error("Failed to update datasource[{}]", jobParameter.getName(), e); + } finally { + ip2GeoLockService.releaseLock(lock); + } }; } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java index d6bfcd13..adcfae7f 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java @@ -38,6 +38,14 @@ public void testAcquireLock_whenValidInput_thenSucceed() { noOpsLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class)); } + public void testAcquireLock_whenCalled_thenNotBlocked() { + long expectedDurationInMillis = 1000; + Instant before = Instant.now(); + assertTrue(ip2GeoLockService.acquireLock(null, null).isEmpty()); + Instant after = Instant.now(); + assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); + } + public void testReleaseLock_whenValidInput_thenSucceed() { // Cannot test because LockService is final class // Simply calling method to increase coverage diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index d08dc19a..a5461f3a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -6,7 +6,6 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -16,16 +15,15 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString; -import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Optional; import lombok.SneakyThrows; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.action.ActionListener; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -44,11 +42,15 @@ public void init() { } public void testRunJob_whenInvalidClass_thenThrowException() { - JobExecutionContext jobExecutionContext = mock(JobExecutionContext.class); + JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); + String jobIndexName = randomLowerCaseString(); + String jobId = randomLowerCaseString(); + JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext)); } + @SneakyThrows public void testRunJob_whenValidInput_thenSucceed() { JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); String jobIndexName = randomLowerCaseString(); @@ -56,59 +58,50 @@ public void testRunJob_whenValidInput_thenSucceed() { JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); Datasource datasource = randomDatasource(); + LockModel lockModel = randomLockModel(); + when(ip2GeoLockService.acquireLock(datasource.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenReturn( + Optional.of(lockModel) + ); + // Run DatasourceRunner.getJobRunnerInstance().runJob(datasource, jobExecutionContext); // Verify - verify(ip2GeoLockService).acquireLock( - eq(datasource.getName()), - eq(Ip2GeoLockService.LOCK_DURATION_IN_SECONDS), - any(ActionListener.class) - ); + verify(ip2GeoLockService).acquireLock(datasource.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS); + verify(datasourceFacade).getDatasource(datasource.getName()); + verify(ip2GeoLockService).releaseLock(lockModel); } @SneakyThrows - public void testUpdateDatasourceRunner_whenFailedToAcquireLock_thenError() { - validateDoExecute(null, null); - } + public void testUpdateDatasourceRunner_whenExceptionBeforeAcquiringLock_thenNoReleaseLock() { + ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); + when(jobParameter.getName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + when(ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenThrow( + new RuntimeException() + ); - @SneakyThrows - public void testUpdateDatasourceRunner_whenValidInput_thenSucceed() { - String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); - String jobId = GeospatialTestHelper.randomLowerCaseString(); - LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); - validateDoExecute(lockModel, null); - } + // Run + expectThrows(Exception.class, () -> DatasourceRunner.getJobRunnerInstance().updateDatasourceRunner(jobParameter).run()); - @SneakyThrows - public void testUpdateDatasourceRunner_whenException_thenError() { - validateDoExecute(null, new RuntimeException()); + // Verify + verify(ip2GeoLockService, never()).releaseLock(any()); } - private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { + @SneakyThrows + public void testUpdateDatasourceRunner_whenExceptionAfterAcquiringLock_thenReleaseLock() { ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); when(jobParameter.getName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + LockModel lockModel = randomLockModel(); + when(ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenReturn( + Optional.of(lockModel) + ); + when(datasourceFacade.getDatasource(jobParameter.getName())).thenThrow(new RuntimeException()); // Run DatasourceRunner.getJobRunnerInstance().updateDatasourceRunner(jobParameter).run(); // Verify - ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); - verify(ip2GeoLockService).acquireLock(eq(jobParameter.getName()), anyLong(), captor.capture()); - - if (exception == null) { - // Run - captor.getValue().onResponse(lockModel); - - // Verify - verify(ip2GeoLockService, lockModel == null ? never() : times(1)).releaseLock(eq(lockModel)); - } else { - // Run - captor.getValue().onFailure(exception); - - // Verify - verify(ip2GeoLockService, never()).releaseLock(eq(lockModel)); - } + verify(ip2GeoLockService).releaseLock(any()); } @SneakyThrows