Skip to content

Commit

Permalink
GG-39451 Coordinator flow update for incremental snapshots (apache#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
stasmarkin authored Jun 18, 2024
1 parent 0486837 commit 87a51bb
Show file tree
Hide file tree
Showing 14 changed files with 405 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.createSnapshotGlobalStateKey;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.createSnapshotGlobalStatePrefix;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.deleteSnapshotGlobalStatePrefix;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.restoreSnapshotGlobalStatePrefix;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand All @@ -35,14 +38,17 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.impl.MetaStorageServiceImpl;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.network.ClusterNode;
import org.gridgain.internal.snapshots.SnapshotsFileSystem.SnapshotPath;
import org.gridgain.internal.snapshots.communication.metastorage.CreateSnapshotGlobalState;
import org.gridgain.internal.snapshots.fixtures.TestSnapshotNode;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -407,6 +413,62 @@ void testStopCoordinatorOnSnapshotDeletion() throws Exception {
);
}

/**
* Tests a scenario when Snapshot Coordinator gets re-elected with a Snapshot in the cache.
* So, the Snapshot Cache should be reloaded after the coordinator change.
*/
@Test
void testSnapshotCacheIsReloadedAfterCoordinatorChange() throws Exception {
String tableName = "foo";

createTable(tableName);

CreateSnapshotGlobalState baseSnapshot = readState(createSnapshot(follower(), tableName));

changeCoordinator();

CreateSnapshotGlobalState incSnapshot = readState(createSnapshot(follower(), tableName));

assertThat(incSnapshot.baseSnapshotId(), is(baseSnapshot.snapshotId()));
}

private TestSnapshotNode follower() {
TestSnapshotNode coordinator = snapshotCoordinator();

return cluster.stream()
.filter(node -> node != coordinator)
.findAny()
.orElseThrow();
}

private static Long createSnapshot(TestSnapshotNode targetNode, String... tableNames) {
var createWatch = new TestGlobalSnapshotStateWatch();

targetNode.metaStorageManager.registerPrefixWatch(createSnapshotGlobalStatePrefix(), createWatch);

CompletableFuture<Long> snapshotCreateFuture = targetNode.snapshotFacade().createFullSnapshot(Set.of(tableNames));

assertThat(snapshotCreateFuture, willCompleteSuccessfully());

assertThat(createWatch.operationStartedFuture(), willCompleteSuccessfully());
assertThat(createWatch.operationFinishedFuture(), willCompleteSuccessfully());

return snapshotCreateFuture.join();
}

private CreateSnapshotGlobalState readState(long snapshotId) {
Entry entry;
try {
entry = cluster.get(0).metaStorageManager
.get(createSnapshotGlobalStateKey(snapshotId))
.get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}

return fromBytes(entry.value());
}

private List<CompletableFuture<Void>> blockTableScan(String tableName) {
return cluster.stream().map(node -> blockTableScan(tableName, node)).collect(toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void testSnapshotRestorationCancelledOnError() throws Exception {
Set.of(),
Set.of(),
timestamp,
"testSnapshotRestorationCancelledOnError"
"testSnapshotRestorationCancelledOnError",
null
)));

// Wait for the snapshot to be created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.createSnapshotGlobalStateKey;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.createSnapshotGlobalStatePrefix;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;

import java.io.IOException;
import java.math.BigInteger;
Expand All @@ -31,14 +34,17 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32C;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.gridgain.internal.snapshots.SnapshotsFileSystem.SnapshotPath;
import org.gridgain.internal.snapshots.SnapshotsFileSystem.TableSnapshotPath;
import org.gridgain.internal.snapshots.communication.metastorage.CreateSnapshotGlobalState;
import org.gridgain.internal.snapshots.meta.SnapshotMeta;
import org.gridgain.internal.snapshots.meta.TableSchemaView;
import org.gridgain.internal.snapshots.meta.TableSnapshotMeta;
Expand Down Expand Up @@ -95,6 +101,65 @@ void testCreateSnapshotMultipleTables() {
assertTuple(tuples.get(4), BigInteger.valueOf(5), 20, "day");
}

/**
* Tests a scenario when a Incremental Snapshot is advised after Full Snapshot.
*/
@Test
void testCreateIncrementalSnapshotIsAdvised() {
String tableName1 = "TEST1";
String tableName2 = "TEST2";
String tableName3 = "TEST3";

executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName1));
executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName2));
executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName3));

executeSql(String.format("INSERT INTO %s VALUES (1, 42, 'foo'), (2, 69, 'bar'), (3, 239, 'baz')", tableName1));
executeSql(String.format("INSERT INTO %s VALUES (4, 4, 'nice'), (5, 20, 'day')", tableName2));
executeSql(String.format("INSERT INTO %s VALUES (6, 1337, 'table 3 value')", tableName3));

CreateSnapshotGlobalState baseSnapshot = readState(createSnapshot());

executeSql(String.format("INSERT INTO %s VALUES (7, 37, 'table 3 inc value1')", tableName3));

CreateSnapshotGlobalState incSnapshot1 = readState(createSnapshot());

assertThat(incSnapshot1.baseSnapshotId(), is(baseSnapshot.snapshotId()));

executeSql(String.format("INSERT INTO %s VALUES (8, 96, 'table 3 inc value2')", tableName3));

CreateSnapshotGlobalState incSnapshot2 = readState(createSnapshot(tableName2, tableName3));

assertThat(incSnapshot2.baseSnapshotId(), is(incSnapshot1.snapshotId()));
}

