Skip to content

Commit

Permalink
refactor inmem acquireLock
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelmag110 committed Oct 17, 2024
1 parent f7afb9b commit bc14e85
Showing 1 changed file with 50 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,64 +25,83 @@
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;

import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static java.lang.Thread.sleep;

public class InMemoryEdrLock implements EndpointDataReferenceLock {

private static final int LOCK_TIMEOUT = 5000;
private static final int LOCK_TIMEOUT = 10000;
private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

private final EndpointDataReferenceEntryIndex entryIndex;
private final TransactionContext transactionContext;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Set<String> lockedEdrs = new HashSet<>();
private final Map<String, ReentrantReadWriteLock> lockedEdrs = new ConcurrentHashMap<>();

public InMemoryEdrLock(EndpointDataReferenceEntryIndex entryIndex, TransactionContext transactionContext) {
this.entryIndex = entryIndex;
this.transactionContext = transactionContext;
}

/*
* This InMemory variant tries to mimic the behaviour of a SELECT WITH FOR UPDATE sql query, which enables a row-level lock.
* The result is either true if the thread acquiring the lock should refresh the token or false if it was already refreshed by another thread.
* A map contains the locks for each row, which should be created by the first thread to get the rights to create it.
* The thread that gets the rights to refresh the EDR should leave this method with a row-level lock,
* which should be terminated by the same thread upon successful refresh.
* Another lock is used to synchronize the read and write to the row-level locks map.
*
* */
@Override
public StoreResult<Boolean> acquireLock(String edrId, DataAddress edr) {
lock.readLock().lock();

LOCK.writeLock().lock();
try {
if (!lockedEdrs.contains(edrId)) {
lock.readLock().unlock();
lock.writeLock().lock();
var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId));
try {
if (isExpired(edr, edrEntry) && !lockedEdrs.contains(edrId)) {
lockedEdrs.add(edrId);
return StoreResult.success(true);
}
} finally {
lock.readLock().lock();
lock.writeLock().unlock();
var rowLock = lockedEdrs.get(edrId);

if (rowLock == null) {
rowLock = lockedEdrs.get(edrId);
if (rowLock != null) {
LOCK.writeLock().unlock();
rowLock.writeLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
LOCK.writeLock().lock(); // gets the write lock again because it might need to unlock it.
} else {
var newRowLock = new ReentrantReadWriteLock();
newRowLock.writeLock().lock();
lockedEdrs.put(edrId, newRowLock);
}
}

var timeout = 0;
while (lockedEdrs.contains(edrId) && timeout <= LOCK_TIMEOUT) {
//block until row updated
try {
sleep(LOCK_TIMEOUT);
} catch (InterruptedException e) {
throw new RuntimeException(e);
var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId));
if (isExpired(edr, edrEntry)) {
return StoreResult.success(true); // leaves with the row-level write lock
} else {
lockedEdrs.get(edrId).writeLock().unlock();
return StoreResult.success(false);
}
timeout += 1000;
}
return StoreResult.success(false);
LOCK.writeLock().unlock();
rowLock.writeLock().tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
LOCK.writeLock().lock();


var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId));
if (isExpired(edr, edrEntry)) {
return StoreResult.success(true); // leaves with the row-level write lock
} else {
rowLock.writeLock().unlock();
return StoreResult.success(false);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
LOCK.writeLock().unlock();
}
}

@Override
public void releaseLock(String edrId) {
lockedEdrs.remove(edrId);
lockedEdrs.get(edrId).writeLock().unlock();
}
}

0 comments on commit bc14e85

Please sign in to comment.