-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* added class to centralize integration test collection management * converted a few tests touse IntegrationTestCollectionManager * added mechanism to lock a cosmos container against concurrent use * integrated cosmos lock into IntegrationTestCollectionManager * changed IntegrationTestCollectionManager package * actually acquire the lease on container creation * added headers * converted failing test to use IntegrationTestCollectionManager * removed use of Stopwatch * add support for reactive template to ContainerLock * added reactive support to IntegrationTestCollectionManager * moved inner classes to top level * converted remaining tests to use IntegrationTestCollectionManager * update locking mechanism to use separate container * wait until the test run completes to delete the created collections * use collection manager for cleanup
- Loading branch information
1 parent
94285f5
commit 1d905cd
Showing
41 changed files
with
824 additions
and
698 deletions.
There are no files selected for viewing
177 changes: 177 additions & 0 deletions
177
.../src/test/java/com/azure/spring/data/cosmos/AbstractIntegrationTestCollectionManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// Licensed under the MIT License. | ||
package com.azure.spring.data.cosmos; | ||
|
||
import com.azure.cosmos.models.CosmosContainerProperties; | ||
import com.azure.spring.data.cosmos.core.CosmosTemplate; | ||
import com.azure.spring.data.cosmos.core.ReactiveCosmosTemplate; | ||
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation; | ||
import org.junit.rules.TestRule; | ||
import org.junit.runner.Description; | ||
import org.junit.runners.model.Statement; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
|
||
public abstract class AbstractIntegrationTestCollectionManager<T> implements TestRule { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntegrationTestCollectionManager.class); | ||
private static final Duration LEASE_DURATION = Duration.ofMinutes(5); | ||
private static final ConcurrentMap<String, DeleteContainerAction> CONTAINER_CLEANUP_REGISTRY = new ConcurrentHashMap<>(); | ||
|
||
static { | ||
// since collections are sometimes re-used between tests, wait until the end of the test run to delete them | ||
Runtime.getRuntime().addShutdownHook(new Thread(AbstractIntegrationTestCollectionManager::deleteRegisteredCollections)); | ||
} | ||
|
||
public static void registerContainerForCleanup(Object template, String containerName) { | ||
DeleteContainerAction action; | ||
if (template instanceof CosmosTemplate) { | ||
action = new DeleteContainerAction((CosmosTemplate) template, containerName); | ||
} else if (template instanceof ReactiveCosmosTemplate) { | ||
action = new DeleteContainerAction((ReactiveCosmosTemplate) template, containerName); | ||
} else { | ||
throw new IllegalStateException("Template must be instance of CosmosTemplate or ReactiveCosmosTemplate, was " + template); | ||
} | ||
CONTAINER_CLEANUP_REGISTRY.putIfAbsent(containerName, action); | ||
} | ||
|
||
private static void deleteRegisteredCollections() { | ||
CONTAINER_CLEANUP_REGISTRY.values().forEach(DeleteContainerAction::deleteContainer); | ||
} | ||
|
||
protected T template; | ||
private Map<Class, ContainerRefs> containerRefs = new HashMap<>(); | ||
private boolean isSetupDone; | ||
|
||
protected abstract ContainerLock createLock(CosmosEntityInformation entityInfo, Duration leaseDuration); | ||
protected abstract CosmosContainerProperties createContainerIfNotExists(CosmosEntityInformation entityInfo); | ||
protected abstract void deleteContainerData(CosmosEntityInformation entityInfo); | ||
protected abstract void deleteContainer(CosmosEntityInformation entityInfo); | ||
|
||
public void ensureContainersCreated(T template, Class... entityTypes) { | ||
if (!isSetupDone) { | ||
this.template = template; | ||
initContainerRefs(entityTypes); | ||
isSetupDone = true; | ||
} else { | ||
refreshContainerLeases(); | ||
} | ||
} | ||
|
||
private void initContainerRefs(Class[] entityTypes) { | ||
for (Class entityType : entityTypes) { | ||
CosmosEntityInformation entityInfo = new CosmosEntityInformation(entityType); | ||
CosmosContainerProperties properties = createContainerIfNotExists(entityInfo); | ||
registerContainerForCleanup(template, entityInfo.getContainerName()); | ||
ContainerLock lock = createLock(entityInfo, LEASE_DURATION); | ||
lock.acquire(LEASE_DURATION.multipliedBy(2)); | ||
containerRefs.put(entityType, new ContainerRefs(entityInfo, properties, lock)); | ||
} | ||
} | ||
|
||
public void ensureContainersCreatedAndEmpty(T template, Class... entityTypes) { | ||
ensureContainersCreated(template, entityTypes); | ||
deleteContainerData(); | ||
} | ||
|
||
public <E> CosmosEntityInformation<E, ?> getEntityInformation(Class<E> entityType) { | ||
return containerRefs.get(entityType).cosmosEntityInformation; | ||
} | ||
|
||
public CosmosContainerProperties getContainerProperties(Class entityType) { | ||
return containerRefs.get(entityType).cosmosContainerProperties; | ||
} | ||
|
||
public String getContainerName(Class entityType) { | ||
return containerRefs.get(entityType).getContainerName(); | ||
} | ||
|
||
private void deleteContainerData() { | ||
for (ContainerRefs containerRef : containerRefs.values()) { | ||
deleteContainerData(containerRef.cosmosEntityInformation); | ||
} | ||
} | ||
|
||
private void refreshContainerLeases() { | ||
for (ContainerRefs containerRef : containerRefs.values()) { | ||
containerRef.lock.renew(); | ||
} | ||
} | ||
|
||
private void releaseLocks() { | ||
for (ContainerRefs containerRef : containerRefs.values()) { | ||
try { | ||
containerRef.lock.release(); | ||
} catch (Exception ex) { | ||
LOGGER.info("Failed to delete lock for container=" + containerRef.getContainerName()); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public Statement apply(Statement base, Description description) { | ||
return new Statement() { | ||
public void evaluate() throws Throwable { | ||
try { | ||
base.evaluate(); | ||
} finally { | ||
releaseLocks(); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
private static class ContainerRefs { | ||
|
||
CosmosEntityInformation cosmosEntityInformation; | ||
CosmosContainerProperties cosmosContainerProperties; | ||
ContainerLock lock; | ||
|
||
public ContainerRefs(CosmosEntityInformation cosmosEntityInformation, CosmosContainerProperties cosmosContainerProperties, ContainerLock lock) { | ||
this.cosmosEntityInformation = cosmosEntityInformation; | ||
this.cosmosContainerProperties = cosmosContainerProperties; | ||
this.lock = lock; | ||
} | ||
|
||
public String getContainerName() { | ||
return cosmosEntityInformation.getContainerName(); | ||
} | ||
|
||
} | ||
|
||
private static class DeleteContainerAction { | ||
|
||
private CosmosTemplate template; | ||
private ReactiveCosmosTemplate reactiveTemplate; | ||
private String containerName; | ||
|
||
public DeleteContainerAction(CosmosTemplate template, String containerName) { | ||
this.template = template; | ||
this.containerName = containerName; | ||
} | ||
|
||
public DeleteContainerAction(ReactiveCosmosTemplate reactiveTemplate, String containerName) { | ||
this.reactiveTemplate = reactiveTemplate; | ||
this.containerName = containerName; | ||
} | ||
|
||
public void deleteContainer() { | ||
try { | ||
if (template != null) { | ||
template.deleteContainer(containerName); | ||
} else { | ||
reactiveTemplate.deleteContainer(containerName); | ||
} | ||
} catch (Exception ex) { | ||
LOGGER.info("Failed to delete container=" + containerName); | ||
} | ||
} | ||
} | ||
|
||
} |
212 changes: 212 additions & 0 deletions
212
...ure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/ContainerLock.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// Licensed under the MIT License. | ||
package com.azure.spring.data.cosmos; | ||
|
||
import com.azure.spring.data.cosmos.core.CosmosTemplate; | ||
import com.azure.spring.data.cosmos.core.ReactiveCosmosTemplate; | ||
import com.azure.spring.data.cosmos.core.mapping.PartitionKey; | ||
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation; | ||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import org.springframework.data.annotation.Id; | ||
import org.springframework.data.annotation.Version; | ||
|
||
import java.time.Duration; | ||
import java.time.OffsetDateTime; | ||
|
||
public class ContainerLock { | ||
|
||
private static CosmosEntityInformation<LockEntry, String> lockEntityInfo; | ||
|
||
private LockStore lockStore; | ||
private Duration leaseDuration; | ||
private String lockName; | ||
private LockEntry acquiredLock; | ||
|
||
public ContainerLock(CosmosTemplate template, String lockName, Duration leaseDuration) { | ||
this.lockStore = new NonReactiveLockStore(template); | ||
this.lockName = lockName; | ||
this.leaseDuration = leaseDuration; | ||
initLockContainer(lockStore, template); | ||
} | ||
|
||
public ContainerLock(ReactiveCosmosTemplate reactiveTemplate, String lockName, Duration leaseDuration) { | ||
this.lockStore = new ReactiveLockStore(reactiveTemplate); | ||
this.lockName = lockName; | ||
this.leaseDuration = leaseDuration; | ||
initLockContainer(lockStore, reactiveTemplate); | ||
} | ||
|
||
private static synchronized void initLockContainer(LockStore lockStore, Object template) { | ||
if (lockEntityInfo == null) { | ||
CosmosEntityInformation<LockEntry, String> info = new CosmosEntityInformation<>(LockEntry.class); | ||
lockStore.createContainerIfNotExists(info); | ||
AbstractIntegrationTestCollectionManager.registerContainerForCleanup(template, info.getContainerName()); | ||
lockEntityInfo = info; | ||
} | ||
} | ||
|
||
public void acquire(Duration tryForDuration) { | ||
long started = System.currentTimeMillis(); | ||
LockEntry entry = new LockEntry(lockName, OffsetDateTime.now().plus(leaseDuration)); | ||
while (acquiredLock == null) { | ||
try { | ||
acquiredLock = lockStore.insertLock(entry); | ||
} catch (Exception ex) { | ||
if (shouldKeepTryingToAcquire(started, tryForDuration)) { | ||
sleep(500); | ||
releaseIfLeaseExpired(); | ||
} else { | ||
throw new LockAcquisitionFailedException(tryForDuration); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private boolean shouldKeepTryingToAcquire(long started, Duration tryForDuration) { | ||
long elapsedDuration = System.currentTimeMillis() - started; | ||
return elapsedDuration <= tryForDuration.toMillis(); | ||
} | ||
|
||
private void sleep(long millis) { | ||
try { | ||
Thread.sleep(millis); | ||
} catch (InterruptedException e) { | ||
// ignored | ||
} | ||
} | ||
|
||
private void releaseIfLeaseExpired() { | ||
LockEntry entry = lockStore.findActiveLock(lockName); | ||
if (entry != null && entry.isLeaseExpired()) { | ||
acquiredLock = entry; | ||
release(); | ||
} | ||
} | ||
|
||
public void release() { | ||
if (acquiredLock != null) { | ||
lockStore.deleteLock(acquiredLock); | ||
acquiredLock = null; | ||
} | ||
} | ||
|
||
public void renew() { | ||
if (acquiredLock != null) { | ||
acquiredLock.leaseExpiration = OffsetDateTime.now().plus(leaseDuration); | ||
acquiredLock = lockStore.refreshLock(acquiredLock); | ||
} | ||
} | ||
|
||
public OffsetDateTime getLeaseExpiration() { | ||
if (acquiredLock == null) { | ||
return null; | ||
} | ||
return acquiredLock.leaseExpiration; | ||
} | ||
|
||
private interface LockStore { | ||
LockEntry insertLock(LockEntry entry); | ||
LockEntry findActiveLock(String id); | ||
LockEntry refreshLock(LockEntry entry); | ||
void deleteLock(LockEntry entry); | ||
void createContainerIfNotExists(CosmosEntityInformation entityInfo); | ||
} | ||
|
||
private static class NonReactiveLockStore implements LockStore { | ||
|
||
private final CosmosTemplate template; | ||
|
||
public NonReactiveLockStore(CosmosTemplate template) { | ||
this.template = template; | ||
} | ||
|
||
@Override | ||
public LockEntry insertLock(LockEntry entry) { | ||
return template.insert(lockEntityInfo.getContainerName(), entry); | ||
} | ||
|
||
@Override | ||
public LockEntry findActiveLock(String id) { | ||
return template.findById(lockEntityInfo.getContainerName(), id, LockEntry.class); | ||
} | ||
|
||
@Override | ||
public LockEntry refreshLock(LockEntry entry) { | ||
return template.upsertAndReturnEntity(lockEntityInfo.getContainerName(), entry); | ||
} | ||
|
||
@Override | ||
public void deleteLock(LockEntry entry) { | ||
template.deleteEntity(lockEntityInfo.getContainerName(), entry); | ||
} | ||
|
||
@Override | ||
public void createContainerIfNotExists(CosmosEntityInformation entityInfo) { | ||
template.createContainerIfNotExists(entityInfo); | ||
} | ||
} | ||
|
||
private static class ReactiveLockStore implements LockStore { | ||
|
||
private final ReactiveCosmosTemplate template; | ||
|
||
public ReactiveLockStore(ReactiveCosmosTemplate template) { | ||
this.template = template; | ||
} | ||
|
||
@Override | ||
public LockEntry insertLock(LockEntry entry) { | ||
return template.insert(lockEntityInfo.getContainerName(), entry).block(); | ||
} | ||
|
||
@Override | ||
public LockEntry findActiveLock(String id) { | ||
return template.findById(lockEntityInfo.getContainerName(), id, LockEntry.class).block(); | ||
} | ||
|
||
@Override | ||
public LockEntry refreshLock(LockEntry entry) { | ||
return template.upsert(lockEntityInfo.getContainerName(), entry).block(); | ||
} | ||
|
||
@Override | ||
public void deleteLock(LockEntry entry) { | ||
template.deleteEntity(lockEntityInfo.getContainerName(), entry).block(); | ||
} | ||
|
||
@Override | ||
public void createContainerIfNotExists(CosmosEntityInformation entityInfo) { | ||
template.createContainerIfNotExists(entityInfo).block(); | ||
} | ||
} | ||
|
||
static class LockEntry { | ||
@Id | ||
@PartitionKey | ||
public String id; | ||
@Version | ||
public String version; | ||
public OffsetDateTime leaseExpiration; | ||
|
||
public LockEntry() { | ||
} | ||
|
||
public LockEntry(String id, OffsetDateTime leaseExpiration) { | ||
this.id = id; | ||
this.leaseExpiration = leaseExpiration; | ||
} | ||
|
||
@JsonIgnore | ||
public boolean isLeaseExpired() { | ||
return OffsetDateTime.now().isAfter(leaseExpiration); | ||
} | ||
|
||
} | ||
|
||
static class LockAcquisitionFailedException extends RuntimeException { | ||
public LockAcquisitionFailedException(Duration tryForDuration) { | ||
super("Failed to acquire lock within " + tryForDuration); | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.