/**
* Tests a scenario when a Incremental Snapshot is not advised after Full Snapshot with some missing tables.
*/
@Test
void testCreateIncrementalSnapshotIsNotAdvised() {
String tableName1 = "TEST1";
String tableName2 = "TEST2";
String tableName3 = "TEST3";

executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName1));
executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName2));
executeSql(String.format("CREATE TABLE %s (key BIGINT PRIMARY KEY, valInt INT, valStr VARCHAR)", tableName3));

executeSql(String.format("INSERT INTO %s VALUES (1, 42, 'foo'), (2, 69, 'bar'), (3, 239, 'baz')", tableName1));
executeSql(String.format("INSERT INTO %s VALUES (4, 4, 'nice'), (5, 20, 'day')", tableName2));
executeSql(String.format("INSERT INTO %s VALUES (6, 1337, 'table 3 value')", tableName3));

createSnapshot(tableName2, tableName3);

executeSql(String.format("INSERT INTO %s VALUES (7, 37, 'table 1 inc value')", tableName1));
executeSql(String.format("INSERT INTO %s VALUES (8, 96, 'table 3 inc value')", tableName3));

CreateSnapshotGlobalState incSnapshot = readState(createSnapshot(tableName1));

assertThat(incSnapshot.baseSnapshotId(), nullValue());
}

/**
* Tests a scenario when multiple tables exist, but only one participates in a Full Snapshot.
*/
Expand Down Expand Up @@ -326,6 +391,20 @@ private SnapshotsFileSystem snapshotsFileSystem(Ignite node) {
return new SnapshotsFileSystem(snapshotsDir);
}

private CreateSnapshotGlobalState readState(long snapshotId) {
Entry entry;
try {
entry = cluster.aliveNode().metaStorageManager()
.get(createSnapshotGlobalStateKey(snapshotId))
.get(1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}

CreateSnapshotGlobalState state = fromBytes(entry.value());
return state;
}

private static SnapshotMeta readSnapshotMeta(SnapshotsFileSystem fileSystem, long snapshotId) {
var snapshotMetaSerializer = new SnapshotMetaSerializer(fileSystem);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.createSnapshotGlobalStatePrefix;
import static org.gridgain.internal.snapshots.communication.metastorage.MetaStorageKeys.deleteSnapshotGlobalStatePrefix;
Expand Down Expand Up @@ -110,9 +109,8 @@ public SnapshotManager(
this.snapshotCoordinatorRole = new Lazy<>(() -> new SnapshotCoordinatorRole(context));

metaStorageManager.listen(MetaStorageEvent.ON_LEADER_ELECTED, parameters -> {
snapshotCoordinatorRole.get().onBecomeCoordinator(parameters.term());

return trueCompletedFuture();
return snapshotCoordinatorRole.get().onBecomeCoordinator(parameters.term())
.thenApply(ignored -> true);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;

/**
* Represents a Global Snapshot State, that is, state of the whole Snapshot operation (used during Snapshot creation).
Expand All @@ -30,6 +31,8 @@ public class CreateSnapshotGlobalState extends GlobalSnapshotState {
@IgniteToStringInclude
private final Set<Integer> tableIds;

private final @Nullable Long baseSnapshotId;

/**
* Constructor.
*
Expand All @@ -45,11 +48,13 @@ public CreateSnapshotGlobalState(
Set<Integer> tableIds,
Set<String> tableNames,
HybridTimestamp timestamp,
String description
String description,
@Nullable Long baseSnapshotId
) {
super(status, nodeNames, tableNames, timestamp, description);

this.tableIds = tableIds;
this.baseSnapshotId = baseSnapshotId;
}

public long snapshotId() {
Expand All @@ -60,6 +65,10 @@ public Set<Integer> tableIds() {
return tableIds;
}

public @Nullable Long baseSnapshotId() {
return baseSnapshotId;
}

@Override
public String toString() {
return S.toString(CreateSnapshotGlobalState.class, this, super.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Set;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;

/**
* Represents a Global Snapshot State, that is, state of the whole Snapshot operation (used during Snapshot deletion).
Expand All @@ -28,6 +29,8 @@ public class DeleteSnapshotGlobalState extends GlobalSnapshotState {

private final long targetSnapshotId;

private final @Nullable Long baseSnapshotId;

/**
* Constructor.
*
Expand All @@ -42,17 +45,23 @@ public DeleteSnapshotGlobalState(
Set<String> nodeNames,
long targetSnapshotId,
HybridTimestamp timestamp,
String description
String description,
@Nullable Long baseSnapshotId
) {
super(status, nodeNames, Set.of(), timestamp, description);

this.targetSnapshotId = targetSnapshotId;
this.baseSnapshotId = baseSnapshotId;
}

public long targetSnapshotId() {
return targetSnapshotId;
}

public @Nullable Long baseSnapshotId() {
return baseSnapshotId;
}

@Override
public String toString() {
return S.toString(DeleteSnapshotGlobalState.class, this, super.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private CompletableFuture<List<Operation>> completeSnapshot(SnapshotStatus statu
snapshotState.tableIds(),
snapshotState.tableNames(),
snapshotState.timestamp(),
description
description,
snapshotState.baseSnapshotId()
);

var operations = new ArrayList<Operation>(snapshotState.nodeNames().size() + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ private CompletableFuture<List<Operation>> updateStateAndRemoveLocalStates(
snapshotState.nodeNames(),
snapshotState.targetSnapshotId(),
snapshotState.timestamp(),
message
message,
snapshotState.baseSnapshotId()
);

var operations = new ArrayList<Operation>(snapshotState.nodeNames().size() + 1);
Expand Down
Loading

0 comments on commit 87a51bb

Please sign in to comment.