Skip to content

Commit

Permalink
Merge branch 'dynamic-services' into update-service-runtime-ECR-3588
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-timofeev committed Sep 30, 2019
2 parents 5fac9c2 + 3789142 commit 597ab93
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ public final class Fork extends View {
* to cancel it on peer ownership transfer, happening in {@link #intoPatch()}.
*/
private final ProxyDestructor destructor;
/**
* A cleaner for this fork.
*/
private final Cleaner forkCleaner;
/**
* A cleaner for objects depending on the fork. A separate cleaner is needed to be able to destroy
* the objects depending on the fork, primarily — indexes, when it is converted into patch
* and invalidated or rolled-back (which requires collection invalidation).
*
* <p>It is a "child" of the {@link #forkCleaner} which destroys the fork itself and,
* through this cleaner, any dependent objects.
*/
private Cleaner indexCleaner;

/**
* Creates a new owning Fork proxy.
Expand All @@ -59,7 +72,7 @@ public static Fork newInstance(long nativeHandle, Cleaner cleaner) {
* @param nativeHandle a handle of the native Fork object
* @param owningHandle whether a proxy owns the corresponding native object and is responsible
* to clean it up
* @param cleaner a cleaner to perform any operations
* @param cleaner a cleaner to destroy this fork and any dependent objects
*/
public static Fork newInstance(long nativeHandle, boolean owningHandle, Cleaner cleaner) {
checkNotNull(cleaner, "cleaner");
Expand All @@ -72,24 +85,26 @@ public static Fork newInstance(long nativeHandle, boolean owningHandle, Cleaner
}
});

// Create a cleaner for collections. A separate cleaner is needed to be able to destroy
// the objects depending on the fork when it is converted into patch and invalidated
Cleaner forkCleaner = new Cleaner();
cleaner.add(forkCleaner::close);

return new Fork(h, destructor, forkCleaner);
return new Fork(h, destructor, cleaner);
}

