Skip to content

Commit

Permalink
18.06.2024 Merge 'apache/main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
GG Integration User committed Jun 18, 2024
2 parents 996281c + 0b1b159 commit 0486837
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.impl;

import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
Expand All @@ -27,8 +28,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import org.apache.ignite.configuration.ConfigurationValue;
Expand Down Expand Up @@ -69,6 +72,7 @@
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
Expand Down Expand Up @@ -143,6 +147,9 @@ public class MetaStorageManagerImpl

private volatile MetaStorageListener learnerListener;

// TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove, cache eviction should be triggered by MS GC instead.
private final ScheduledExecutorService idempotentCacheVacumizer;

/**
* The constructor.
*
Expand Down Expand Up @@ -178,6 +185,8 @@ public MetaStorageManagerImpl(
this.metricManager = metricManager;
this.idempotentCacheTtl = idempotentCacheTtl;
this.maxClockSkewMillisFuture = maxClockSkewMillisFuture;
this.idempotentCacheVacumizer = Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(clusterService.nodeName(), "idempotent-cache-vacumizer", LOG));
}

/**
Expand Down Expand Up @@ -442,6 +451,8 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {

busyLock.block();

idempotentCacheVacumizer.shutdownNow();

deployWatchesFuture.cancel(true);

recoveryFinishedFuture.cancel(true);
Expand Down Expand Up @@ -506,6 +517,8 @@ public void onRevisionApplied(long revision) {
MetaStorageManagerImpl.this.onRevisionApplied(revision);
}
});

idempotentCacheVacumizer.scheduleWithFixedDelay(this::evictIdempotentCommandsCache, 1, 1, MINUTES);
}))
.whenComplete((v, e) -> {
if (e == null) {
Expand Down Expand Up @@ -895,7 +908,6 @@ public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart() {
/**
* Removes obsolete entries from both volatile and persistent idempotent command cache.
*/
@TestOnly
@Deprecated(forRemoval = true)
// TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead.
public void evictIdempotentCommandsCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
Expand All @@ -48,7 +47,6 @@
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Meta storage listener.
Expand Down Expand Up @@ -193,10 +191,9 @@ public void onShutdown() {
/**
* Removes obsolete entries from both volatile and persistent idempotent command cache.
*/
@TestOnly
@Deprecated(forRemoval = true)
// TODO: https://issues.apache.org/jira/browse/IGNITE-19417 cache eviction should be triggered by MS GC instead.
public void evictIdempotentCommandsCache() {
writeHandler.evictIdempotentCommandsCache(HybridTimestamp.MIN_VALUE);
writeHandler.evictIdempotentCommandsCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ void handleWriteCommand(CommandClosure<WriteCommand> clo) {
IdempotentCommand idempotentCommand = ((IdempotentCommand) command);
CommandId commandId = idempotentCommand.id();

// TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
if (idempotentCommand.safeTime().getPhysical() % 100 == 0) {
evictIdempotentCommandsCache(((IdempotentCommand) command).safeTime());
}

IdempotentCommandCachedResult cachedResult = idempotentCommandCache.get(commandId);

if (cachedResult != null) {
Expand Down Expand Up @@ -378,34 +373,36 @@ void onSnapshotLoad() {

/**
* Removes obsolete entries from both volatile and persistent idempotent command cache.
*
* @param safeTime Trigger operation safe time. TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Remove.
*/
// TODO: https://issues.apache.org/jira/browse/IGNITE-19417 Call on meta storage compaction.
void evictIdempotentCommandsCache(HybridTimestamp safeTime) {
void evictIdempotentCommandsCache() {
HybridTimestamp cleanupTimestamp = clusterTime.now();
LOG.info("Idempotent command cache cleanup started [cleanupTimestamp={}].", cleanupTimestamp);

maxClockSkewMillisFuture.thenAccept(maxClockSkewMillis -> {
List<CommandId> commandIdsToRemove = idempotentCommandCache.entrySet().stream()
.filter(entry -> entry.getValue().commandStartTime.getPhysical()
<= cleanupTimestamp.getPhysical() - (idempotentCacheTtl.value() + maxClockSkewMillis.getAsLong()))
.map(entry -> entry.getKey())
.map(Map.Entry::getKey)
.collect(toList());

if (!commandIdsToRemove.isEmpty()) {
List<byte[]> commandIdStorageKeys = commandIdsToRemove.stream()
.map(commandId -> ArrayUtils.concat(new byte[]{}, ByteUtils.toBytes(commandId)))
.collect(toList());

storage.removeAll(commandIdStorageKeys, safeTime);
storage.removeAll(commandIdStorageKeys, null);

commandIdsToRemove.forEach(idempotentCommandCache.keySet()::remove);
}

LOG.info("Idempotent command cache cleanup finished [cleanupTimestamp={}, cleanupCompletionTimestamp={},"
+ " removedEntriesCount={}, cacheSize={}].", cleanupTimestamp, clusterTime.now(), commandIdsToRemove.size(),
idempotentCommandCache.size());
+ " removedEntriesCount={}, cacheSize={}].",
cleanupTimestamp,
clusterTime.now(),
commandIdsToRemove.size(),
idempotentCommandCache.size()
);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,7 @@ public void tableRecoveryOnMultipleRestartingNodes(int nodeThatWrittenAssignment
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22521")
public void testSequentialAsyncTableCreationThenAlterZoneThenRestartOnMsSnapshot() throws InterruptedException {
IgniteImpl node0 = startNode(0);
IgniteImpl node1 = startNode(1);
Expand Down

0 comments on commit 0486837

Please sign in to comment.