Skip to content

Commit

Permalink
GH-9291: Enhanced unlock() method of JdbcLock to verify successful un…
Browse files Browse the repository at this point in the history
…locking

Fixes: #9291
* Modify `unlock()` method of `JdbcLock`: if the lock ownership can not be removed due to data expiration, a `ConcurrentModificationException` should be thrown.
* Modify `unlock()` method of `RedisLock`: if the lock ownership can not be removed due to data expiration, a `ConcurrentModificationException` should be thrown.
* Maintain test cases
  • Loading branch information
EddieChoCho committed Jun 30, 2024
1 parent ee6fd33 commit 511ea81
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* @author Gary Russell
* @author Alexandre Strubel
* @author Ruslan Stelmachenko
* @author Eddie Cho
*
* @since 4.3
*/
Expand Down Expand Up @@ -389,9 +390,9 @@ public void close() {
}

@Override
public void delete(String lock) {
this.defaultTransactionTemplate.executeWithoutResult(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id));
public boolean delete(String lock) {
return this.defaultTransactionTemplate.execute(
transactionStatus -> this.template.update(this.deleteQuery, this.region, lock, this.id)) > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.jdbc.lock;

import java.time.Duration;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -56,6 +57,7 @@
* @author Unseok Kim
* @author Christian Tzolov
* @author Myeonghyeon Lee
* @author Eddie Cho
*
* @since 4.3
*/
Expand Down Expand Up @@ -305,12 +307,21 @@ public void unlock() {
try {
while (true) {
try {
this.mutex.delete(this.path);
return;
if (this.mutex.delete(this.path)) {
return;
}
else {
throw new ConcurrentModificationException();
// the lock is no longer owned by current process, the exception should be handle and rollback the execution result
}
}
catch (TransientDataAccessException | TransactionTimedOutException | TransactionSystemException e) {
// try again
}
catch (ConcurrentModificationException e) {
throw new ConcurrentModificationException("Lock was released in the store due to expiration. " +
"The integrity of data protected by this lock may have been compromised.");
}
catch (Exception e) {
throw new DataAccessResourceFailureException("Failed to release mutex at " + this.path, e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
* @author Dave Syer
* @author Alexandre Strubel
* @author Artem Bilan
* @author Eddie Cho
*
* @since 4.3
*/
Expand All @@ -41,8 +42,9 @@ public interface LockRepository extends Closeable {
/**
* Remove a lock from this repository.
* @param lock the lock to remove.
* @return deleted or not.
*/
void delete(String lock);
boolean delete(String lock);

/**
* Remove all the expired locks.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.integration.jdbc.lock;

import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -31,17 +30,17 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* @author Olivier Hubaut
* @author Fran Aranda
* @author Eddie Cho
*
* @since 5.2.11
*/
public class JdbcLockRegistryDelegateTests {
class JdbcLockRegistryDelegateTests {

private JdbcLockRegistry registry;

Expand All @@ -56,7 +55,7 @@ public void clear() {
}

@Test
public void testLessAmountOfUnlockThanLock() {
void testLessAmountOfUnlockThanLock() {
final Random random = new Random();
final int lockCount = random.nextInt(5) + 1;
final int unlockCount = random.nextInt(lockCount);
Expand All @@ -73,11 +72,13 @@ public void testLessAmountOfUnlockThanLock() {
}

@Test
public void testSameAmountOfUnlockThanLock() {
void testSameAmountOfUnlockThanLock() {
final Random random = new Random();
final int lockCount = random.nextInt(5) + 1;

final Lock lock = registry.obtain("foo");
when(repository.delete(anyString())).thenReturn(true);

for (int i = 0; i < lockCount; i++) {
lock.tryLock();
}
Expand All @@ -89,53 +90,41 @@ public void testSameAmountOfUnlockThanLock() {
}

@Test
public void testTransientDataAccessException() {
void testTransientDataAccessException() {
final Lock lock = registry.obtain("foo");
lock.tryLock();

final AtomicBoolean shouldThrow = new AtomicBoolean(true);
doAnswer(invocation -> {
if (shouldThrow.getAndSet(false)) {
throw mock(TransientDataAccessException.class);
}
return null;
}).when(repository).delete(anyString());
when(repository.delete(anyString()))
.thenThrow(mock(TransientDataAccessException.class))
.thenReturn(true);

lock.unlock();

assertThat(TestUtils.getPropertyValue(lock, "delegate", ReentrantLock.class).isLocked()).isFalse();
}

@Test
public void testTransactionTimedOutException() {
void testTransactionTimedOutException() {
final Lock lock = registry.obtain("foo");
lock.tryLock();

final AtomicBoolean shouldThrow = new AtomicBoolean(true);
doAnswer(invocation -> {
if (shouldThrow.getAndSet(false)) {
throw mock(TransactionTimedOutException.class);
}
return null;
}).when(repository).delete(anyString());
when(repository.delete(anyString()))
.thenThrow(TransactionTimedOutException.class)
.thenReturn(true);

lock.unlock();

assertThat(TestUtils.getPropertyValue(lock, "delegate", ReentrantLock.class).isLocked()).isFalse();
}

@Test
public void testTransactionSystemException() {
void testTransactionSystemException() {
final Lock lock = registry.obtain("foo");
lock.tryLock();

final AtomicBoolean shouldThrow = new AtomicBoolean(true);
doAnswer(invocation -> {
if (shouldThrow.getAndSet(false)) {
throw mock(TransactionSystemException.class);
}
return null;
}).when(repository).delete(anyString());
when(repository.delete(anyString()))
.thenThrow(TransactionSystemException.class)
.thenReturn(true);

lock.unlock();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.jdbc.lock;

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -45,26 +46,28 @@
import org.springframework.util.StopWatch;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* @author Dave Syer
* @author Artem Bilan
* @author Glenn Renfro
* @author Alexandre Strubel
* @author Eddie Cho
*
* @since 4.3
*/
@SpringJUnitConfig(locations = "JdbcLockRegistryTests-context.xml")
@DirtiesContext
public class JdbcLockRegistryDifferentClientTests {
class JdbcLockRegistryDifferentClientTests {

private static final Log LOGGER = LogFactory.getLog(JdbcLockRegistryDifferentClientTests.class);

@Autowired
private JdbcLockRegistry registry;

@Autowired
private LockRepository client;
private DefaultLockRepository client;

@Autowired
private ConfigurableApplicationContext context;
Expand All @@ -78,6 +81,7 @@ public class JdbcLockRegistryDifferentClientTests {
public void clear() {
this.registry.expireUnusedOlderThan(0);
this.client.close();
this.client.afterPropertiesSet();
this.child = new AnnotationConfigApplicationContext();
this.child.registerBean("childLockRepository", DefaultLockRepository.class, this.dataSource);
this.child.setParent(this.context);
Expand All @@ -92,7 +96,7 @@ public void close() {
}

@Test
public void testSecondThreadLoses() throws Exception {
void testSecondThreadLoses() throws Exception {
for (int i = 0; i < 100; i++) {
final JdbcLockRegistry registry1 = this.registry;
final JdbcLockRegistry registry2 = this.child.getBean(JdbcLockRegistry.class);
Expand Down Expand Up @@ -129,7 +133,7 @@ public void testSecondThreadLoses() throws Exception {
}

@Test
public void testBothLock() throws Exception {
void testBothLock() throws Exception {
for (int i = 0; i < 100; i++) {
final JdbcLockRegistry registry1 = this.registry;
final JdbcLockRegistry registry2 = this.child.getBean(JdbcLockRegistry.class);
Expand Down Expand Up @@ -185,7 +189,7 @@ public void testBothLock() throws Exception {
}

@Test
public void testOnlyOneLock() throws Exception {
void testOnlyOneLock() throws Exception {
for (int i = 0; i < 100; i++) {
final BlockingQueue<String> locked = new LinkedBlockingQueue<>();
final CountDownLatch latch = new CountDownLatch(20);
Expand Down Expand Up @@ -231,7 +235,7 @@ public void testOnlyOneLock() throws Exception {
}

@Test
public void testExclusiveAccess() throws Exception {
void testExclusiveAccess() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setApplicationContext(this.context);
client1.afterPropertiesSet();
Expand Down Expand Up @@ -281,7 +285,7 @@ public void testExclusiveAccess() throws Exception {
}

@Test
public void testOutOfDateLockTaken() throws Exception {
void testOutOfDateLockTaken() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(100);
client1.setApplicationContext(this.context);
Expand Down Expand Up @@ -314,7 +318,7 @@ public void testOutOfDateLockTaken() throws Exception {
});
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
data.add(2);
lock1.unlock();
assertThatThrownBy(lock1::unlock).isInstanceOf(ConcurrentModificationException.class);
for (int i = 0; i < 2; i++) {
Integer integer = data.poll(10, TimeUnit.SECONDS);
assertThat(integer).isNotNull();
Expand All @@ -323,7 +327,7 @@ public void testOutOfDateLockTaken() throws Exception {
}

@Test
public void testRenewLock() throws Exception {
void testRenewLock() throws Exception {
DefaultLockRepository client1 = new DefaultLockRepository(dataSource);
client1.setTimeToLive(500);
client1.setApplicationContext(this.context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

<bean id="lockClient" class="org.springframework.integration.jdbc.lock.DefaultLockRepository">
<constructor-arg name="dataSource" ref="dataSource"/>
<property name="insertQuery"
value="INSERT INTO INT_LOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)"/>
</bean>

<tx:annotation-driven/>
Expand Down
Loading

0 comments on commit 511ea81

Please sign in to comment.