/**
* Create a new owning Fork.
*
* @param nativeHandle a handle of the native Fork object
* @param destructor a destructor of the native peer, registered with the parent cleaner
* @param forkCleaner a cleaner for objects depending on the fork
* @param parentCleaner a cleaner for this fork
*/
private Fork(NativeHandle nativeHandle, ProxyDestructor destructor, Cleaner forkCleaner) {
super(nativeHandle, forkCleaner, true);
private Fork(NativeHandle nativeHandle, ProxyDestructor destructor, Cleaner parentCleaner) {
super(nativeHandle, true);
this.destructor = destructor;
this.forkCleaner = parentCleaner;
replaceIndexCleaner();
}

@Override
public Cleaner getCleaner() {
return indexCleaner;
}

/**
Expand All @@ -111,7 +126,7 @@ NativeHandle intoPatch() {

// Close all resources depending on this fork
try {
getCleaner().close();
indexCleaner.close();
} catch (CloseFailuresException e) {
// Destroy this fork and abort the operation if there are any failures
destructor.clean();
Expand Down Expand Up @@ -143,8 +158,7 @@ void createCheckpoint() {
checkState(nativeCanRollback(getNativeHandle()),
"This fork does not support checkpoints");

// TODO: Invalidate all indexes created with the fork or the Core won't let us
// do anything.
closeDependentObjects();

nativeCreateCheckpoint(getNativeHandle());
}
Expand All @@ -159,12 +173,40 @@ void rollback() {
checkState(nativeCanRollback(getNativeHandle()),
"This fork does not support rollbacks");

// TODO: Invalidate all indexes created with the fork or the Core won't let us
// do anything.
closeDependentObjects();

nativeRollback(getNativeHandle());
}

private void closeDependentObjects() {
// Clear the registry of opened indexes as they will be closed
clearOpenIndexes();

// Close the active collections (and any other dependent objects),
// as rollback requires their invalidation
try {
indexCleaner.close();
} catch (CloseFailuresException e) {
// Close failures must not normally happen and usually indicate a serious framework error,
// hence we abort the operation. However, it is not always caused by an error
// in the framework, as the client code can register its own operations in the Cleaner,
// provided by this fork.
destructor.clean();
throw new IllegalStateException(
"Operation aborted due to some objects that had failed to close", e);
}

// Create a new cleaner for indexes instead of the recently closed
replaceIndexCleaner();
}

private void replaceIndexCleaner() {
// Create a new cleaner for collections
indexCleaner = new Cleaner();
// Register in the parent cleaner
forkCleaner.add(indexCleaner::close);
}

/**
* Returns true if this fork can be converted into patch.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ void registerIndex(StorageIndex index) {
Optional<StorageIndex> findIndex(IndexAddress address) {
return Optional.ofNullable(indexes.get(address));
}

void clear() {
indexes.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
*/
public final class Snapshot extends View {

private final Cleaner cleaner;

/**
* Creates a new owning Snapshot proxy.
*
Expand Down Expand Up @@ -71,6 +73,12 @@ public static Snapshot newInstance(long nativeHandle, boolean owningHandle, Clea
}

private Snapshot(NativeHandle nativeHandle, Cleaner cleaner) {
super(nativeHandle, cleaner, false);
super(nativeHandle, false);
this.cleaner = cleaner;
}

@Override
public Cleaner getCleaner() {
return cleaner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,22 @@
* <li>A fork, which is a <em>read-write</em> view.</li>
* </ul>
*
* <p>As in some cases the clients need to detect any changes made to a database, a view also
* holds a modification counter, which any clients changing the database state must notify.
*
* @see Snapshot
* @see Fork
*/
public abstract class View extends AbstractNativeProxy {

private final Cleaner cleaner;
private final OpenIndexRegistry indexRegistry = new OpenIndexRegistry();
private final boolean canModify;

/**
* Create a new view proxy.
*
* @param nativeHandle a native handle: an implementation-specific reference to a native object
* @param cleaner a cleaner of resources
* @param canModify if the view allows modifications
*/
View(NativeHandle nativeHandle, Cleaner cleaner, boolean canModify) {
View(NativeHandle nativeHandle, boolean canModify) {
super(nativeHandle);
this.cleaner = cleaner;
this.canModify = canModify;
}

Expand Down Expand Up @@ -103,9 +97,18 @@ public void registerIndex(StorageIndex index) {
}

/**
* Returns the cleaner of this view.
* Clears the registry of open indexes.
*
* <p>This operation does not destroy the indexes in the registry, therefore,
* if it might be needed to access them again, they must be destroyed separately.
*/
public Cleaner getCleaner() {
return cleaner;
void clearOpenIndexes() {
indexRegistry.clear();
}

/**
* Returns the cleaner of this view. It is supposed to be used with collections and other objects
* depending on this view.
*/
public abstract Cleaner getCleaner();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.exonum.binding.core.storage.indices.TestStorageItems.V2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.exonum.binding.common.serialization.StandardSerializers;
Expand All @@ -30,8 +29,6 @@
import com.exonum.binding.core.storage.indices.ListIndexProxy;
import com.exonum.binding.test.RequiresNativeLibrary;
import java.util.Iterator;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

@RequiresNativeLibrary
Expand Down Expand Up @@ -121,74 +118,110 @@ void forkRegisteredProxiesAreInvalidatedWhenParentCleanerClosed() throws Excepti
);
}

// TODO: Fix this test
@Disabled
@Test
void rollbacksChangesMadeSinceLastCheckpoint() throws Exception {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("parent")) {
Fork fork = db.createFork(cleaner);

ListIndex<String> list = newList("list", fork);
list.add("string1");
// Create a list with a single element
String listName = "list";
ListIndex<String> l1 = newList(listName, fork);
l1.add("s1");

// Create a checkpoint
fork.createCheckpoint();

list.add("string2");
assertEquals(2, list.size());
ListIndex<String> l2 = newList(listName, fork);
// Modify the list
l2.add("s2");
assertThat(l2).containsExactly("s1", "s2");

// Rollback the changes
fork.rollback();

assertEquals(1, list.size());
assertEquals("string1", list.get(0));
// Verify the state of the list is equal to the one just before
// the checkpoint was created
ListIndex<String> l3 = newList(listName, fork);
assertThat(l3).containsExactly("s1");
}
}

// TODO: Fix this test
@Disabled
@Test
void rollbackDoesNotAffectDatabase() throws Exception {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("cleaner")) {
final String indexName = "list";
String indexName = "list";

Fork fork1 = db.createFork(cleaner);
ListIndex<String> list1 = newList(indexName, fork1);
list1.add("string1");
list1.add("s1");
// Merge the fork, so that the database has a list ["s1"]
db.merge(fork1);

Fork fork2 = db.createFork(cleaner);
ListIndex<String> list2 = newList(indexName, fork2);
assertEquals(1, list2.size());
assertEquals("string1", list2.get(0));
list2.add("string2");
list2.add("string3");
assertEquals(3, list2.size());

list2.add("s2");
list2.add("s3");
assertThat(list2).containsExactly("s1", "s2", "s3");
// Rollback the 2nd fork
fork2.rollback();
// and merge it (a fork with no changes)
db.merge(fork2);

// Only changes from first fork persist in the database, because
// Only changes from the first fork persist in the database, because
// second fork was rolled back.
assertEquals(1, list2.size());
assertEquals("string1", list2.get(0));
Snapshot s = db.createSnapshot(cleaner);
ListIndex<String> list3 = newList(indexName, s);
assertThat(list3).containsExactly("s1");
}
}

// TODO: Fix this test
@Disabled
@Test
void rollbacksAllChangesIfNoCheckpointWasCreated() throws Exception {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("parent")) {
Fork fork = db.createFork(cleaner);

ListIndex<String> list = newList("list", fork);
list.add("string1");
list.add("s1");

fork.rollback();

ListIndex<String> list2 = newList("list", fork);
assertEquals(0, list2.size());
assertThat(list2).isEmpty();
}
}

@Test
void createCheckpointInvalidatesDependentObjects() throws CloseFailuresException {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("parent")) {
Fork fork = db.createFork(cleaner);

ListIndex<String> l1 = newList("test_list", fork);

// Create a checkpoint
fork.createCheckpoint();

// Check the collections created before checkpoint are inaccessible
assertThrows(IllegalStateException.class, l1::size);
}
}

@Test
void rollbackInvalidatesDependentObjects() throws CloseFailuresException {
try (TemporaryDb db = TemporaryDb.newInstance();
Cleaner cleaner = new Cleaner("parent")) {
Fork fork = db.createFork(cleaner);

ListIndex<String> l1 = newList("test_list", fork);

// Rollback
fork.rollback();

// Check the collections created before rollback are inaccessible
assertThrows(IllegalStateException.class, l1::size);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ void registerThrowsIfAlreadyRegisteredSameAddress() {
.contains(String.valueOf(index))
.contains(String.valueOf(otherIndex));
}

@Test
void clearRemovesTheIndex() {
registry.clear();

Optional<StorageIndex> actual = registry.findIndex(address);

assertThat(actual).isEmpty();
}
}

@Test
Expand Down

0 comments on commit 597ab93

Please sign in to comment.