diff --git a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java index 75d6a2ac1..d114294b4 100644 --- a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java +++ b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java @@ -25,64 +25,83 @@ import org.eclipse.edc.transaction.spi.TransactionContext; import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; -import java.util.HashSet; -import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static java.lang.Thread.sleep; public class InMemoryEdrLock implements EndpointDataReferenceLock { - private static final int LOCK_TIMEOUT = 5000; + private static final int LOCK_TIMEOUT = 10000; + private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock(); private final EndpointDataReferenceEntryIndex entryIndex; private final TransactionContext transactionContext; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final Set lockedEdrs = new HashSet<>(); + private final Map lockedEdrs = new ConcurrentHashMap<>(); public InMemoryEdrLock(EndpointDataReferenceEntryIndex entryIndex, TransactionContext transactionContext) { this.entryIndex = entryIndex; this.transactionContext = transactionContext; } + /* + * This InMemory variant tries to mimic the behaviour of a SELECT WITH FOR UPDATE sql query, which enables a row-level lock. + * The result is either true if the thread acquiring the lock should refresh the token or false if it was already refreshed by another thread. + * A map contains the locks for each row, which should be created by the first thread to get the rights to create it. + * The thread that gets the rights to refresh the EDR should leave this method with a row-level lock, + * which should be terminated by the same thread upon successful refresh. + * Another lock is used to synchronize the read and write to the row-level locks map. + * + * */ @Override public StoreResult acquireLock(String edrId, DataAddress edr) { - lock.readLock().lock(); + + LOCK.writeLock().lock(); try { - if (!lockedEdrs.contains(edrId)) { - lock.readLock().unlock(); - lock.writeLock().lock(); - var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId)); - try { - if (isExpired(edr, edrEntry) && !lockedEdrs.contains(edrId)) { - lockedEdrs.add(edrId); - return StoreResult.success(true); - } - } finally { - lock.readLock().lock(); - lock.writeLock().unlock(); + var rowLock = lockedEdrs.get(edrId); + + if (rowLock == null) { + rowLock = lockedEdrs.get(edrId); + if (rowLock != null) { + LOCK.writeLock().unlock(); + rowLock.writeLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS); + LOCK.writeLock().lock(); // gets the write lock again because it might need to unlock it. + } else { + var newRowLock = new ReentrantReadWriteLock(); + newRowLock.writeLock().lock(); + lockedEdrs.put(edrId, newRowLock); } - } - var timeout = 0; - while (lockedEdrs.contains(edrId) && timeout <= LOCK_TIMEOUT) { - //block until row updated - try { - sleep(LOCK_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException(e); + var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId)); + if (isExpired(edr, edrEntry)) { + return StoreResult.success(true); // leaves with the row-level write lock + } else { + lockedEdrs.get(edrId).writeLock().unlock(); + return StoreResult.success(false); } - timeout += 1000; } - return StoreResult.success(false); + LOCK.writeLock().unlock(); + rowLock.writeLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS); + LOCK.writeLock().lock(); + + var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId)); + if (isExpired(edr, edrEntry)) { + return StoreResult.success(true); // leaves with the row-level write lock + } else { + rowLock.writeLock().unlock(); + return StoreResult.success(false); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } finally { - lock.readLock().unlock(); + LOCK.writeLock().unlock(); } } @Override public void releaseLock(String edrId) { - lockedEdrs.remove(edrId); + lockedEdrs.get(edrId).writeLock().unlock(); } }