Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sashapolo committed Feb 13, 2025
1 parent 3116074 commit 4ded5d4
Show file tree
Hide file tree
Showing 34 changed files with 647 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionStorageAccess;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
Expand Down Expand Up @@ -1427,7 +1427,7 @@ public void loadTableListenerToZoneReplica(
TablePartitionId tablePartitionId,
Function<RaftCommandRunner, ReplicaListener> tablePartitionReplicaListenerFactory,
RaftTableProcessor raftTableProcessor,
PartitionStorageAccess partitionStorageAccess
PartitionMvStorageAccess partitionMvStorageAccess
) {
Listeners listeners = listenersByZonePartitionId.get(zonePartitionId);

Expand All @@ -1442,7 +1442,7 @@ public void loadTableListenerToZoneReplica(

listeners.raftListener.addTableProcessor(tablePartitionId, raftTableProcessor);

listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(), partitionStorageAccess);
listeners.snapshotStorageFactory.addMvPartition(tablePartitionId.tableId(), partitionMvStorageAccess);
}

private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partition.replicator.raft;

import static java.lang.Math.max;
import static java.util.concurrent.CompletableFuture.allOf;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;

/**
 * Handles the {@link RaftGroupListener#onSnapshotSave} event: brings the last applied index and term of the given table processors and of
 * the TX state storage to a common (maximal) value and then flushes all of them.
 */
public class OnSnapshotSaveHandler {
    private final TxStatePartitionStorage txStatePartitionStorage;

    private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;

    public OnSnapshotSaveHandler(
            TxStatePartitionStorage txStatePartitionStorage,
            PendingComparableValuesTracker<Long, Void> storageIndexTracker
    ) {
        this.txStatePartitionStorage = txStatePartitionStorage;
        this.storageIndexTracker = storageIndexTracker;
    }

    /**
     * Called when {@link RaftGroupListener#onSnapshotSave} is triggered.
     *
     * @param tableProcessors Table processors that take part in the snapshot.
     * @return Future that completes when the flush futures of all table processors and of the TX state storage have completed.
     */
    public CompletableFuture<Void> onSnapshotSave(Collection<RaftTableProcessor> tableProcessors) {
        // Why the maximal index is written to every storage.
        //
        // On a node restart local recovery starts from the minimal lastAppliedIndex among the storages, because some of them might not
        // have flushed their data before an unexpected restart. Without the propagation below the following failure is possible even
        // though the node actually holds all required data:
        // 1) The maximal lastAppliedIndex is not propagated to all storages, onSnapshotSave finishes and the raft log gets truncated
        //    up to the maximal lastAppliedIndex.
        // 2) The cluster restarts unexpectedly.
        // 3) Local recovery of a node requests data starting from the minimal lastAppliedIndex among its storages.
        // 4) The raft node no longer has that data, because of the truncation from 1).
        // 5) The node cannot finish local recovery.

        long processorsIndex = highestLastAppliedIndex(tableProcessors);
        long processorsTerm = highestLastAppliedTerm(tableProcessors);

        long targetIndex = max(processorsIndex, txStatePartitionStorage.lastAppliedIndex());
        long targetTerm = max(processorsTerm, txStatePartitionStorage.lastAppliedTerm());

        for (RaftTableProcessor processor : tableProcessors) {
            processor.lastApplied(targetIndex, targetTerm);
        }

        txStatePartitionStorage.lastApplied(targetIndex, targetTerm);

        advanceStorageIndexTracker(targetIndex);

        // The TX state flush is started before the table processor flushes; its future still goes last into the resulting array.
        CompletableFuture<?> txStateFlushFuture = txStatePartitionStorage.flush();

        Stream.Builder<CompletableFuture<?>> flushFutures = Stream.builder();

        for (RaftTableProcessor processor : tableProcessors) {
            flushFutures.add(processor.flushStorage());
        }

        flushFutures.add(txStateFlushFuture);

        return allOf(flushFutures.build().toArray(CompletableFuture[]::new));
    }

    /** Returns the greatest last applied index among the given processors, or {@code 0} if there are none. */
    private static long highestLastAppliedIndex(Collection<RaftTableProcessor> tableProcessors) {
        boolean found = false;
        long highest = 0;

        for (RaftTableProcessor processor : tableProcessors) {
            long candidate = processor.lastAppliedIndex();

            if (!found || candidate > highest) {
                highest = candidate;
                found = true;
            }
        }

        return highest;
    }

    /** Returns the greatest last applied term among the given processors, or {@code 0} if there are none. */
    private static long highestLastAppliedTerm(Collection<RaftTableProcessor> tableProcessors) {
        boolean found = false;
        long highest = 0;

        for (RaftTableProcessor processor : tableProcessors) {
            long candidate = processor.lastAppliedTerm();

            if (!found || candidate > highest) {
                highest = candidate;
                found = true;
            }
        }

        return highest;
    }

