From 4ded5d4f34ed99644f572bfe5281df6b62843b84 Mon Sep 17 00:00:00 2001 From: Aleksandr Polovtsev Date: Thu, 13 Feb 2025 21:19:34 +0200 Subject: [PATCH] review fixes --- .../PartitionReplicaLifecycleManager.java | 6 +- .../raft/OnSnapshotSaveHandler.java | 102 ++++++++++++ .../replicator/raft/RaftTableProcessor.java | 2 +- .../raft/ZonePartitionRaftListener.java | 51 +----- ...ess.java => PartitionMvStorageAccess.java} | 4 +- .../snapshot/PartitionSnapshotStorage.java | 8 +- .../PartitionSnapshotStorageFactory.java | 10 +- .../raft/snapshot/PartitionTxStateAccess.java | 9 +- .../snapshot/RaftSnapshotPartitionMeta.java | 2 +- .../raft/snapshot/ZonePartitionKey.java | 2 +- .../incoming/IncomingSnapshotCopier.java | 22 +-- ...ate.java => MvPartitionDeliveryState.java} | 63 ++++---- .../snapshot/outgoing/OutgoingSnapshot.java | 45 +++--- .../outgoing/OutgoingSnapshotsManager.java | 16 +- .../outgoing/PartitionsSnapshots.java | 6 +- .../snapshot/outgoing/SnapshotMetaUtils.java | 6 +- .../raft/ZonePartitionRaftListenerTest.java | 113 ++++++++++++++ .../PartitionSnapshotStorageFactoryTest.java | 4 +- .../incoming/IncomingSnapshotCopierTest.java | 4 +- .../MvPartitionDeliveryStateTest.java | 145 ++++++++++++++++++ .../outgoing/OutgoingSnapshotCommonTest.java | 40 +++-- .../OutgoingSnapshotMvDataStreamingTest.java | 67 +++++++- .../outgoing/OutgoingSnapshotReaderTest.java | 8 +- .../OutgoingSnapshotTxDataStreamingTest.java | 4 +- .../OutgoingSnapshotsManagerTest.java | 8 +- ...SnapshotAwarePartitionDataStorageTest.java | 16 +- .../outgoing/SnapshotMetaUtilsTest.java | 6 +- .../table/distributed/TableManager.java | 10 +- .../distributed/raft/PartitionListener.java | 30 +--- .../FullStateTransferIndexChooser.java | 6 +- .../OutgoingPartitionSnapshotsCleaner.java | 28 ++++ ...java => PartitionMvStorageAccessImpl.java} | 8 +- .../SnapshotAwarePartitionDataStorage.java | 21 +-- ... => PartitionMvStorageAccessImplTest.java} | 22 +-- 34 files changed, 647 insertions(+), 247 deletions(-) create mode 100644 modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java rename modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/{PartitionStorageAccess.java => PartitionMvStorageAccess.java} (98%) rename modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/{PartitionDeliveryState.java => MvPartitionDeliveryState.java} (60%) create mode 100644 modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java create mode 100644 modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java create mode 100644 modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/OutgoingPartitionSnapshotsCleaner.java rename modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/{PartitionStorageAccessImpl.java => PartitionMvStorageAccessImpl.java} (98%) rename modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/{PartitionStorageAccessImplTest.java => PartitionMvStorageAccessImplTest.java} (94%) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index bcf0142ed82..d42a807d3f0 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -113,8 +113,8 @@ import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor; import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl; import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; @@ -1427,7 +1427,7 @@ public void loadTableListenerToZoneReplica( TablePartitionId tablePartitionId, Function tablePartitionReplicaListenerFactory, RaftTableProcessor raftTableProcessor, - PartitionStorageAccess partitionStorageAccess + PartitionMvStorageAccess partitionMvStorageAccess ) { Listeners listeners = listenersByZonePartitionId.get(zonePartitionId); @@ -1442,7 +1442,7 @@ public void loadTableListenerToZoneReplica( listeners.raftListener.addTableProcessor(tablePartitionId, raftTableProcessor); - listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(), partitionStorageAccess); + listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(), partitionMvStorageAccess); } private CompletableFuture executeUnderZoneWriteLock(int zoneId, Supplier> action) { diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java new file mode 100644 index 00000000000..b383e2894e8 --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/OnSnapshotSaveHandler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft; + +import static java.lang.Math.max; +import static java.util.concurrent.CompletableFuture.allOf; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import org.apache.ignite.internal.raft.service.RaftGroupListener; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.internal.util.TrackerClosedException; + +/** + * Handler for the {@link RaftGroupListener#onSnapshotSave} event. + */ +public class OnSnapshotSaveHandler { + private final TxStatePartitionStorage txStatePartitionStorage; + + private final PendingComparableValuesTracker storageIndexTracker; + + public OnSnapshotSaveHandler( + TxStatePartitionStorage txStatePartitionStorage, + PendingComparableValuesTracker storageIndexTracker + ) { + this.txStatePartitionStorage = txStatePartitionStorage; + this.storageIndexTracker = storageIndexTracker; + } + + /** + * Called when {@link RaftGroupListener#onSnapshotSave} is triggered. + */ + public CompletableFuture onSnapshotSave(Collection tableProcessors) { + // The max index here is required for local recovery and a possible scenario + // of false node failure when we actually have all required data. This might happen because we use the minimal index + // among storages on a node restart. + // Let's consider a more detailed example: + // 1) We don't propagate the maximal lastAppliedIndex among storages, and onSnapshotSave finishes, it leads to the raft log + // truncation until the maximal lastAppliedIndex. + // 2) Unexpected cluster restart happens. + // 3) Local recovery of a node is started, where we request data from the minimal lastAppliedIndex among storages, because + // some data for some node might not have been flushed before unexpected cluster restart. + // 4) When we try to restore data starting from the minimal lastAppliedIndex, we come to the situation + // that a raft node doesn't have such data, because the truncation until the maximal lastAppliedIndex from 1) has happened. + // 5) Node cannot finish local recovery. + + long maxPartitionLastAppliedIndex = tableProcessors.stream() + .mapToLong(RaftTableProcessor::lastAppliedIndex) + .max() + .orElse(0); + + long maxPartitionLastAppliedTerm = tableProcessors.stream() + .mapToLong(RaftTableProcessor::lastAppliedTerm) + .max() + .orElse(0); + + long maxLastAppliedIndex = max(maxPartitionLastAppliedIndex, txStatePartitionStorage.lastAppliedIndex()); + + long maxLastAppliedTerm = max(maxPartitionLastAppliedTerm, txStatePartitionStorage.lastAppliedTerm()); + + tableProcessors.forEach(processor -> processor.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm)); + + txStatePartitionStorage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm); + + updateTrackerIgnoringTrackerClosedException(storageIndexTracker, maxLastAppliedIndex); + + Stream> flushFutures = Stream.concat( + tableProcessors.stream().map(RaftTableProcessor::flushStorage), + Stream.of(txStatePartitionStorage.flush()) + ); + + return allOf(flushFutures.toArray(CompletableFuture[]::new)); + } + + private static > void updateTrackerIgnoringTrackerClosedException( + PendingComparableValuesTracker tracker, + T newValue + ) { + try { + tracker.update(newValue, null); + } catch (TrackerClosedException ignored) { + // No-op. + } + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java index a4aa30b45f5..4f2f20844cf 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTableProcessor.java @@ -60,7 +60,7 @@ void onConfigurationCommitted( long lastAppliedIndex(); /** - * Returns the last applied Raft term. + * Returns the term of the last applied Raft index. */ long lastAppliedTerm(); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java index 171bcd6b2c1..780efd0a4e5 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java @@ -17,17 +17,12 @@ package org.apache.ignite.internal.partition.replicator.raft; -import static java.lang.Math.max; -import static java.util.concurrent.CompletableFuture.allOf; - import java.io.Serializable; import java.nio.file.Path; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; -import java.util.stream.Stream; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.logger.IgniteLogger; @@ -71,8 +66,6 @@ public class ZonePartitionRaftListener implements RaftGroupListener { private final PartitionKey partitionKey; - private final TxStatePartitionStorage txStatePartitionStorage; - /** * Latest committed configuration of the zone-wide Raft group. * @@ -98,6 +91,8 @@ private static class CommittedConfiguration { private final FinishTxCommandHandler finishTxCommandHandler; + private final OnSnapshotSaveHandler onSnapshotSaveHandler; + /** Constructor. */ public ZonePartitionRaftListener( TxStatePartitionStorage txStatePartitionStorage, @@ -110,7 +105,6 @@ public ZonePartitionRaftListener( this.safeTimeTracker = safeTimeTracker; this.storageIndexTracker = storageIndexTracker; this.partitionsSnapshots = partitionsSnapshots; - this.txStatePartitionStorage = txStatePartitionStorage; this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId()); finishTxCommandHandler = new FinishTxCommandHandler( @@ -119,6 +113,8 @@ public ZonePartitionRaftListener( new TablePartitionId(zonePartitionId.zoneId(), zonePartitionId.partitionId()), txManager ); + + onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker); } @Override @@ -228,32 +224,7 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp @Override public void onSnapshotSave(Path path, Consumer doneClo) { - long maxPartitionLastAppliedIndex = tableProcessors.values().stream() - .mapToLong(RaftTableProcessor::lastAppliedIndex) - .max() - .orElse(0); - - long maxPartitionLastAppliedTerm = tableProcessors.values().stream() - .mapToLong(RaftTableProcessor::lastAppliedTerm) - .max() - .orElse(0); - - long maxLastAppliedIndex = max(maxPartitionLastAppliedIndex, txStatePartitionStorage.lastAppliedIndex()); - - long maxLastAppliedTerm = max(maxPartitionLastAppliedTerm, txStatePartitionStorage.lastAppliedTerm()); - - tableProcessors.values().forEach(processor -> processor.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm)); - - txStatePartitionStorage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm); - - updateTrackerIgnoringTrackerClosedException(storageIndexTracker, maxLastAppliedIndex); - - Stream> flushFutures = Stream.concat( - tableProcessors.values().stream().map(RaftTableProcessor::flushStorage), - Stream.of(txStatePartitionStorage.flush()) - ); - - allOf(flushFutures.toArray(CompletableFuture[]::new)) + onSnapshotSaveHandler.onSnapshotSave(tableProcessors.values()) .whenComplete((unused, throwable) -> doneClo.accept(throwable)); } @@ -300,17 +271,7 @@ private static > void updateTrackerIgnoringTrackerClosed } private void cleanupSnapshots() { - PartitionSnapshots partitionSnapshots = partitionSnapshots(); - - partitionSnapshots.acquireReadLock(); - - try { - partitionSnapshots.ongoingSnapshots().forEach(snapshot -> partitionsSnapshots.finishOutgoingSnapshot(snapshot.id())); - - partitionsSnapshots.removeSnapshots(partitionKey); - } finally { - partitionSnapshots.releaseReadLock(); - } + partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey); } private PartitionSnapshots partitionSnapshots() { diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionStorageAccess.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java similarity index 98% rename from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionStorageAccess.java rename to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java index 4d3c881f7b1..b57f2914a1d 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionStorageAccess.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java @@ -33,9 +33,9 @@ import org.jetbrains.annotations.Nullable; /** - * Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage. + * Small abstractions for MV partition storages that includes only methods, mandatory for the snapshot storage. */ -public interface PartitionStorageAccess { +public interface PartitionMvStorageAccess { /** Table ID of the table that this partition storage is associated with. */ int tableId(); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java index 790cce41e89..f0e54dde7f7 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java @@ -65,7 +65,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage { * *

This map is modified externally by the {@link PartitionSnapshotStorageFactory}. */ - private final Int2ObjectMap partitionsByTableId; + private final Int2ObjectMap partitionsByTableId; private final PartitionTxStateAccess txState; @@ -108,7 +108,7 @@ public PartitionSnapshotStorage( OutgoingSnapshotsManager outgoingSnapshotsManager, String snapshotUri, RaftOptions raftOptions, - Int2ObjectMap partitionsByTableId, + Int2ObjectMap partitionsByTableId, PartitionTxStateAccess txState, CatalogService catalogService, @Nullable SnapshotMeta startupSnapshotMeta, @@ -147,7 +147,7 @@ public PartitionSnapshotStorage( OutgoingSnapshotsManager outgoingSnapshotsManager, String snapshotUri, RaftOptions raftOptions, - Int2ObjectMap partitionsByTableId, + Int2ObjectMap partitionsByTableId, PartitionTxStateAccess txState, CatalogService catalogService, @Nullable SnapshotMeta startupSnapshotMeta, @@ -202,7 +202,7 @@ public RaftOptions raftOptions() { /** * Returns partitions by table ID. */ - public Int2ObjectMap partitionsByTableId() { + public Int2ObjectMap partitionsByTableId() { return partitionsByTableId; } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java index 500065d1999..0526a77ef31 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java @@ -62,7 +62,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory { /** * Partition storages grouped by table ID. */ - private final Int2ObjectMap partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>()); + private final Int2ObjectMap partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>()); private final PartitionTxStateAccess txStateStorage; @@ -91,8 +91,8 @@ public PartitionSnapshotStorageFactory( /** * Adds a given table partition storage to the snapshot storage, managed by this factory. */ - public void addMvPartition(int tableId, PartitionStorageAccess partition) { - PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition); + public void addMvPartition(int tableId, PartitionMvStorageAccess partition) { + PartitionMvStorageAccess prev = partitionsByTableId.put(tableId, partition); assert prev == null : "Partition storage for table ID " + tableId + " already exists."; } @@ -121,7 +121,7 @@ public void removeMvPartition(int tableId) { // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the // lowest applied index and thus no data loss occurs. return partitionsByTableId.values().stream() - .min(comparingLong(PartitionStorageAccess::lastAppliedIndex)) + .min(comparingLong(PartitionMvStorageAccess::lastAppliedIndex)) .map(storageWithMinLastAppliedIndex -> { long minLastAppliedIndex = min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()); @@ -132,7 +132,7 @@ public void removeMvPartition(int tableId) { int lastCatalogVersionAtStart = catalogService.latestCatalogVersion(); return snapshotMetaAt( - min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()), + minLastAppliedIndex, min(storageWithMinLastAppliedIndex.lastAppliedTerm(), txStateStorage.lastAppliedTerm()), Objects.requireNonNull(storageWithMinLastAppliedIndex.committedGroupConfiguration()), lastCatalogVersionAtStart, diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionTxStateAccess.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionTxStateAccess.java index 9edbcd9d879..5202a44acc2 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionTxStateAccess.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionTxStateAccess.java @@ -26,7 +26,7 @@ /** * Small abstractions for TX storages that includes only methods, mandatory for the snapshot storage. * - * @see PartitionStorageAccess + * @see PartitionMvStorageAccess */ public interface PartitionTxStateAccess { /** @@ -51,18 +51,19 @@ public interface PartitionTxStateAccess { long lastAppliedTerm(); /** - * Prepares the TX storage for rebalance with the same guarantees and requirements as {@link PartitionStorageAccess#startRebalance}. + * Prepares the TX storage for rebalance with the same guarantees and requirements as {@link PartitionMvStorageAccess#startRebalance}. */ CompletableFuture startRebalance(); /** * Aborts an ongoing TX storage rebalance rebalance with the same guarantees and requirements as - * {@link PartitionStorageAccess#abortRebalance}. + * {@link PartitionMvStorageAccess#abortRebalance}. */ CompletableFuture abortRebalance(); /** - * Completes rebalancing of the TX storage with the same guarantees and requirements as {@link PartitionStorageAccess#finishRebalance}. + * Completes rebalancing of the TX storage with the same guarantees and requirements as + * {@link PartitionMvStorageAccess#finishRebalance}. */ CompletableFuture finishRebalance(RaftSnapshotPartitionMeta partitionMeta); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/RaftSnapshotPartitionMeta.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/RaftSnapshotPartitionMeta.java index 4ae5036bd4c..55d12275f18 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/RaftSnapshotPartitionMeta.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/RaftSnapshotPartitionMeta.java @@ -25,7 +25,7 @@ import org.jetbrains.annotations.Nullable; /** - * Partition metadata for {@link PartitionStorageAccess}. + * Partition metadata for {@link PartitionMvStorageAccess}. */ public class RaftSnapshotPartitionMeta extends PrimitivePartitionMeta { private final RaftGroupConfiguration raftGroupConfig; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java index 980ca16d6c9..43bfe476c0e 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java @@ -31,7 +31,7 @@ public class ZonePartitionKey implements PartitionKey { /** * Returns ID of the zone. */ - public int tableId() { + public int zoneId() { return zoneId; } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java index a591ad8bb87..56851e0e641 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java @@ -54,8 +54,8 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry; import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse; import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.RaftSnapshotPartitionMeta; import org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri; import org.apache.ignite.internal.raft.RaftGroupConfiguration; @@ -90,7 +90,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier { private final PartitionSnapshotStorage partitionSnapshotStorage; - private final Int2ObjectMap partitionsByTableId; + private final Int2ObjectMap partitionsByTableId; private final SnapshotUri snapshotUri; @@ -294,7 +294,7 @@ public SnapshotReader getReader() { } /** - * Requests and the snapshot meta. + * Requests the snapshot meta. */ private CompletableFuture loadSnapshotMeta(ClusterNode snapshotSender) { if (!busyLock.enterBusy()) { @@ -517,18 +517,18 @@ private String createPartitionInfo() { return partitionSnapshotStorage.partitionKey().toString(); } - private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int i) { - PartitionStorageAccess partition = partitionsByTableId.get(entry.tableId()); + private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entry, int entryIndex) { + PartitionMvStorageAccess partition = partitionsByTableId.get(entry.tableId()); RowId rowId = new RowId(partId(), entry.rowId()); - BinaryRowMessage rowVersion = entry.rowVersions().get(i); + BinaryRowMessage rowVersion = entry.rowVersions().get(entryIndex); BinaryRow binaryRow = rowVersion == null ? null : rowVersion.asBinaryRow(); int snapshotCatalogVersion = snapshotMeta.requiredCatalogVersion(); - if (i == entry.timestamps().length) { + if (entryIndex == entry.timestamps().length) { // Writes an intent to write (uncommitted version). assert entry.txId() != null; assert entry.commitTableId() != null; @@ -537,7 +537,7 @@ private void writeVersion(PartitionSnapshotMeta snapshotMeta, ResponseEntry entr partition.addWrite(rowId, binaryRow, entry.txId(), entry.commitTableId(), entry.commitPartitionId(), snapshotCatalogVersion); } else { // Writes committed version. - partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[i]), snapshotCatalogVersion); + partition.addWriteCommitted(rowId, binaryRow, hybridTimestamp(entry.timestamps()[entryIndex]), snapshotCatalogVersion); } } @@ -598,7 +598,7 @@ private CompletableFuture tryUpdateLowWatermark(ClusterNode snapshotSender private CompletableFuture startRebalance() { return allOf( - aggregateFutureFromPartitions(PartitionStorageAccess::startRebalance), + aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance), partitionSnapshotStorage.txState().startRebalance() ); } @@ -612,12 +612,12 @@ private CompletableFuture finishRebalance(RaftSnapshotPartitionMeta meta) private CompletableFuture abortRebalance() { return allOf( - aggregateFutureFromPartitions(PartitionStorageAccess::abortRebalance), + aggregateFutureFromPartitions(PartitionMvStorageAccess::abortRebalance), partitionSnapshotStorage.txState().abortRebalance() ); } - private CompletableFuture aggregateFutureFromPartitions(Function> action) { + private CompletableFuture aggregateFutureFromPartitions(Function> action) { CompletableFuture[] futures = partitionsByTableId.values().stream() .map(action) .toArray(CompletableFuture[]::new); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java similarity index 60% rename from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java rename to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java index 18cb4e2a6c6..a13fc12ba07 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionDeliveryState.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java @@ -19,15 +19,15 @@ import java.util.Collection; import java.util.Iterator; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.storage.RowId; import org.jetbrains.annotations.Nullable; /** * Outgoing snapshot delivery state for a given partition. */ -class PartitionDeliveryState { - private final Iterator partitionStoragesIterator; +class MvPartitionDeliveryState { + private final Iterator partitionStoragesIterator; /** * Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. @@ -39,9 +39,9 @@ class PartitionDeliveryState { * Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data. */ @Nullable - private PartitionStorageAccess currentPartitionStorage; + private PartitionMvStorageAccess currentPartitionStorage; - PartitionDeliveryState(Collection partitionStorages) { + MvPartitionDeliveryState(Collection partitionStorages) { this.partitionStoragesIterator = partitionStorages.iterator(); advance(); @@ -53,7 +53,7 @@ RowId currentRowId() { return currentRowId; } - PartitionStorageAccess currentPartitionStorage() { + PartitionMvStorageAccess currentPartitionStorage() { assert currentPartitionStorage != null; return currentPartitionStorage; @@ -63,46 +63,39 @@ int currentTableId() { return currentPartitionStorage().tableId(); } - boolean isEmpty() { + /** + * Returns {@code true} if all data for the snapshot has been retrieved. + */ + boolean isExhausted() { return currentPartitionStorage == null; } void advance() { - if (currentPartitionStorage == null) { - if (!partitionStoragesIterator.hasNext()) { - return; - } - - currentPartitionStorage = partitionStoragesIterator.next(); - - currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId())); - - // Partition is empty, try the next one. - if (currentRowId == null) { - moveToNextPartitionStorage(); - } - } else { - assert currentRowId != null; + while (true) { + if (currentPartitionStorage == null) { + if (!partitionStoragesIterator.hasNext()) { + return; + } - RowId nextRowId = currentRowId.increment(); + currentPartitionStorage = partitionStoragesIterator.next(); - // We've exhausted all possible row IDs in the partition, switch to the next one. - if (nextRowId == null) { - moveToNextPartitionStorage(); + currentRowId = currentPartitionStorage.closestRowId(RowId.lowestRowId(currentPartitionStorage.partitionId())); } else { - currentRowId = currentPartitionStorage.closestRowId(nextRowId); + assert currentRowId != null; + + currentRowId = currentRowId.increment(); - // We've read all data from this partition, switch to the next one. - if (currentRowId == null) { - moveToNextPartitionStorage(); + if (currentRowId != null) { + currentRowId = currentPartitionStorage.closestRowId(currentRowId); } } - } - } - private void moveToNextPartitionStorage() { - currentPartitionStorage = null; + if (currentRowId != null) { + return; + } - advance(); + // We've read all data from this partition, continue to the next one. + currentPartitionStorage = null; + } } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java index 05236ba169d..02f6261d427 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java @@ -50,7 +50,7 @@ import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; @@ -83,7 +83,7 @@ public class OutgoingSnapshot { private final PartitionKey partitionKey; - private final Int2ObjectSortedMap partitionsByTableId; + private final Int2ObjectSortedMap partitionsByTableId; private final PartitionTxStateAccess txState; @@ -113,10 +113,10 @@ public class OutgoingSnapshot { private final Queue outOfOrderMvData = new ArrayDeque<>(); /** - * Current delivery state of partition data. Can be {@code null} only if the delivery has not started yet.. + * Current delivery state of MV partition data. Can be {@code null} only if the delivery has not started yet. */ @Nullable - private PartitionDeliveryState partitionDeliveryState; + private MvPartitionDeliveryState mvPartitionDeliveryState; private Cursor> txDataCursor; @@ -133,7 +133,7 @@ public class OutgoingSnapshot { public OutgoingSnapshot( UUID id, PartitionKey partitionKey, - Int2ObjectMap partitionsByTableId, + Int2ObjectMap partitionsByTableId, PartitionTxStateAccess txState, CatalogService catalogService ) { @@ -177,8 +177,10 @@ void freezeScopeUnderMvLock() { } private PartitionSnapshotMeta takeSnapshotMeta() { - PartitionStorageAccess partitionStorageWithMaxAppliedIndex = partitionsByTableId.values().stream() - .max(comparingLong(PartitionStorageAccess::lastAppliedIndex)) + // TODO: partitionsByTableId will be empty for zones without tables, need another way to get meta in that case, + // see https://issues.apache.org/jira/browse/IGNITE-24517 + PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = partitionsByTableId.values().stream() + .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex)) .orElseThrow(); RaftGroupConfiguration config = partitionStorageWithMaxAppliedIndex.committedGroupConfiguration(); @@ -329,16 +331,16 @@ private long tryProcessRowFromPartition( return totalBatchSize; } - if (partitionDeliveryState == null) { - partitionDeliveryState = new PartitionDeliveryState(partitionsByTableId.values()); + if (mvPartitionDeliveryState == null) { + mvPartitionDeliveryState = new MvPartitionDeliveryState(partitionsByTableId.values()); } else { - partitionDeliveryState.advance(); + mvPartitionDeliveryState.advance(); } - if (!partitionDeliveryState.isEmpty()) { - RowId rowId = partitionDeliveryState.currentRowId(); + if (!finishedMvData()) { + RowId rowId = mvPartitionDeliveryState.currentRowId(); - PartitionStorageAccess partition = partitionDeliveryState.currentPartitionStorage(); + PartitionMvStorageAccess partition = mvPartitionDeliveryState.currentPartitionStorage(); if (!rowIdsToSkip.remove(rowId)) { SnapshotMvDataResponse.ResponseEntry rowEntry = rowEntry(partition, rowId); @@ -359,7 +361,7 @@ private static boolean batchIsFull(SnapshotMvDataRequest request, long totalBatc } @Nullable - private static SnapshotMvDataResponse.ResponseEntry rowEntry(PartitionStorageAccess partition, RowId rowId) { + private static SnapshotMvDataResponse.ResponseEntry rowEntry(PartitionMvStorageAccess partition, RowId rowId) { List rowVersionsN2O = partition.getAllRowVersions(rowId); if (rowVersionsN2O.isEmpty()) { @@ -483,7 +485,7 @@ public void releaseMvLock() { * @return {@code true} if finished. */ private boolean finishedMvData() { - return partitionDeliveryState != null && partitionDeliveryState.isEmpty(); + return mvPartitionDeliveryState != null && mvPartitionDeliveryState.isExhausted(); } /** @@ -512,13 +514,20 @@ public boolean addRowIdToSkip(RowId rowId) { public boolean alreadyPassed(int tableId, RowId rowId) { assert mvOperationsLock.isLocked() : "MV operations lock must be acquired!"; + if (mvPartitionDeliveryState == null) { + // We haven't started sending MV data yet. + return false; + } + if (finishedMvData()) { return true; } - return partitionDeliveryState != null - && tableId <= partitionDeliveryState.currentTableId() - && rowId.compareTo(partitionDeliveryState.currentRowId()) <= 0; + if (tableId == mvPartitionDeliveryState.currentTableId()) { + return rowId.compareTo(mvPartitionDeliveryState.currentRowId()) <= 0; + } else { + return tableId < mvPartitionDeliveryState.currentTableId(); + } } /** diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java index 6193f5ca081..882789fc659 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java @@ -219,8 +219,20 @@ public PartitionSnapshots partitionSnapshots(PartitionKey partitionKey) { } @Override - public void removeSnapshots(PartitionKey partitionKey) { - snapshotsByPartition.remove(partitionKey); + public void cleanupOutgoingSnapshots(PartitionKey partitionKey) { + PartitionSnapshots partitionSnapshots = snapshotsByPartition.remove(partitionKey); + + if (partitionSnapshots == null) { + return; + } + + partitionSnapshots.acquireReadLock(); + + try { + partitionSnapshots.ongoingSnapshots().forEach(snapshot -> finishOutgoingSnapshot(snapshot.id())); + } finally { + partitionSnapshots.releaseReadLock(); + } } private static class PartitionSnapshotsImpl implements PartitionSnapshots { diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionsSnapshots.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionsSnapshots.java index 8641357231e..7e67e806181 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionsSnapshots.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/PartitionsSnapshots.java @@ -33,11 +33,13 @@ public interface PartitionsSnapshots { PartitionSnapshots partitionSnapshots(PartitionKey partitionKey); /** - * Removes the underlying collection for snapshots of this partition. + * Cleans up all outgoing snapshots for the given partition. + * + *

Cleaning up includes calling {@link #finishOutgoingSnapshot} on all snapshots for this partition. * * @param partitionKey Partition key. */ - void removeSnapshots(PartitionKey partitionKey); + void cleanupOutgoingSnapshots(PartitionKey partitionKey); /** * Finishes a snapshot. This closes the snapshot and deregisters it. diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtils.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtils.java index 7b77647125e..cd1cbc19f39 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtils.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtils.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta; import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMetaBuilder; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; @@ -97,14 +97,14 @@ public static PartitionSnapshotMeta snapshotMetaAt( */ public static Map collectNextRowIdToBuildIndexes( CatalogService catalogService, - Collection mvPartitions, + Collection mvPartitions, int catalogVersion ) { var nextRowIdToBuildByIndexId = new HashMap(); Catalog catalog = catalogService.catalog(catalogVersion); - for (PartitionStorageAccess mvPartition : mvPartitions) { + for (PartitionMvStorageAccess mvPartition : mvPartitions) { int tableId = mvPartition.tableId(); for (CatalogIndexDescriptor index : catalog.indexes(tableId)) { diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java new file mode 100644 index 00000000000..b91af3ed9a0 --- /dev/null +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.UUID; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.schema.SchemaRegistry; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.internal.util.SafeTimeValuesTracker; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ZonePartitionRaftListenerTest extends BaseIgniteAbstractTest { + private static final int ZONE_ID = 0; + + private static final int PARTITION_ID = 0; + + private static final ZonePartitionKey ZONE_PARTITION_KEY = new ZonePartitionKey(ZONE_ID, PARTITION_ID); + + private ZonePartitionRaftListener listener; + + @Mock + private OutgoingSnapshotsManager outgoingSnapshotsManager; + + @Mock + private TxManager txManager; + + @Mock + private TxStatePartitionStorage txStatePartitionStorage; + + @BeforeEach + void setUp() { + listener = new ZonePartitionRaftListener( + txStatePartitionStorage, + txManager, + new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE), + new PendingComparableValuesTracker<>(0L), + new ZonePartitionId(ZONE_ID, PARTITION_ID), + outgoingSnapshotsManager + ); + } + + @Test + void closesOngoingSnapshots(@Mock PartitionSnapshots partitionSnapshots) { + var tablePartition1 = new TablePartitionId(1, PARTITION_ID); + var tablePartition2 = new TablePartitionId(2, PARTITION_ID); + + listener.addTableProcessor(tablePartition1, partitionListener(tablePartition1)); + listener.addTableProcessor(tablePartition2, partitionListener(tablePartition2)); + + listener.onShutdown(); + + verify(outgoingSnapshotsManager).cleanupOutgoingSnapshots(ZONE_PARTITION_KEY); + } + + private PartitionListener partitionListener(TablePartitionId tablePartitionId) { + return new PartitionListener( + txManager, + new SnapshotAwarePartitionDataStorage( + tablePartitionId.tableId(), + mock(MvPartitionStorage.class), + outgoingSnapshotsManager, + ZONE_PARTITION_KEY + ), + mock(StorageUpdateHandler.class), + txStatePartitionStorage, + new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE), + new PendingComparableValuesTracker<>(0L), + mock(CatalogService.class), + mock(SchemaRegistry.class), + mock(IndexMetaStorage.class), + UUID.randomUUID(), + mock(MinimumRequiredTimeCollectorService.class) + ); + } +} diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java index 6e82b7d0cf9..ab384d44721 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java @@ -48,10 +48,10 @@ public class PartitionSnapshotStorageFactoryTest extends BaseIgniteAbstractTest private static final int TABLE_ID_2 = 2; @Mock - private PartitionStorageAccess partitionAccess1; + private PartitionMvStorageAccess partitionAccess1; @Mock - private PartitionStorageAccess partitionAccess2; + private PartitionMvStorageAccess partitionAccess2; @Mock private PartitionTxStateAccess txStateAccess; diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 07f75180fab..1247cb1f3d6 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -105,7 +105,7 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser; -import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionStorageAccessImpl; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl; import org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -380,7 +380,7 @@ private PartitionSnapshotStorage createPartitionSnapshotStorage( outgoingSnapshotsManager, SnapshotUri.toStringUri(snapshotId, NODE_NAME), mock(RaftOptions.class), - singleton(TABLE_ID, spy(new PartitionStorageAccessImpl( + singleton(TABLE_ID, spy(new PartitionMvStorageAccessImpl( PARTITION_ID, incomingTableStorage, mvGc, diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java new file mode 100644 index 00000000000..3cae52762c4 --- /dev/null +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; + +import static java.util.stream.Collectors.toList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.stream.IntStream; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MvPartitionDeliveryStateTest extends BaseIgniteAbstractTest { + private static final int PARTITION_ID = 1; + + @Test + void emptyStateIsExhausted() { + var state = new MvPartitionDeliveryState(List.of()); + + assertThat(state.isExhausted(), is(true)); + } + + @Test + void correctlyIteratesOverAllStorages( + @Mock PartitionMvStorageAccess storage1, + @Mock PartitionMvStorageAccess storage2, + @Mock PartitionMvStorageAccess storage3 + ) { + List storages = List.of(storage1, storage2, storage3); + + var rowIdsByTable = new HashMap>(); + + for (int i = 0; i < storages.size(); i++) { + List rowIds = generateRowIds(3); + + rowIdsByTable.put(i, rowIds); + + PartitionMvStorageAccess storage = storages.get(i); + + when(storage.partitionId()).thenReturn(PARTITION_ID); + when(storage.tableId()).thenReturn(i); + + when(storage.closestRowId(any())) + .thenReturn(rowIds.get(0)) + .thenReturn(rowIds.get(1)) + .thenReturn(rowIds.get(2)) + .thenReturn(null); + } + + var state = new MvPartitionDeliveryState(storages); + + for (int i = 0; i < storages.size(); i++) { + assertThat(state.isExhausted(), is(false)); + + assertThat(state.currentTableId(), is(i)); + assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(0))); + + state.advance(); + + assertThat(state.isExhausted(), is(false)); + + assertThat(state.currentTableId(), is(i)); + assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(1))); + + state.advance(); + + assertThat(state.isExhausted(), is(false)); + + assertThat(state.currentTableId(), is(i)); + assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(2))); + + state.advance(); + } + + assertThat(state.isExhausted(), is(true)); + } + + @Test + void handlesRowIdBoundaries( + @Mock PartitionMvStorageAccess storage1, + @Mock PartitionMvStorageAccess storage2 + ) { + // We are using two storages to test that table IDs are correctly iterated as well. + when(storage1.partitionId()).thenReturn(PARTITION_ID); + when(storage1.tableId()).thenReturn(1); + when(storage2.partitionId()).thenReturn(PARTITION_ID); + when(storage2.tableId()).thenReturn(2); + + RowId lowestRowId = RowId.lowestRowId(PARTITION_ID); + RowId highestRowId = RowId.highestRowId(PARTITION_ID); + + when(storage1.closestRowId(lowestRowId)).thenReturn(lowestRowId); + when(storage1.closestRowId(lowestRowId.increment())).thenReturn(highestRowId); + + when(storage2.closestRowId(lowestRowId)).thenReturn(lowestRowId); + when(storage2.closestRowId(lowestRowId.increment())).thenReturn(highestRowId); + + var state = new MvPartitionDeliveryState(List.of(storage1, storage2)); + + IntStream.rangeClosed(1, 2).forEach(i -> { + assertThat(state.isExhausted(), is(false)); + assertThat(state.currentTableId(), is(i)); + assertThat(state.currentRowId(), is(lowestRowId)); + + state.advance(); + + assertThat(state.isExhausted(), is(false)); + assertThat(state.currentTableId(), is(i)); + assertThat(state.currentRowId(), is(highestRowId)); + + state.advance(); + }); + + assertThat(state.isExhausted(), is(true)); + } + + private static List generateRowIds(int count) { + return IntStream.range(0, count).mapToObj(RowId::new).sorted().collect(toList()); + } +} diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java index 874f03155b0..3d88962deed 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; -import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.singleton; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -27,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.List; import java.util.UUID; import org.apache.ignite.internal.catalog.Catalog; @@ -35,10 +35,10 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest; import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.raft.RaftGroupConfiguration; -import org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -49,10 +49,14 @@ @ExtendWith(MockitoExtension.class) class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { - private static final int TABLE_ID = 1; + private static final int TABLE_ID_1 = 1; + private static final int TABLE_ID_2 = 2; @Mock - private PartitionStorageAccess partitionAccess; + private PartitionMvStorageAccess partitionAccess1; + + @Mock + private PartitionMvStorageAccess partitionAccess2; @Mock private CatalogService catalogService; @@ -61,7 +65,7 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { private final PartitionReplicationMessagesFactory messagesFactory = new PartitionReplicationMessagesFactory(); - private final PartitionKey partitionKey = new TablePartitionKey(TABLE_ID, 1); + private final PartitionKey partitionKey = new ZonePartitionKey(0, 1); private static final int REQUIRED_CATALOG_VERSION = 42; @@ -69,10 +73,15 @@ class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest { void createTestInstance() { lenient().when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class)); + var partitionsByTableId = new Int2ObjectOpenHashMap(); + + partitionsByTableId.put(TABLE_ID_1, partitionAccess1); + partitionsByTableId.put(TABLE_ID_2, partitionAccess2); + snapshot = new OutgoingSnapshot( UUID.randomUUID(), partitionKey, - singleton(TABLE_ID, partitionAccess), + partitionsByTableId, mock(PartitionTxStateAccess.class), catalogService ); @@ -85,9 +94,12 @@ void returnsKeyFromStorage() { @Test void sendsSnapshotMeta() { - when(partitionAccess.lastAppliedIndex()).thenReturn(100L); - when(partitionAccess.lastAppliedTerm()).thenReturn(3L); - when(partitionAccess.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration( + // Test that max applied index across all storages is sent. + when(partitionAccess1.lastAppliedIndex()).thenReturn(99L); + + when(partitionAccess2.lastAppliedIndex()).thenReturn(100L); + when(partitionAccess2.lastAppliedTerm()).thenReturn(3L); + when(partitionAccess2.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration( 13L, 37L, List.of("peer1:3000", "peer2:3000"), @@ -98,9 +110,9 @@ void sendsSnapshotMeta() { when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION); - when(partitionAccess.leaseStartTime()).thenReturn(333L); - when(partitionAccess.primaryReplicaNodeId()).thenReturn(new UUID(1, 2)); - when(partitionAccess.primaryReplicaNodeName()).thenReturn("primary"); + when(partitionAccess2.leaseStartTime()).thenReturn(333L); + when(partitionAccess2.primaryReplicaNodeId()).thenReturn(new UUID(1, 2)); + when(partitionAccess2.primaryReplicaNodeName()).thenReturn("primary"); snapshot.freezeScopeUnderMvLock(); @@ -139,7 +151,7 @@ private SnapshotMetaResponse getNullableSnapshotMetaResponse() { @Test void doesNotSendOldConfigWhenItIsNotThere() { - when(partitionAccess.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration( + when(partitionAccess1.committedGroupConfiguration()).thenReturn(new RaftGroupConfiguration( 13L, 37L, List.of(), List.of(), null, null )); diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java index 45b9827c5e8..72b05eb79b9 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java @@ -43,7 +43,7 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataRequest; import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.schema.BinaryRow; @@ -69,10 +69,10 @@ class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest { private static final int PARTITION_ID = 1; @Mock - private PartitionStorageAccess partitionAccess1; + private PartitionMvStorageAccess partitionAccess1; @Mock - private PartitionStorageAccess partitionAccess2; + private PartitionMvStorageAccess partitionAccess2; @Mock private CatalogService catalogService; @@ -98,12 +98,12 @@ class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest { @BeforeEach void createTestInstance() { lenient().when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1); - lenient().when(partitionAccess1.partitionId()).thenReturn(partitionKey.partitionId()); + lenient().when(partitionAccess1.partitionId()).thenReturn(PARTITION_ID); lenient().when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2); - lenient().when(partitionAccess2.partitionId()).thenReturn(partitionKey.partitionId()); + lenient().when(partitionAccess2.partitionId()).thenReturn(PARTITION_ID); - var partitionsByTableId = new Int2ObjectOpenHashMap(); + var partitionsByTableId = new Int2ObjectOpenHashMap(); partitionsByTableId.put(TABLE_ID_1, partitionAccess1); partitionsByTableId.put(TABLE_ID_2, partitionAccess2); @@ -539,6 +539,61 @@ void notYetSentRowIdIsNotPassed() { } } + @Test + void notYetSentRowIdIsNotPassedAcrossMultipleTables() { + ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); + ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now()); + + when(partitionAccess1.closestRowId(lowestRowId)).thenReturn(rowId1); + when(partitionAccess1.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2)); + when(partitionAccess1.closestRowId(rowId2)).thenReturn(rowId2); + when(partitionAccess1.getAllRowVersions(rowId2)).thenReturn(List.of(version1)); + + when(partitionAccess2.closestRowId(lowestRowId)).thenReturn(rowId1); + when(partitionAccess2.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2)); + when(partitionAccess2.closestRowId(rowId2)).thenReturn(rowId2); + when(partitionAccess2.getAllRowVersions(rowId2)).thenReturn(List.of(version1)); + + snapshot.acquireMvLock(); + + try { + assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2)); + + getMvDataResponse(1); + + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2)); + + getMvDataResponse(1); + + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2)); + + getMvDataResponse(1); + + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1)); + assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2)); + + getMvDataResponse(1); + + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1)); + assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId2)); + } finally { + snapshot.releaseMvLock(); + } + } + @Test void anyRowIdIsPassedForFinishedSnapshot() { ReadResult version = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now()); diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java index fa2ce97a885..b55df3819d4 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java @@ -30,8 +30,8 @@ import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.raft.RaftGroupConfiguration; @@ -49,8 +49,8 @@ public class OutgoingSnapshotReaderTest extends BaseIgniteAbstractTest { @Test void testForChoosingMaximumAppliedIndexForMeta() throws IOException { - PartitionStorageAccess partitionAccess1 = mock(PartitionStorageAccess.class); - PartitionStorageAccess partitionAccess2 = mock(PartitionStorageAccess.class); + PartitionMvStorageAccess partitionAccess1 = mock(PartitionMvStorageAccess.class); + PartitionMvStorageAccess partitionAccess2 = mock(PartitionMvStorageAccess.class); when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1); when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2); @@ -70,7 +70,7 @@ void testForChoosingMaximumAppliedIndexForMeta() throws IOException { PartitionTxStateAccess txStateAccess = mock(PartitionTxStateAccess.class); - var partitionsByTableId = new Int2ObjectOpenHashMap(); + var partitionsByTableId = new Int2ObjectOpenHashMap(); partitionsByTableId.put(TABLE_ID_1, partitionAccess1); partitionsByTableId.put(TABLE_ID_2, partitionAccess2); diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java index 63a1bb28180..ef047147763 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest; import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.replicator.TablePartitionId; @@ -64,7 +64,7 @@ @ExtendWith(MockitoExtension.class) class OutgoingSnapshotTxDataStreamingTest extends BaseIgniteAbstractTest { @Mock - private PartitionStorageAccess partitionAccess; + private PartitionMvStorageAccess partitionAccess; @Mock private PartitionTxStateAccess txAccess; diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java index e9328d7bcfb..04d49b5e17a 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,7 +32,7 @@ import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey; @@ -50,7 +51,7 @@ class OutgoingSnapshotsManagerTest extends BaseIgniteAbstractTest { private OutgoingSnapshotsManager manager; @Mock - private PartitionStorageAccess partitionAccess; + private PartitionMvStorageAccess partitionAccess; @Mock private CatalogService catalogService; @@ -101,6 +102,7 @@ void finishesSnapshot() { private UUID startSnapshot() { UUID snapshotId = UUID.randomUUID(); OutgoingSnapshot snapshot = mock(OutgoingSnapshot.class); + lenient().when(snapshot.id()).thenReturn(snapshotId); doReturn(partitionKey).when(snapshot).partitionKey(); manager.startOutgoingSnapshot(snapshotId, snapshot); @@ -111,7 +113,7 @@ private UUID startSnapshot() { void removesPartitionsCollection() { startSnapshot(); - manager.removeSnapshots(partitionKey); + manager.cleanupOutgoingSnapshots(partitionKey); assertThat(manager.partitionSnapshots(partitionKey).ongoingSnapshots(), is(empty())); } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java index 6324096ffe7..6df959128f3 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing; -import static java.util.Collections.singletonList; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -90,6 +89,8 @@ void configureMocks() { ); lenient().when(partitionsSnapshots.partitionSnapshots(any())).thenReturn(partitionSnapshots); + + lenient().when(snapshot.id()).thenReturn(UUID.randomUUID()); } @Test @@ -282,22 +283,11 @@ void interceptsWritesToMvStorageOnMultipleSnapshots(MvWriteAction writeAction) { verify(snapshot2).enqueueForSending(TABLE_ID, rowId); } - @Test - void finishesSnapshotsOnStop() { - when(partitionSnapshots.ongoingSnapshots()).thenReturn(singletonList(snapshot)); - - testedStorage.close(); - - verify(partitionsSnapshots).finishOutgoingSnapshot(snapshot.id()); - } - @Test void removesSnapshotsCollectionOnStop() { - when(partitionSnapshots.ongoingSnapshots()).thenReturn(singletonList(snapshot)); - testedStorage.close(); - verify(partitionsSnapshots).removeSnapshots(partitionKey); + verify(partitionsSnapshots).cleanupOutgoingSnapshots(partitionKey); } private enum MvWriteAction { diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java index b4e8db6120f..458a21d0cb4 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java @@ -46,7 +46,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -144,8 +144,8 @@ void testCollectNextRowIdToBuildIndexes() throws Exception { startBuildingIndex(catalogManager, indexId2); startBuildingIndex(catalogManager, indexId3); - PartitionStorageAccess partitionAccess0 = mock(PartitionStorageAccess.class, withSettings().strictness(LENIENT)); - PartitionStorageAccess partitionAccess1 = mock(PartitionStorageAccess.class, withSettings().strictness(LENIENT)); + PartitionMvStorageAccess partitionAccess0 = mock(PartitionMvStorageAccess.class, withSettings().strictness(LENIENT)); + PartitionMvStorageAccess partitionAccess1 = mock(PartitionMvStorageAccess.class, withSettings().strictness(LENIENT)); int tableId0 = getTableIdStrict(catalogManager, tableName0, clock.nowLong()); int tableId1 = getTableIdStrict(catalogManager, tableName1, clock.nowLong()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index c39fdda3b71..3cae39d0a64 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -211,7 +211,7 @@ import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser; -import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionStorageAccessImpl; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl; import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; @@ -907,7 +907,7 @@ private void preparePartitionResourcesAndLoadToZoneReplica(TableImpl table, Zone minTimeCollectorService ); - var partitionStorageAccess = new PartitionStorageAccessImpl( + var partitionStorageAccess = new PartitionMvStorageAccessImpl( partId, table.internalTable().storage(), mvGc, @@ -1277,8 +1277,10 @@ private CompletableFuture startPartitionAndStartClient( PartitionStorages partitionStorages = getPartitionStorages(table, partId); + var partitionKey = new TablePartitionKey(tableId, partId); + PartitionDataStorage partitionDataStorage = partitionDataStorage( - new TablePartitionKey(tableId, partId), + partitionKey, internalTbl.tableId(), partitionStorages.getMvPartitionStorage() ); @@ -2511,7 +2513,7 @@ private SnapshotStorageFactory createSnapshotStorageFactory( ) { int partitionId = replicaGrpId.partitionId(); - var partitionAccess = new PartitionStorageAccessImpl( + var partitionAccess = new PartitionMvStorageAccessImpl( partitionId, internalTable.storage(), mvGc, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 034f3f7152a..c7a99f60025 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand; import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand; import org.apache.ignite.internal.partition.replicator.raft.FinishTxCommandHandler; +import org.apache.ignite.internal.partition.replicator.raft.OnSnapshotSaveHandler; import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor; import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; @@ -130,6 +131,8 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor private final FinishTxCommandHandler finishTxCommandHandler; + private final OnSnapshotSaveHandler onSnapshotSaveHandler; + /** Constructor. */ public PartitionListener( TxManager txManager, @@ -162,6 +165,7 @@ public PartitionListener( new TablePartitionId(storage.tableId(), storage.partitionId()), txManager ); + onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker); } @Override @@ -481,31 +485,7 @@ public void onConfigurationCommitted( @Override public void onSnapshotSave(Path path, Consumer doneClo) { - // The max index here is required for local recovery and a possible scenario - // of false node failure when we actually have all required data. This might happen because we use the minimal index - // among storages on a node restart. - // Let's consider a more detailed example: - // 1) We don't propagate the maximal lastAppliedIndex among storages, and onSnapshotSave finishes, it leads to the raft log - // truncation until the maximal lastAppliedIndex. - // 2) Unexpected cluster restart happens. - // 3) Local recovery of a node is started, where we request data from the minimal lastAppliedIndex among storages, because - // some data for some node might not have been flushed before unexpected cluster restart. - // 4) When we try to restore data starting from the minimal lastAppliedIndex, we come to the situation - // that a raft node doesn't have such data, because the truncation until the maximal lastAppliedIndex from 1) has happened. - // 5) Node cannot finish local recovery. - long maxLastAppliedIndex = Math.max(storage.lastAppliedIndex(), txStatePartitionStorage.lastAppliedIndex()); - long maxLastAppliedTerm = Math.max(storage.lastAppliedTerm(), txStatePartitionStorage.lastAppliedTerm()); - - storage.runConsistently(locker -> { - storage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm); - - return null; - }); - - txStatePartitionStorage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm); - updateTrackerIgnoringTrackerClosedException(storageIndexTracker, maxLastAppliedIndex); - - CompletableFuture.allOf(storage.flush(), txStatePartitionStorage.flush()) + onSnapshotSaveHandler.onSnapshotSave(List.of(this)) .whenComplete((unused, throwable) -> doneClo.accept(throwable)); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java index 9248fefa7fa..3e5d95b3448 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lowwatermark.LowWatermark; import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.table.distributed.index.IndexMeta; import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus; @@ -99,7 +99,7 @@ public void close() { } /** - * Collect indexes for {@link PartitionStorageAccess#addWrite} (write intent). + * Collect indexes for {@link PartitionMvStorageAccess#addWrite} (write intent). * *

NOTE: When updating a low watermark, the index storages that were returned from the method may begin to be destroyed, such a * situation should be handled by the calling code.

@@ -140,7 +140,7 @@ public List chooseForAddWrite(int catalogVersion, int ta } /** - * Collect indexes for {@link PartitionStorageAccess#addWriteCommitted} (write committed only). + * Collect indexes for {@link PartitionMvStorageAccess#addWriteCommitted} (write committed only). * *

NOTE: When updating a low watermark, the index storages that were returned from the method may begin to be destroyed, such a * situation should be handled by the calling code.

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/OutgoingPartitionSnapshotsCleaner.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/OutgoingPartitionSnapshotsCleaner.java new file mode 100644 index 00000000000..81684ff1620 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/OutgoingPartitionSnapshotsCleaner.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft.snapshot; + +/** + * Stops and cleans up outgoing snapshots for a partition. + */ +@FunctionalInterface +public interface OutgoingPartitionSnapshotsCleaner { + OutgoingPartitionSnapshotsCleaner NO_OP = () -> {}; + + void cleanupOutgoingSnapshots(); +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java similarity index 98% rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImpl.java rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java index f0f7dc24b18..9aa2d036be4 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lowwatermark.LowWatermark; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.RaftSnapshotPartitionMeta; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter; @@ -47,8 +47,8 @@ import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; -/** {@link PartitionStorageAccess} implementation. */ -public class PartitionStorageAccessImpl implements PartitionStorageAccess { +/** {@link PartitionMvStorageAccess} implementation. */ +public class PartitionMvStorageAccessImpl implements PartitionMvStorageAccess { private final int partitionId; private final MvTableStorage mvTableStorage; @@ -79,7 +79,7 @@ public class PartitionStorageAccessImpl implements PartitionStorageAccess { * @param schemaRegistry Schema registry. * @param lowWatermark Low watermark. */ - public PartitionStorageAccessImpl( + public PartitionMvStorageAccessImpl( int partitionId, MvTableStorage mvTableStorage, MvGc mvGc, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java index e43148b0ddf..9d44ecf2302 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshot; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots; import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots; @@ -194,21 +195,13 @@ private void handleSnapshotInterference(RowId rowId) { @Override public void close() { - cleanupSnapshots(); - } - - private void cleanupSnapshots() { - PartitionSnapshots partitionSnapshots = getPartitionSnapshots(); - - partitionSnapshots.acquireReadLock(); - - try { - partitionSnapshots.ongoingSnapshots().forEach(snapshot -> partitionsSnapshots.finishOutgoingSnapshot(snapshot.id())); - - partitionsSnapshots.removeSnapshots(partitionKey); - } finally { - partitionSnapshots.releaseReadLock(); + if (partitionKey instanceof ZonePartitionKey) { + // This is a hack for the colocation feature, for zone-based partitions snapshots are cleaned up for a bunch of storages at + // once and this is done in a separate place. + return; } + + partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey); } @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImplTest.java similarity index 94% rename from modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImplTest.java rename to modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImplTest.java index 2256d0891fc..fae96727e38 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionStorageAccessImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImplTest.java @@ -45,7 +45,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lowwatermark.LowWatermark; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.DefaultValueProvider; @@ -63,8 +63,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -/** For {@link PartitionStorageAccessImpl} testing. */ -public class PartitionStorageAccessImplTest extends BaseIgniteAbstractTest { +/** For {@link PartitionMvStorageAccessImpl} testing. */ +public class PartitionMvStorageAccessImplTest extends BaseIgniteAbstractTest { private static final int TABLE_ID = 1; private static final int INDEX_ID_0 = 2; @@ -109,7 +109,7 @@ void testLastAppliedIndex() { MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID); - PartitionStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage); + PartitionMvStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage); assertEquals(0, mvPartitionAccess.lastAppliedIndex()); @@ -128,7 +128,7 @@ void testLastAppliedTerm() { MvPartitionStorage mvPartitionStorage = createMvPartition(mvTableStorage, TEST_PARTITION_ID); - PartitionStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage); + PartitionMvStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage); assertEquals(0, mvPartitionAccess.lastAppliedTerm()); @@ -159,7 +159,7 @@ void testAddWrite() { FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class); - PartitionStorageAccess mvPartitionAccess = + PartitionMvStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser); List indexIds = List.of( @@ -210,7 +210,7 @@ void testAddWriteCommitted() { FullStateTransferIndexChooser fullStateTransferIndexChooser = mock(FullStateTransferIndexChooser.class); - PartitionStorageAccess mvPartitionAccess = + PartitionMvStorageAccess mvPartitionAccess = createPartitionAccessImpl(mvTableStorage, indexUpdateHandler, fullStateTransferIndexChooser); List indexIdTableVersionList = List.of( @@ -257,8 +257,8 @@ private static MvPartitionStorage createMvPartition(MvTableStorage tableStorage, return createMvPartitionFuture.join(); } - private static PartitionStorageAccessImpl createPartitionAccessImpl(MvTableStorage mvTableStorage) { - return new PartitionStorageAccessImpl( + private static PartitionMvStorageAccessImpl createPartitionAccessImpl(MvTableStorage mvTableStorage) { + return new PartitionMvStorageAccessImpl( TEST_PARTITION_ID, mvTableStorage, mock(MvGc.class), @@ -270,12 +270,12 @@ private static PartitionStorageAccessImpl createPartitionAccessImpl(MvTableStora ); } - private static PartitionStorageAccessImpl createPartitionAccessImpl( + private static PartitionMvStorageAccessImpl createPartitionAccessImpl( MvTableStorage mvTableStorage, IndexUpdateHandler indexUpdateHandler, FullStateTransferIndexChooser fullStateTransferIndexChooser ) { - return new PartitionStorageAccessImpl( + return new PartitionMvStorageAccessImpl( TEST_PARTITION_ID, mvTableStorage, mock(MvGc.class),