From 508fe9a052dc7ec93953ec9f57544f95ef7f49ca Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 21 Jun 2023 14:44:36 -0700 Subject: [PATCH] Acquire lock synchronously By acquiring lock asynchronously, the remaining part of the code is being run by transport thread which does not allow blocking code. We want only a single update to happen in a node using a single thread. However, it cannot be achieved if I acquire lock asynchronously and pass the listener. Signed-off-by: Heemin Kim --- .../ip2geo/common/Ip2GeoLockService.java | 33 +++++++++ .../ip2geo/jobscheduler/DatasourceRunner.java | 30 +++++--- .../ip2geo/common/Ip2GeoLockServiceTests.java | 8 +++ .../jobscheduler/DatasourceRunnerTests.java | 71 +++++++++---------- 4 files changed, 92 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java index 6b3c83b7..7bd03316 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -8,6 +8,7 @@ import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +57,38 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener); } + /** + * Synchronous method of #acquireLock + * + * @param datasourceName datasourceName to acquire lock on + * @param lockDurationSeconds the lock duration in seconds + * @return lock model + */ + public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) { + AtomicReference<LockModel> lockReference = new AtomicReference(); + 
CountDownLatch countDownLatch = new CountDownLatch(1); + lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() { + @Override + public void onResponse(final LockModel lockModel) { + lockReference.set(lockModel); + countDownLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + lockReference.set(null); + countDownLatch.countDown(); + } + }); + + try { + countDownLatch.await(clusterService.getClusterSettings().get(Ip2GeoSettings.TIMEOUT).getSeconds(), TimeUnit.SECONDS); + return Optional.ofNullable(lockReference.get()); + } catch (InterruptedException e) { + return Optional.empty(); + } + } + /** * Wrapper method of LockService#release * diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index 8d8938c9..b92a58a8 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -8,11 +8,11 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.extern.log4j.Log4j2; -import org.opensearch.action.ActionListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -20,6 +20,7 @@ import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import 
org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @@ -108,16 +109,23 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC @VisibleForTesting protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) { return () -> { - ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { - if (lock == null) { - return; - } - try { - updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); - } finally { - ip2GeoLockService.releaseLock(lock); - } - }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); })); + Optional<LockModel> lockModel = ip2GeoLockService.acquireLock( + jobParameter.getName(), + Ip2GeoLockService.LOCK_DURATION_IN_SECONDS + ); + if (lockModel.isEmpty()) { + log.error("Failed to update. Another processor is holding a lock for datasource[{}]", jobParameter.getName()); + return; + } + + LockModel lock = lockModel.get(); + try { + updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); + } catch (Exception e) { + log.error("Failed to update datasource[{}]", jobParameter.getName(), e); + } finally { + ip2GeoLockService.releaseLock(lock); + } }; } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java index d6bfcd13..5f4299fb 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java @@ -38,6 +38,14 @@ public void testAcquireLock_whenValidInput_thenSucceed() { noOpsLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class)); } + public void testAcquireLock_whenCalled_thenNotBlocked() { + long 
expectedDurationInMillis = 1000; + Instant before = Instant.now(); + assertNull(ip2GeoLockService.acquireLock(null, null)); + Instant after = Instant.now(); + assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); + } + public void testReleaseLock_whenValidInput_thenSucceed() { // Cannot test because LockService is final class // Simply calling method to increase coverage diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index d08dc19a..a5461f3a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -6,7 +6,6 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -16,16 +15,15 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString; -import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Optional; import lombok.SneakyThrows; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.action.ActionListener; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -44,11 +42,15 @@ public void init() { } public void testRunJob_whenInvalidClass_thenThrowException() { - JobExecutionContext jobExecutionContext = mock(JobExecutionContext.class); + JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), 
randomInt(), randomInt()); + String jobIndexName = randomLowerCaseString(); + String jobId = randomLowerCaseString(); + JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext)); } + @SneakyThrows public void testRunJob_whenValidInput_thenSucceed() { JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); String jobIndexName = randomLowerCaseString(); @@ -56,59 +58,50 @@ public void testRunJob_whenValidInput_thenSucceed() { JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); Datasource datasource = randomDatasource(); + LockModel lockModel = randomLockModel(); + when(ip2GeoLockService.acquireLock(datasource.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenReturn( + Optional.of(lockModel) + ); + // Run DatasourceRunner.getJobRunnerInstance().runJob(datasource, jobExecutionContext); // Verify - verify(ip2GeoLockService).acquireLock( - eq(datasource.getName()), - eq(Ip2GeoLockService.LOCK_DURATION_IN_SECONDS), - any(ActionListener.class) - ); + verify(ip2GeoLockService).acquireLock(datasource.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS); + verify(datasourceFacade).getDatasource(datasource.getName()); + verify(ip2GeoLockService).releaseLock(lockModel); } @SneakyThrows - public void testUpdateDatasourceRunner_whenFailedToAcquireLock_thenError() { - validateDoExecute(null, null); - } + public void testUpdateDatasourceRunner_whenExceptionBeforeAcquiringLock_thenNoReleaseLock() { + ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); + when(jobParameter.getName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + 
when(ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenThrow( + new RuntimeException() + ); - @SneakyThrows - public void testUpdateDatasourceRunner_whenValidInput_thenSucceed() { - String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); - String jobId = GeospatialTestHelper.randomLowerCaseString(); - LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); - validateDoExecute(lockModel, null); - } + // Run + expectThrows(Exception.class, () -> DatasourceRunner.getJobRunnerInstance().updateDatasourceRunner(jobParameter).run()); - @SneakyThrows - public void testUpdateDatasourceRunner_whenException_thenError() { - validateDoExecute(null, new RuntimeException()); + // Verify + verify(ip2GeoLockService, never()).releaseLock(any()); } - private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { + @SneakyThrows + public void testUpdateDatasourceRunner_whenExceptionAfterAcquiringLock_thenReleaseLock() { ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); when(jobParameter.getName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + LockModel lockModel = randomLockModel(); + when(ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS)).thenReturn( + Optional.of(lockModel) + ); + when(datasourceFacade.getDatasource(jobParameter.getName())).thenThrow(new RuntimeException()); // Run DatasourceRunner.getJobRunnerInstance().updateDatasourceRunner(jobParameter).run(); // Verify - ArgumentCaptor<ActionListener<LockModel>> captor = ArgumentCaptor.forClass(ActionListener.class); - verify(ip2GeoLockService).acquireLock(eq(jobParameter.getName()), anyLong(), captor.capture()); - - if (exception == null) { - // Run - captor.getValue().onResponse(lockModel); - - // Verify - verify(ip2GeoLockService, lockModel == null ? 
never() : times(1)).releaseLock(eq(lockModel)); - } else { - // Run - captor.getValue().onFailure(exception); - - // Verify - verify(ip2GeoLockService, never()).releaseLock(eq(lockModel)); - } + verify(ip2GeoLockService).releaseLock(any()); } @SneakyThrows