    /** Updates the storage index tracker with the given index; a {@link TrackerClosedException} is deliberately ignored. */
    private void advanceStorageIndexTracker(long index) {
        try {
            storageIndexTracker.update(index, null);
        } catch (TrackerClosedException ignored) {
            // No-op.
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void onConfigurationCommitted(
long lastAppliedIndex();

/**
* Returns the last applied Raft term.
* Returns the term of the last applied Raft index.
*/
long lastAppliedTerm();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@

package org.apache.ignite.internal.partition.replicator.raft;

import static java.lang.Math.max;
import static java.util.concurrent.CompletableFuture.allOf;

import java.io.Serializable;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand Down Expand Up @@ -71,8 +66,6 @@ public class ZonePartitionRaftListener implements RaftGroupListener {

private final PartitionKey partitionKey;

private final TxStatePartitionStorage txStatePartitionStorage;

/**
* Latest committed configuration of the zone-wide Raft group.
*
Expand All @@ -98,6 +91,8 @@ private static class CommittedConfiguration {

private final FinishTxCommandHandler finishTxCommandHandler;

private final OnSnapshotSaveHandler onSnapshotSaveHandler;

/** Constructor. */
public ZonePartitionRaftListener(
TxStatePartitionStorage txStatePartitionStorage,
Expand All @@ -110,7 +105,6 @@ public ZonePartitionRaftListener(
this.safeTimeTracker = safeTimeTracker;
this.storageIndexTracker = storageIndexTracker;
this.partitionsSnapshots = partitionsSnapshots;
this.txStatePartitionStorage = txStatePartitionStorage;
this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId());

finishTxCommandHandler = new FinishTxCommandHandler(
Expand All @@ -119,6 +113,8 @@ public ZonePartitionRaftListener(
new TablePartitionId(zonePartitionId.zoneId(), zonePartitionId.partitionId()),
txManager
);

onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
}

@Override
Expand Down Expand Up @@ -228,32 +224,7 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp

@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
long maxPartitionLastAppliedIndex = tableProcessors.values().stream()
.mapToLong(RaftTableProcessor::lastAppliedIndex)
.max()
.orElse(0);

long maxPartitionLastAppliedTerm = tableProcessors.values().stream()
.mapToLong(RaftTableProcessor::lastAppliedTerm)
.max()
.orElse(0);

long maxLastAppliedIndex = max(maxPartitionLastAppliedIndex, txStatePartitionStorage.lastAppliedIndex());

long maxLastAppliedTerm = max(maxPartitionLastAppliedTerm, txStatePartitionStorage.lastAppliedTerm());

tableProcessors.values().forEach(processor -> processor.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm));

txStatePartitionStorage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm);

updateTrackerIgnoringTrackerClosedException(storageIndexTracker, maxLastAppliedIndex);

Stream<CompletableFuture<?>> flushFutures = Stream.concat(
tableProcessors.values().stream().map(RaftTableProcessor::flushStorage),
Stream.of(txStatePartitionStorage.flush())
);

allOf(flushFutures.toArray(CompletableFuture[]::new))
onSnapshotSaveHandler.onSnapshotSave(tableProcessors.values())
.whenComplete((unused, throwable) -> doneClo.accept(throwable));
}

Expand Down Expand Up @@ -300,17 +271,7 @@ private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosed
}

private void cleanupSnapshots() {
PartitionSnapshots partitionSnapshots = partitionSnapshots();

partitionSnapshots.acquireReadLock();

try {
partitionSnapshots.ongoingSnapshots().forEach(snapshot -> partitionsSnapshots.finishOutgoingSnapshot(snapshot.id()));

partitionsSnapshots.removeSnapshots(partitionKey);
} finally {
partitionSnapshots.releaseReadLock();
}
partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey);
}

private PartitionSnapshots partitionSnapshots() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.jetbrains.annotations.Nullable;

/**
* Small abstractions for partition storages that includes only methods, mandatory for the snapshot storage.
* Small abstraction for MV partition storages that includes only the methods mandatory for the snapshot storage.
*/
public interface PartitionStorageAccess {
public interface PartitionMvStorageAccess {
/** Table ID of the table that this partition storage is associated with. */
int tableId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class PartitionSnapshotStorage implements SnapshotStorage {
*
* <p>This map is modified externally by the {@link PartitionSnapshotStorageFactory}.
*/
private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId;
private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;

private final PartitionTxStateAccess txState;

Expand Down Expand Up @@ -108,7 +108,7 @@ public PartitionSnapshotStorage(
OutgoingSnapshotsManager outgoingSnapshotsManager,
String snapshotUri,
RaftOptions raftOptions,
Int2ObjectMap<PartitionStorageAccess> partitionsByTableId,
Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId,
PartitionTxStateAccess txState,
CatalogService catalogService,
@Nullable SnapshotMeta startupSnapshotMeta,
Expand Down Expand Up @@ -147,7 +147,7 @@ public PartitionSnapshotStorage(
OutgoingSnapshotsManager outgoingSnapshotsManager,
String snapshotUri,
RaftOptions raftOptions,
Int2ObjectMap<PartitionStorageAccess> partitionsByTableId,
Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId,
PartitionTxStateAccess txState,
CatalogService catalogService,
@Nullable SnapshotMeta startupSnapshotMeta,
Expand Down Expand Up @@ -202,7 +202,7 @@ public RaftOptions raftOptions() {
/**
* Returns partitions by table ID.
*/
public Int2ObjectMap<PartitionStorageAccess> partitionsByTableId() {
public Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId() {
return partitionsByTableId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
/**
* Partition storages grouped by table ID.
*/
private final Int2ObjectMap<PartitionStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>());
private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId = synchronize(new Int2ObjectOpenHashMap<>());

private final PartitionTxStateAccess txStateStorage;

Expand Down Expand Up @@ -91,8 +91,8 @@ public PartitionSnapshotStorageFactory(
/**
* Adds a given table partition storage to the snapshot storage, managed by this factory.
*/
public void addMvPartition(int tableId, PartitionStorageAccess partition) {
PartitionStorageAccess prev = partitionsByTableId.put(tableId, partition);
public void addMvPartition(int tableId, PartitionMvStorageAccess partition) {
PartitionMvStorageAccess prev = partitionsByTableId.put(tableId, partition);

assert prev == null : "Partition storage for table ID " + tableId + " already exists.";
}
Expand Down Expand Up @@ -121,7 +121,7 @@ public void removeMvPartition(int tableId) {
// We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
// lowest applied index and thus no data loss occurs.
return partitionsByTableId.values().stream()
.min(comparingLong(PartitionStorageAccess::lastAppliedIndex))
.min(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
.map(storageWithMinLastAppliedIndex -> {
long minLastAppliedIndex = min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex());

Expand All @@ -132,7 +132,7 @@ public void removeMvPartition(int tableId) {
int lastCatalogVersionAtStart = catalogService.latestCatalogVersion();

return snapshotMetaAt(
min(storageWithMinLastAppliedIndex.lastAppliedIndex(), txStateStorage.lastAppliedIndex()),
minLastAppliedIndex,
min(storageWithMinLastAppliedIndex.lastAppliedTerm(), txStateStorage.lastAppliedTerm()),
Objects.requireNonNull(storageWithMinLastAppliedIndex.committedGroupConfiguration()),
lastCatalogVersionAtStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Small abstraction for TX storages that includes only the methods mandatory for the snapshot storage.
*
* @see PartitionStorageAccess
* @see PartitionMvStorageAccess
*/
public interface PartitionTxStateAccess {
/**
Expand All @@ -51,18 +51,19 @@ public interface PartitionTxStateAccess {
long lastAppliedTerm();

/**
* Prepares the TX storage for rebalance with the same guarantees and requirements as {@link PartitionStorageAccess#startRebalance}.
* Prepares the TX storage for rebalance with the same guarantees and requirements as {@link PartitionMvStorageAccess#startRebalance}.
*/
CompletableFuture<Void> startRebalance();

/**
* Aborts an ongoing TX storage rebalance with the same guarantees and requirements as
* {@link PartitionStorageAccess#abortRebalance}.
* {@link PartitionMvStorageAccess#abortRebalance}.
*/
CompletableFuture<Void> abortRebalance();

/**
* Completes rebalancing of the TX storage with the same guarantees and requirements as {@link PartitionStorageAccess#finishRebalance}.
* Completes rebalancing of the TX storage with the same guarantees and requirements as
* {@link PartitionMvStorageAccess#finishRebalance}.
*/
CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta partitionMeta);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.jetbrains.annotations.Nullable;

/**
* Partition metadata for {@link PartitionStorageAccess}.
* Partition metadata for {@link PartitionMvStorageAccess}.
*/
public class RaftSnapshotPartitionMeta extends PrimitivePartitionMeta {
private final RaftGroupConfiguration raftGroupConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ZonePartitionKey implements PartitionKey {
/**
* Returns ID of the zone.
*/
public int tableId() {
public int zoneId() {
return zoneId;
}

Expand Down
Loading

0 comments on commit 4ded5d4

Please sign in to comment.