Skip to content

Commit

Permalink
[Enhancement #231] Implement a version of UnitOfWork that calculates …
Browse files Browse the repository at this point in the history
…changes on commit and propagates them to the storage.
  • Loading branch information
ledsoft committed Mar 13, 2024
1 parent e4b6c46 commit 9ee732d
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package cz.cvut.kbss.jopa.sessions;

import cz.cvut.kbss.jopa.exceptions.EntityNotFoundException;
import cz.cvut.kbss.jopa.exceptions.OWLEntityExistsException;
import cz.cvut.kbss.jopa.exceptions.OWLPersistenceException;
import cz.cvut.kbss.jopa.model.CacheManager;
import cz.cvut.kbss.jopa.model.EntityState;
import cz.cvut.kbss.jopa.model.JOPAPersistenceProperties;
import cz.cvut.kbss.jopa.model.LoadState;
import cz.cvut.kbss.jopa.model.MetamodelImpl;
import cz.cvut.kbss.jopa.model.descriptors.Descriptor;
Expand Down Expand Up @@ -65,6 +67,7 @@
public abstract class AbstractUnitOfWork extends AbstractSession implements UnitOfWork {

private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
protected final IndirectWrapperHelper indirectWrapperHelper;

// Read-only!!! It is just the keyset of cloneToOriginals
final Set<Object> cloneMapping;
Expand Down Expand Up @@ -111,6 +114,7 @@ public AbstractUnitOfWork(AbstractSession parent, Configuration configuration) {
this.changeCalculator = new ChangeCalculator(this);
this.inferredAttributeChangeValidator = new InferredAttributeChangeValidator(storage);
this.isActive = true;
this.indirectWrapperHelper = new IndirectWrapperHelper(this);
}

@Override
Expand Down Expand Up @@ -366,7 +370,7 @@ public <T> T mergeDetached(T entity, Descriptor descriptor) {
}
}

private Object getIdentifier(Object entity) {
Object getIdentifier(Object entity) {
return EntityPropertiesUtils.getIdentifier(entity, getMetamodel());
}

Expand Down Expand Up @@ -551,6 +555,84 @@ void preventCachingIfReferenceIsNotLoaded(ChangeRecord changeRecord) {
}
}

protected ObjectChangeSet processInferredValueChanges(ObjectChangeSet changeSet) {
if (getConfiguration().is(JOPAPersistenceProperties.IGNORE_INFERRED_VALUE_REMOVAL_ON_MERGE)) {
final ObjectChangeSet copy = ChangeSetFactory.createObjectChangeSet(changeSet.getChangedObject(), changeSet.getCloneObject(), changeSet.getEntityDescriptor());
changeSet.getChanges().stream().filter(chr -> !(chr.getAttribute().isInferred() &&
inferredAttributeChangeValidator.isInferredValueRemoval(changeSet.getCloneObject(), changeSet.getChangedObject(),
(FieldSpecification) chr.getAttribute(),
changeSet.getEntityDescriptor()))).forEach(copy::addChangeRecord);
return copy;
} else {
changeSet.getChanges().stream().filter(chr -> chr.getAttribute().isInferred()).forEach(
chr -> inferredAttributeChangeValidator.validateChange(changeSet.getCloneObject(), changeSet.getChangedObject(),
(FieldSpecification) chr.getAttribute(),
changeSet.getEntityDescriptor()));
return changeSet;
}
}

protected <T> T getInstanceForMerge(URI identifier, EntityType<T> et, Descriptor descriptor) {
if (keysToClones.containsKey(identifier)) {
return (T) keysToClones.get(identifier);
}
final LoadingParameters<T> params = new LoadingParameters<>(et.getJavaType(), identifier, descriptor, true);
T original = storage.find(params);
assert original != null;

return (T) registerExistingObject(original, descriptor);
}

protected void evictAfterMerge(EntityType<?> et, URI identifier, Descriptor descriptor) {
if (getLiveObjectCache().contains(et.getJavaType(), identifier, descriptor)) {
getLiveObjectCache().evict(et.getJavaType(), identifier, descriptor.getSingleContext().orElse(null));
}
getMetamodel().getReferringTypes(et.getJavaType()).forEach(getLiveObjectCache()::evict);
}

protected static ObjectChangeSet copyChangeSet(ObjectChangeSet changeSet, Object original, Object clone,
Descriptor descriptor) {
final ObjectChangeSet newChangeSet = ChangeSetFactory.createObjectChangeSet(original, clone, descriptor);
changeSet.getChanges().forEach(newChangeSet::addChangeRecord);
return newChangeSet;
}

@Override
public <T> void refreshObject(T object) {
Objects.requireNonNull(object);
ensureManaged(object);

final IdentifiableEntityType<T> et = entityType((Class<T>) object.getClass());
final URI idUri = EntityPropertiesUtils.getIdentifier(object, et);
final Descriptor descriptor = getDescriptor(object);

final LoadingParameters<T> params = new LoadingParameters<>(et.getJavaType(), idUri, descriptor, true);
params.bypassCache();
final ConnectionWrapper connection = acquireConnection();
try {
uowChangeSet.cancelObjectChanges(getOriginal(object));
T original = connection.find(params);
if (original == null) {
throw new EntityNotFoundException("Entity " + object + " no longer exists in the repository.");
}
T source = (T) cloneBuilder.buildClone(original, new CloneConfiguration(descriptor, false));
final ObjectChangeSet chSet = ChangeSetFactory.createObjectChangeSet(source, object, descriptor);
changeCalculator.calculateChanges(chSet);
new RefreshInstanceMerger(indirectWrapperHelper).mergeChanges(chSet);
revertTransactionalChanges(object, descriptor, chSet);
registerClone(object, original, descriptor);
et.getLifecycleListenerManager().invokePostLoadCallbacks(object);
} finally {
connection.close();
}
}

private <T> void revertTransactionalChanges(T object, Descriptor descriptor, ObjectChangeSet chSet) {
for (ChangeRecord change : chSet.getChanges()) {
storage.merge(object, (FieldSpecification<? super T, ?>) change.getAttribute(), descriptor.getAttributeDescriptor(change.getAttribute()));
}
}

@Override
public void registerNewObject(Object entity, Descriptor descriptor) {
Objects.requireNonNull(entity);
Expand Down Expand Up @@ -592,26 +674,6 @@ private boolean isIndividualManaged(Object identifier, Object entity) {
return keysToClones.containsKey(identifier) || newObjectsKeyToClone.containsKey(identifier) && !cloneMapping.contains(entity);
}

@Override
public void removeObject(Object entity) {
assert entity != null;
ensureManaged(entity);

final IdentifiableEntityType<?> et = entityType(entity.getClass());
et.getLifecycleListenerManager().invokePreRemoveCallbacks(entity);
final Object primaryKey = getIdentifier(entity);
final Descriptor descriptor = getDescriptor(entity);

if (hasNew && newObjectsCloneToOriginal.containsKey(entity)) {
unregisterObject(entity);
newObjectsKeyToClone.remove(primaryKey);
} else {
deletedObjects.put(entity, entity);
this.hasDeleted = true;
}
storage.remove(primaryKey, et.getJavaType(), descriptor);
et.getLifecycleListenerManager().invokePostRemoveCallbacks(entity);
}

<T> void ensureManaged(T object) {
if (!isObjectManaged(object)) {
Expand Down Expand Up @@ -814,4 +876,14 @@ public <T> T unwrap(Class<T> cls) {
}
return storage.unwrap(cls);
}

protected void markCloneForDeletion(Object entity, Object identifier) {
if (hasNew && newObjectsCloneToOriginal.containsKey(entity)) {
unregisterObject(entity);
newObjectsKeyToClone.remove(identifier);
} else {
deletedObjects.put(entity, entity);
this.hasDeleted = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package cz.cvut.kbss.jopa.sessions;

import cz.cvut.kbss.jopa.adapters.IndirectWrapper;
import cz.cvut.kbss.jopa.exceptions.EntityNotFoundException;
import cz.cvut.kbss.jopa.exceptions.OWLEntityExistsException;
import cz.cvut.kbss.jopa.model.JOPAPersistenceProperties;
import cz.cvut.kbss.jopa.model.LoadState;
import cz.cvut.kbss.jopa.model.Manageable;
import cz.cvut.kbss.jopa.model.descriptors.Descriptor;
Expand All @@ -24,13 +22,10 @@

import static cz.cvut.kbss.jopa.utils.EntityPropertiesUtils.getValueAsURI;

public class ChangeTrackingUnitOfWork extends AbstractUnitOfWork{

private final IndirectWrapperHelper indirectWrapperHelper;
public class ChangeTrackingUnitOfWork extends AbstractUnitOfWork {

public ChangeTrackingUnitOfWork(AbstractSession parent, Configuration configuration) {
super(parent, configuration);
this.indirectWrapperHelper = new IndirectWrapperHelper(this);
}

@Override
Expand Down Expand Up @@ -59,8 +54,8 @@ private void checkForIndirectObjects(Object entity) {
/**
* Create and set indirect collection on the specified entity field.
* <p>
* If the specified field is of Collection type, and it is not already an indirect collection, create new one and set
* it as the value of the specified field on the specified entity.
* If the specified field is of Collection type, and it is not already an indirect collection, create new one and
* set it as the value of the specified field on the specified entity.
*
* @param entity The entity collection will be set on
* @param field The field to set
Expand Down Expand Up @@ -231,10 +226,10 @@ <T> T mergeDetachedInternal(T entity, Descriptor descriptor) {

final T clone = getInstanceForMerge(idUri, et, descriptor);
try {
// Merge only the changed attributes
ObjectChangeSet chSet = ChangeSetFactory.createObjectChangeSet(clone, entity, descriptor);
// Have to check for inferred attribute changes before the actual merge
// Merge only the changed attributes
changeCalculator.calculateChanges(chSet);
// Have to check for inferred attribute changes before the actual merge
chSet = processInferredValueChanges(chSet);
if (chSet.hasChanges()) {
et.getLifecycleListenerManager().invokePreUpdateCallbacks(clone);
Expand All @@ -258,84 +253,6 @@ <T> T mergeDetachedInternal(T entity, Descriptor descriptor) {
return et.getJavaType().cast(clone);
}

private <T> T getInstanceForMerge(URI identifier, EntityType<T> et, Descriptor descriptor) {
if (keysToClones.containsKey(identifier)) {
return (T) keysToClones.get(identifier);
}
final LoadingParameters<T> params = new LoadingParameters<>(et.getJavaType(), identifier, descriptor, true);
T original = storage.find(params);
assert original != null;

return (T) registerExistingObject(original, descriptor);
}

private ObjectChangeSet processInferredValueChanges(ObjectChangeSet changeSet) {
if (getConfiguration().is(JOPAPersistenceProperties.IGNORE_INFERRED_VALUE_REMOVAL_ON_MERGE)) {
final ObjectChangeSet copy = ChangeSetFactory.createObjectChangeSet(changeSet.getChangedObject(), changeSet.getCloneObject(), changeSet.getEntityDescriptor());
changeSet.getChanges().stream().filter(chr -> !(chr.getAttribute().isInferred() &&
inferredAttributeChangeValidator.isInferredValueRemoval(changeSet.getCloneObject(), changeSet.getChangedObject(),
(FieldSpecification) chr.getAttribute(),
changeSet.getEntityDescriptor()))).forEach(copy::addChangeRecord);
return copy;
} else {
changeSet.getChanges().stream().filter(chr -> chr.getAttribute().isInferred()).forEach(
chr -> inferredAttributeChangeValidator.validateChange(changeSet.getCloneObject(), changeSet.getChangedObject(),
(FieldSpecification) chr.getAttribute(),
changeSet.getEntityDescriptor()));
return changeSet;
}
}

private static ObjectChangeSet copyChangeSet(ObjectChangeSet changeSet, Object original, Object clone,
Descriptor descriptor) {
final ObjectChangeSet newChangeSet = ChangeSetFactory.createObjectChangeSet(original, clone, descriptor);
changeSet.getChanges().forEach(newChangeSet::addChangeRecord);
return newChangeSet;
}

private void evictAfterMerge(EntityType<?> et, URI identifier, Descriptor descriptor) {
if (getLiveObjectCache().contains(et.getJavaType(), identifier, descriptor)) {
getLiveObjectCache().evict(et.getJavaType(), identifier, descriptor.getSingleContext().orElse(null));
}
getMetamodel().getReferringTypes(et.getJavaType()).forEach(getLiveObjectCache()::evict);
}

@Override
public <T> void refreshObject(T object) {
Objects.requireNonNull(object);
ensureManaged(object);

final IdentifiableEntityType<T> et = entityType((Class<T>) object.getClass());
final URI idUri = EntityPropertiesUtils.getIdentifier(object, et);
final Descriptor descriptor = getDescriptor(object);

final LoadingParameters<T> params = new LoadingParameters<>(et.getJavaType(), idUri, descriptor, true);
params.bypassCache();
final ConnectionWrapper connection = acquireConnection();
try {
uowChangeSet.cancelObjectChanges(getOriginal(object));
T original = connection.find(params);
if (original == null) {
throw new EntityNotFoundException("Entity " + object + " no longer exists in the repository.");
}
T source = (T) cloneBuilder.buildClone(original, new CloneConfiguration(descriptor, false));
final ObjectChangeSet chSet = ChangeSetFactory.createObjectChangeSet(source, object, descriptor);
changeCalculator.calculateChanges(chSet);
new RefreshInstanceMerger(indirectWrapperHelper).mergeChanges(chSet);
revertTransactionalChanges(object, descriptor, chSet);
registerClone(object, original, descriptor);
et.getLifecycleListenerManager().invokePostLoadCallbacks(object);
} finally {
connection.close();
}
}

private <T> void revertTransactionalChanges(T object, Descriptor descriptor, ObjectChangeSet chSet) {
for (ChangeRecord change : chSet.getChanges()) {
storage.merge(object, (FieldSpecification<? super T, ?>) change.getAttribute(), descriptor.getAttributeDescriptor(change.getAttribute()));
}
}

@Override
public void registerNewObject(Object entity, Descriptor descriptor) {
super.registerNewObject(entity, descriptor);
Expand All @@ -348,4 +265,20 @@ public void unregisterObject(Object object) {
removeIndirectWrappers(object);
deregisterEntityFromPersistenceContext(object);
}

@Override
public void removeObject(Object entity) {
assert entity != null;
ensureManaged(entity);

final IdentifiableEntityType<?> et = entityType(entity.getClass());
et.getLifecycleListenerManager().invokePreRemoveCallbacks(entity);
final Object identifier = getIdentifier(entity);
// Get the descriptor before clone is removed
final Descriptor descriptor = getDescriptor(entity);

markCloneForDeletion(entity, identifier);
storage.remove(identifier, et.getJavaType(), descriptor);
et.getLifecycleListenerManager().invokePostRemoveCallbacks(entity);
}
}
Loading

0 comments on commit 9ee732d

Please sign in to comment.