From bc6c42757419e8c17c36f7fb0e9eebd71b39e735 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Mon, 15 Jul 2024 16:55:11 +0300 Subject: [PATCH] IGNITE-22715 Support HybridTimestamp in Network serialization (#4078) --- .../MessageCollectionItemType.java | 6 +- .../DistributionZoneRebalanceEngineTest.java | 5 +- .../RebalanceUtilUpdateAssignmentsTest.java | 5 +- .../impl/ItIdempotentCommandCacheTest.java | 6 +- .../command/MetaStorageWriteCommand.java | 27 ++----- .../impl/MetaStorageServiceImpl.java | 14 ++-- .../server/raft/MetaStorageWriteHandler.java | 2 +- .../impl/IdempotentCommandCacheTest.java | 16 ++--- .../serialization/BaseMethodNameResolver.java | 7 +- .../MessageCollectionItemTypeConverter.java | 7 +- .../network/annotations/Transferable.java | 2 + .../network/serialization/MessageReader.java | 8 +++ .../network/serialization/MessageWriter.java | 10 +++ .../network/direct/DirectMessageReader.java | 18 +++++ .../network/direct/DirectMessageWriter.java | 11 +++ .../network/messages/AllTypesMessage.java | 3 + .../network/command/FinishTxCommand.java | 15 +--- .../command/TimedBinaryRowMessage.java | 9 +-- .../network/command/UpdateAllCommand.java | 4 +- .../network/command/UpdateCommand.java | 8 +-- .../command/WriteIntentSwitchCommand.java | 15 +--- .../replication/ReadOnlyReplicaRequest.java | 6 +- .../message/LeaseGrantedMessage.java | 14 +--- .../negotiation/LeaseNegotiator.java | 4 +- .../ignite/raft/jraft/core/NodeImpl.java | 10 +-- .../ignite/raft/jraft/core/Replicator.java | 5 +- .../ignite/raft/jraft/rpc/RpcRequests.java | 12 +--- .../internal/replicator/ReplicaManager.java | 6 +- .../command/SafeTimePropagatingCommand.java | 18 ++--- .../replicator/message/TimestampAware.java | 20 +----- .../PlacementDriverReplicaSideTest.java | 4 +- .../sql/engine/exec/ExchangeServiceImpl.java | 2 +- .../sql/engine/exec/ExecutionServiceImpl.java | 4 +- .../sql/engine/exec/UpdatableTableImpl.java | 6 +- .../sql/engine/message/QueryStartRequest.java | 15 +--- .../distributed/ReplicaUnavailableTest.java | 2 +- .../ReplicasSafeTimePropagationTest.java | 17 ++--- .../replicator/PartitionReplicaListener.java | 39 +++++------ .../replicator/TransactionStateResolver.java | 2 +- .../storage/InternalTableImpl.java | 34 ++++----- ...pdateCommandsMarshalingMicroBenchmark.java | 13 ++-- ...artitionRaftCommandsSerializationTest.java | 17 +++-- .../raft/PartitionCommandListenerTest.java | 54 +++++++------- ...titionReplicaListenerIndexLockingTest.java | 4 ++ .../PartitionReplicaListenerTest.java | 70 ++++++++++++------- .../org/apache/ignite/internal/tx/TxMeta.java | 3 +- .../ignite/internal/tx/TxStateMeta.java | 3 +- .../internal/tx/TxStateMetaAbandoned.java | 3 +- .../internal/tx/TxStateMetaFinishing.java | 3 +- .../tx/impl/TxCleanupRequestHandler.java | 6 +- .../internal/tx/impl/TxMessageSender.java | 15 ++-- .../tx/message/TransactionMetaMessage.java | 11 +-- .../internal/tx/message/TxCleanupMessage.java | 19 +---- .../tx/message/TxFinishReplicaRequest.java | 15 +--- .../tx/message/TxStateCoordinatorRequest.java | 8 +-- .../WriteIntentSwitchReplicaRequest.java | 19 +---- 56 files changed, 300 insertions(+), 381 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java index 38fbca9b7dd..3551df0fa12 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java @@ -17,6 +17,7 @@ package org.apache.ignite.plugin.extensions.communication; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.jetbrains.annotations.Nullable; /** @@ -87,7 +88,10 @@ public enum MessageCollectionItemType { IGNITE_UUID, /** Message. */ - MSG; + MSG, + + /** {@link HybridTimestamp}. */ + HYBRID_TIMESTAMP; /** Enum values. */ private static final MessageCollectionItemType[] VALS = values(); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java index 1fd8f549b67..8aebee4ff9a 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java @@ -30,6 +30,7 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -206,7 +207,7 @@ public void setUp() { Command cmd = invocationClose.getArgument(0); if (cmd instanceof MetaStorageWriteCommand) { - ((MetaStorageWriteCommand) cmd).safeTimeLong(10); + ((MetaStorageWriteCommand) cmd).safeTime(hybridTimestamp(10)); } long commandIndex = raftIndex.incrementAndGet(); @@ -254,7 +255,7 @@ public void result(@Nullable Serializable r) { MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand() .iif(iif) .id(commandIdGenerator.newId()) - .initiatorTimeLong(clusterTime.nowLong()) + .initiatorTime(clusterTime.now()) .build(); return metaStorageService.run(multiInvokeCommand); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index ad0e0a8fef9..e87547b909b 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; import static org.apache.ignite.internal.affinity.Assignments.toBytes; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -154,7 +155,7 @@ public void setUp() { long commandIndex = raftIndex.incrementAndGet(); if (cmd instanceof MetaStorageWriteCommand) { - ((MetaStorageWriteCommand) cmd).safeTimeLong(10); + ((MetaStorageWriteCommand) cmd).safeTime(hybridTimestamp(10)); } CompletableFuture res = new CompletableFuture<>(); @@ -200,7 +201,7 @@ public void result(@Nullable Serializable r) { MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand() .iif(iif) .id(commandIdGenerator.newId()) - .initiatorTimeLong(clusterTime.nowLong()) + .initiatorTime(clusterTime.now()) .build(); return metaStorageService.run(multiInvokeCommand); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java index f7b65a8142a..da3cf1b836e 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java @@ -480,7 +480,7 @@ private static IdempotentCommand buildKeyNotExistsInvokeCommand( .condition(notExists(testKey)) .success(List.of(put(testKey, testValue))) .failure(List.of(put(testKey, anotherValue))) - .initiatorTimeLong(clock.nowLong()) + .initiatorTime(clock.now()) .id(commandIdGenerator.newId()) .build(); } @@ -504,8 +504,8 @@ private static IdempotentCommand buildKeyNotExistsMultiInvokeCommand( return CMD_FACTORY.multiInvokeCommand() .id(commandIdGenerator.newId()) .iif(iif) - .safeTimeLong(clock.now().longValue()) - .initiatorTimeLong(clock.now().longValue()) + .safeTime(clock.now()) + .initiatorTime(clock.now()) .build(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java index 91f60948d71..abe497f4710 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetaStorageWriteCommand.java @@ -17,28 +17,18 @@ package org.apache.ignite.internal.metastorage.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.WithSetter; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.BeforeApplyHandler; +import org.jetbrains.annotations.Nullable; /** Base meta storage write command. */ public interface MetaStorageWriteCommand extends WriteCommand { - /** - * Returns time on the initiator node. - */ - long initiatorTimeLong(); - - /** - * Returns time on the initiator node. - */ - default HybridTimestamp initiatorTime() { - return hybridTimestamp(initiatorTimeLong()); - } + /** Time on the initiator node. */ + HybridTimestamp initiatorTime(); /** * This is a dirty hack. This time is set by the leader node to disseminate new safe time across @@ -47,19 +37,12 @@ default HybridTimestamp initiatorTime() { * command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}. */ @WithSetter - long safeTimeLong(); + @Nullable HybridTimestamp safeTime(); /** * Setter for the safeTime field. */ - default void safeTimeLong(long safeTime) { + default void safeTime(HybridTimestamp safeTime) { // No-op. } - - /** - * Convenient getter for {@link #safeTimeLong()}. - */ - default HybridTimestamp safeTime() { - return hybridTimestamp(safeTimeLong()); - } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index c855688f4b4..86b323f4e89 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -132,7 +132,7 @@ public CompletableFuture put(ByteArray key, byte[] value) { PutCommand putCommand = context.commandsFactory().putCommand() .key(ByteBuffer.wrap(key.bytes())) .value(ByteBuffer.wrap(value)) - .initiatorTimeLong(clusterTime.nowLong()) + .initiatorTime(clusterTime.now()) .build(); return context.raftService().run(putCommand); @@ -148,7 +148,7 @@ public CompletableFuture putAll(Map vals) { @Override public CompletableFuture remove(ByteArray key) { RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(ByteBuffer.wrap(key.bytes())) - .initiatorTimeLong(clusterTime.nowLong()).build(); + .initiatorTime(clusterTime.now()).build(); return context.raftService().run(removeCommand); } @@ -175,7 +175,7 @@ public CompletableFuture invoke( .condition(condition) .success(success) .failure(failure) - .initiatorTimeLong(clusterTime.nowLong()) + .initiatorTime(clusterTime.now()) .id(commandIdGenerator.newId()) .build(); @@ -186,7 +186,7 @@ public CompletableFuture invoke( public CompletableFuture invoke(Iif iif) { MultiInvokeCommand multiInvokeCommand = context.commandsFactory().multiInvokeCommand() .iif(iif) - .initiatorTimeLong(clusterTime.nowLong()) + .initiatorTime(clusterTime.now()) .id(commandIdGenerator.newId()) .build(); @@ -248,7 +248,7 @@ public Publisher prefix(ByteArray prefix, long revUpperBound) { */ public CompletableFuture syncTime(HybridTimestamp safeTime, long term) { SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand() - .initiatorTimeLong(safeTime.longValue()) + .initiatorTime(safeTime) .initiatorTerm(term) .build(); @@ -312,7 +312,7 @@ private PutAllCommand putAllCommand(MetaStorageCommandsFactory commandsFactory, return commandsFactory.putAllCommand() .keys(keys) .values(values) - .initiatorTimeLong(ts.longValue()) + .initiatorTime(ts) .build(); } @@ -330,6 +330,6 @@ private RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory commandsFac list.add(ByteBuffer.wrap(key.bytes())); } - return commandsFactory.removeAllCommand().keys(list).initiatorTimeLong(ts.longValue()).build(); + return commandsFactory.removeAllCommand().keys(list).initiatorTime(ts).build(); } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java index 647f3fde194..bc226520f33 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java @@ -336,7 +336,7 @@ boolean beforeApply(Command command) { clusterTime.adjust(writeCommand.initiatorTime()); - writeCommand.safeTimeLong(clusterTime.nowLong()); + writeCommand.safeTime(clusterTime.now()); return true; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java index cfb068c5ba0..b79593ad5e3 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/IdempotentCommandCacheTest.java @@ -106,8 +106,8 @@ public void testIdempotentInvoke() { .condition(notExists(testKey)) .success(List.of(put(testKey, testValue.bytes()))) .failure(List.of(put(testKey, anotherValue.bytes()))) - .safeTimeLong(clock.now().longValue()) - .initiatorTimeLong(clock.now().longValue()) + .safeTime(clock.now()) + .initiatorTime(clock.now()) .build(); metaStorageListener.onWrite(commandIterator(command)); @@ -138,8 +138,8 @@ public void testIdempotentMultiInvoke() { MultiInvokeCommand command = CMD_FACTORY.multiInvokeCommand() .id(commandIdGenerator.newId()) .iif(iif) - .safeTimeLong(clock.now().longValue()) - .initiatorTimeLong(clock.now().longValue()) + .safeTime(clock.now()) + .initiatorTime(clock.now()) .build(); metaStorageListener.onWrite(commandIterator(command)); @@ -166,8 +166,8 @@ public void testNonIdempotentCommand() { PutCommand command0 = CMD_FACTORY.putCommand() .key(ByteBuffer.wrap(testKey.bytes())) .value(ByteBuffer.wrap(testValue0.bytes())) - .safeTimeLong(clock.now().longValue()) - .initiatorTimeLong(clock.now().longValue()) + .safeTime(clock.now()) + .initiatorTime(clock.now()) .build(); metaStorageListener.onWrite(commandIterator(command0)); @@ -183,8 +183,8 @@ public void testNonIdempotentCommand() { PutCommand command1 = CMD_FACTORY.putCommand() .key(ByteBuffer.wrap(testKey.bytes())) .value(ByteBuffer.wrap(testValue1.bytes())) - .safeTimeLong(clock.now().longValue()) - .initiatorTimeLong(clock.now().longValue()) + .safeTime(clock.now()) + .initiatorTime(clock.now()) .build(); metaStorageListener.onWrite(commandIterator(command1)); diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java index b2eb52ea790..98612d56eb0 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java @@ -30,6 +30,7 @@ import javax.lang.model.type.PrimitiveType; import javax.lang.model.type.TypeKind; import javax.lang.model.type.TypeMirror; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.processor.ProcessingException; @@ -140,8 +141,10 @@ private String resolveReferenceMethodName(DeclaredType parameterType) { return "Map"; } else if (typeUtils.isSameType(parameterType, ByteBuffer.class)) { return "ByteBuffer"; - } else { - throw new ProcessingException("Unsupported reference type for message (de-)serialization: " + parameterType); + } else if (typeUtils.isSameType(parameterType, HybridTimestamp.class)) { + return "HybridTimestamp"; } + + throw new ProcessingException("Unsupported reference type for message (de-)serialization: " + parameterType); } } diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageCollectionItemTypeConverter.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageCollectionItemTypeConverter.java index 4e892526110..327d8c13b69 100644 --- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageCollectionItemTypeConverter.java +++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageCollectionItemTypeConverter.java @@ -25,6 +25,7 @@ import javax.lang.model.type.DeclaredType; import javax.lang.model.type.PrimitiveType; import javax.lang.model.type.TypeMirror; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.processor.ProcessingException; @@ -128,8 +129,10 @@ private MessageCollectionItemType fromDeclaredType(DeclaredType parameterType) { return MessageCollectionItemType.BIT_SET; } else if (typeUtils.isSameType(parameterType, ByteBuffer.class)) { return MessageCollectionItemType.BYTE_BUFFER; - } else { - throw new ProcessingException("Unsupported MessageCollectionItemType: " + parameterType); + } else if (typeUtils.isSameType(parameterType, HybridTimestamp.class)) { + return MessageCollectionItemType.HYBRID_TIMESTAMP; } + + throw new ProcessingException("Unsupported MessageCollectionItemType: " + parameterType); } } diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java index ff33271612f..7c47a9b6ffd 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.BitSet; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.serialization.MessageDeserializer; @@ -72,6 +73,7 @@ *
  • Array of primitive types, corresponding boxed types or other directly marshallable types;
  • *
  • {@code Collection} of boxed primitive types or other directly marshallable types;
  • *
  • {@code Map} where both keys and values can be of a directly marshallable type.
  • + *
  • {@link HybridTimestamp}.
  • * * *

    After all marked interfaces in a module have been processed, the processor will use the diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageReader.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageReader.java index 0cb2afa9b03..615b5b1ba83 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageReader.java +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageReader.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -304,6 +305,13 @@ public interface MessageReader { */ IgniteUuid readIgniteUuid(String name); + /** + * Reads an {@link HybridTimestamp}. + * + * @param name Field name. + */ + @Nullable HybridTimestamp readHybridTimestamp(String name); + /** * Reads a group type or a message type of the message. Unlike regular {@link #readShort(String)}, this method never accepts a field * name, because there are no names assigned to the header values. diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageWriter.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageWriter.java index 8d312203279..ab7bd0f144c 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageWriter.java +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageWriter.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -339,6 +340,15 @@ public interface MessageWriter { */ boolean writeIgniteUuid(String name, IgniteUuid val); + /** + * Writes an {@link HybridTimestamp}. + * + * @param name Field name. + * @param val {@link HybridTimestamp}. + * @return Whether a value was fully written. + */ + boolean writeHybridTimestamp(String name, @Nullable HybridTimestamp val); + /** * Writes a nested message. * diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java index fea8290e165..fe135f9f3fb 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.network.direct; +import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; + import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; @@ -24,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.direct.state.DirectMessageState; @@ -435,6 +438,21 @@ public IgniteUuid readIgniteUuid(String name) { return val; } + @Override + public @Nullable HybridTimestamp readHybridTimestamp(String name) { + DirectByteBufferStream stream = state.item().stream; + + long val = stream.readLong(); + + lastRead = stream.lastFinished(); + + if (!lastRead) { + return null; + } + + return nullableHybridTimestamp(val); + } + @Override public short readHeaderShort() { DirectByteBufferStream stream = state.item().stream; diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java index 5da16a51ac3..5c7d5b7b5fb 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.network.direct; +import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP; import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import java.nio.ByteBuffer; @@ -26,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.direct.state.DirectMessageState; @@ -397,6 +399,15 @@ public boolean writeIgniteUuid(String name, IgniteUuid val) { return stream.lastFinished(); } + @Override + public boolean writeHybridTimestamp(String name, @Nullable HybridTimestamp val) { + DirectByteBufferStream stream = this.stream; + + stream.writeLong(val == null ? NULL_HYBRID_TIMESTAMP : val.longValue()); + + return stream.lastFinished(); + } + /** {@inheritDoc} */ @Override public boolean writeMessage(String name, @Nullable NetworkMessage msg) { diff --git a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/messages/AllTypesMessage.java b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/messages/AllTypesMessage.java index ddffdffc085..a01f932b515 100644 --- a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/messages/AllTypesMessage.java +++ b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/messages/AllTypesMessage.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteUuid; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; @@ -124,4 +125,6 @@ public interface AllTypesMessage extends NetworkMessage, Serializable { @IgniteToStringInclude(sensitive = true) @Nullable String sensitiveString(); + + @Nullable HybridTimestamp hybridTs(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java index 63ab0ffdb18..2e2d858c18f 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/FinishTxCommand.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.partition.replicator.network.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; @@ -36,17 +34,8 @@ public interface FinishTxCommand extends PartitionCommand { */ boolean commit(); - /** - * Returns a transaction commit timestamp. - */ - long commitTimestampLong(); - - /** - * Returns a transaction commit timestamp. - */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } + /** Transaction commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); /** Returns ordered replication groups IDs. */ List partitionIds(); diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/TimedBinaryRowMessage.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/TimedBinaryRowMessage.java index aea0b3a3a1e..baf637fa6e3 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/TimedBinaryRowMessage.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/TimedBinaryRowMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.partition.replicator.network.command; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; @@ -36,12 +37,8 @@ public interface TimedBinaryRowMessage extends NetworkMessage { */ @Nullable BinaryRowMessage binaryRowMessage(); - /** - * Gets a timestamp. - * - * @return Timestamp as long. - */ - long timestamp(); + /** Timestamp. */ + @Nullable HybridTimestamp timestamp(); /** * Gets a binary row form this message or {@code null} if the binary row message is {@code null}. diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java index b65a6c12b1a..c29511f9e36 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.partition.replicator.network.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -54,7 +52,7 @@ default Map rowsToUpdate() { if (!CollectionUtils.nullOrEmpty(timedRowMap)) { timedRowMap.forEach( - (uuid, trMsg) -> map.put(uuid, new TimedBinaryRow(trMsg.binaryRow(), nullableHybridTimestamp(trMsg.timestamp())))); + (uuid, trMsg) -> map.put(uuid, new TimedBinaryRow(trMsg.binaryRow(), trMsg.timestamp()))); } return map; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java index 879989b7c77..7385a6cf6db 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.partition.replicator.network.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; @@ -50,12 +48,10 @@ public interface UpdateCommand extends PartitionCommand { return tsRoMsg == null ? null : tsRoMsg.binaryRow(); } - /** - * Returns the timestamp of the last committed entry. - */ + /** Returns the timestamp of the last committed entry. */ default @Nullable HybridTimestamp lastCommitTimestamp() { TimedBinaryRowMessage tsRoMsg = messageRowToUpdate(); - return tsRoMsg == null ? null : nullableHybridTimestamp(tsRoMsg.timestamp()); + return tsRoMsg == null ? null : tsRoMsg.timestamp(); } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/WriteIntentSwitchCommand.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/WriteIntentSwitchCommand.java index fb7ab45b25b..a65876783d4 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/WriteIntentSwitchCommand.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/WriteIntentSwitchCommand.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.partition.replicator.network.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; @@ -34,15 +32,6 @@ public interface WriteIntentSwitchCommand extends PartitionCommand { */ boolean commit(); - /** - * Returns a transaction commit timestamp. - */ - long commitTimestampLong(); - - /** - * Returns a transaction commit timestamp. - */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } + /** Transaction commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java index 6671a67739f..1acdbba89e0 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/ReadOnlyReplicaRequest.java @@ -24,9 +24,5 @@ * Read only replica request. */ public interface ReadOnlyReplicaRequest extends ReplicaRequest { - long readTimestampLong(); - - default HybridTimestamp readTimestamp() { - return HybridTimestamp.hybridTimestamp(readTimestampLong()); - } + HybridTimestamp readTimestamp(); } diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java index e6968a47bf7..ed058f2757a 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.placementdriver.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; @@ -27,17 +25,9 @@ */ @Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE) public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage { - long leaseStartTimeLong(); - - default HybridTimestamp leaseStartTime() { - return hybridTimestamp(leaseStartTimeLong()); - } - - long leaseExpirationTimeLong(); + HybridTimestamp leaseStartTime(); - default HybridTimestamp leaseExpirationTime() { - return hybridTimestamp(leaseExpirationTimeLong()); - } + HybridTimestamp leaseExpirationTime(); boolean force(); } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java index ca3b719888f..2e9f2160bb0 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseNegotiator.java @@ -78,8 +78,8 @@ public void negotiate(Lease lease, boolean force) { lease.getLeaseholder(), PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessage() .groupId(groupId) - .leaseStartTimeLong(lease.getStartTime().longValue()) - .leaseExpirationTimeLong(lease.getExpirationTime().longValue()) + .leaseStartTime(lease.getStartTime()) + .leaseExpirationTime(lease.getExpirationTime()) .force(force) .build(), leaseInterval) diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 57202200b5a..0227293c3e4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -2160,7 +2160,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi .term(this.currTerm); if (request.timestamp() != null) { - rb.timestampLong(clock.update(request.timestamp()).longValue()); + rb.timestamp(clock.update(request.timestamp())); } return rb.build(); @@ -2181,7 +2181,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi .term(request.term() + 1); if (request.timestamp() != null) { - rb.timestampLong(clock.update(request.timestamp()).longValue()); + rb.timestamp(clock.update(request.timestamp())); } return rb.build(); @@ -2214,7 +2214,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi .lastLogIndex(lastLogIndex); if (request.timestamp() != null) { - rb.timestampLong(clock.update(request.timestamp()).longValue()); + rb.timestamp(clock.update(request.timestamp())); } return rb.build(); @@ -2228,7 +2228,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi .term(this.currTerm) .lastLogIndex(this.logManager.getLastLogIndex()); if (request.timestamp() != null) { - respBuilder.timestampLong(clock.update(request.timestamp()).longValue()); + respBuilder.timestamp(clock.update(request.timestamp())); } doUnlock = false; this.writeLock.unlock(); @@ -2249,7 +2249,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi .term(this.currTerm); if (request.timestamp() != null) { - rb.timestampLong(clock.update(request.timestamp()).longValue()); + rb.timestamp(clock.update(request.timestamp())); } return rb.build(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java index 6a760a95efa..0028b062533 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java @@ -18,7 +18,6 @@ import static com.codahale.metrics.MetricRegistry.name; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import com.codahale.metrics.Gauge; @@ -744,7 +743,7 @@ private void sendEmptyEntries(final boolean isHeartbeat) { private void sendEmptyEntries(final boolean isHeartbeat, final RpcResponseClosure heartBeatClosure) { final AppendEntriesRequestBuilder rb = raftOptions.getRaftMessagesFactory().appendEntriesRequest(); - rb.timestampLong(hybridTimestampToLong(options.getNode().clockNow())); + rb.timestamp(options.getNode().clockNow()); if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) { // id is unlock in installSnapshot installSnapshot(); @@ -1672,7 +1671,7 @@ private boolean sendEntries(final long nextSendingIndex) { RecycleUtil.recycle(byteBufList); } - rb.timestampLong(hybridTimestampToLong(this.options.getNode().clockNow())); + rb.timestamp(this.options.getNode().clockNow()); final AppendEntriesRequest request = rb.build(); if (LOG.isDebugEnabled()) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java index ca37da3ce5f..0028324c5b5 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java @@ -179,11 +179,7 @@ public interface AppendEntriesRequest extends Message { @Nullable ByteBuffer data(); - long timestampLong(); - - default HybridTimestamp timestamp() { - return hybridTimestamp(timestampLong()); - } + @Nullable HybridTimestamp timestamp(); } @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_RESPONSE) @@ -194,11 +190,7 @@ public interface AppendEntriesResponse extends ErrorResponse { long lastLogIndex(); - long timestampLong(); - - default @Nullable HybridTimestamp timestamp() { - return nullableHybridTimestamp(timestampLong()); - } + @Nullable HybridTimestamp timestamp(); } @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_REQUEST) diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 7b8f0a72e3c..90d99aac95e 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -1055,7 +1055,7 @@ private void sendReplicaUnavailableErrorResponse( groupId, clusterNetSvc.topologyService().localMember()) ) - .timestampLong(clockService.updateClock(requestTimestamp).longValue()) + .timestamp(clockService.updateClock(requestTimestamp)) .build(), correlationId); } else { @@ -1093,7 +1093,7 @@ private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, Object resu return REPLICA_MESSAGES_FACTORY .timestampAwareReplicaResponse() .result(result) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .build(); } else { return REPLICA_MESSAGES_FACTORY @@ -1111,7 +1111,7 @@ private NetworkMessage prepareReplicaErrorResponse(boolean sendTimestamp, Throwa return REPLICA_MESSAGES_FACTORY .errorTimestampAwareReplicaResponse() .throwable(ex) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .build(); } else { return REPLICA_MESSAGES_FACTORY diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java index 1622c239016..182c6bf6e1f 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimePropagatingCommand.java @@ -17,33 +17,23 @@ package org.apache.ignite.internal.replicator.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.WithSetter; import org.apache.ignite.internal.raft.WriteCommand; +import org.jetbrains.annotations.Nullable; /** * Common interface for commands carrying safe time. */ public interface SafeTimePropagatingCommand extends WriteCommand { - /** - * Returns safe time. - */ + /** Safe time. */ @WithSetter - long safeTimeLong(); + @Nullable HybridTimestamp safeTime(); /** * Setter for the safeTime field. */ - default void safeTimeLong(long safeTime) { + default void safeTime(HybridTimestamp safeTime) { // No-op. } - - /** - * Returns safe time. - */ - default HybridTimestamp safeTime() { - return hybridTimestamp(safeTimeLong()); - } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java index 95be9fed363..8847a42b831 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/TimestampAware.java @@ -17,28 +17,14 @@ package org.apache.ignite.internal.replicator.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.NetworkMessage; +import org.jetbrains.annotations.Nullable; /** * Message with a timestamp to adjust a hybrid logical clock. */ public interface TimestampAware extends NetworkMessage { - /** - * Gets a hybrid timestamp. - * - * @return Gets a hybrid timestamp. - */ - long timestampLong(); - - /** - * Gets a hybrid timestamp. - * - * @return Gets a hybrid timestamp. - */ - default HybridTimestamp timestamp() { - return hybridTimestamp(timestampLong()); - } + /** Hybrid timestamp. */ + @Nullable HybridTimestamp timestamp(); } diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java index c836cd8e53d..889d251d261 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java @@ -172,8 +172,8 @@ private CompletableFuture sendLeaseGranted( ) { PlacementDriverReplicaMessage msg = MSG_FACTORY.leaseGrantedMessage() .groupId(GRP_ID) - .leaseStartTimeLong(leaseStartTime.longValue()) - .leaseExpirationTimeLong(leaseExpirationTime.longValue()) + .leaseStartTime(leaseStartTime) + .leaseExpirationTime(leaseExpirationTime) .force(force) .build(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java index 0852a77a857..c105e6f219c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java @@ -92,7 +92,7 @@ public CompletableFuture sendBatch(String nodeName, UUID qryId, long fragm .batchId(batchId) .last(last) .rows(rows) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .build() ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index c314582b686..81b8213119c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -833,8 +833,8 @@ private CompletableFuture sendFragment( .txAttributes(txAttributes) .catalogVersion(catalogVersion) .timeZoneId(ctx.timeZoneId().getId()) - .operationTimeLong(ctx.operationTime().longValue()) - .timestampLong(clockService.nowLong()) + .operationTime(ctx.operationTime()) + .timestamp(clockService.now()) .build(); return messageService.send(targetNodeName, request); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java index 5f10205d598..5689c1e6e00 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java @@ -154,7 +154,7 @@ public CompletableFuture upsertAll( .transactionId(txAttributes.id()) .enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken()) .requestTypeInt(RW_UPSERT_ALL.ordinal()) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .skipDelayedAck(true) .coordinatorId(txAttributes.coordinatorId()) .build(); @@ -259,7 +259,7 @@ public CompletableFuture insertAll( .transactionId(txAttributes.id()) .enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken()) .requestTypeInt(RW_INSERT_ALL.ordinal()) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .skipDelayedAck(true) .coordinatorId(txAttributes.coordinatorId()) .build(); @@ -328,7 +328,7 @@ public CompletableFuture deleteAll( .transactionId(txAttributes.id()) .enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken()) .requestTypeInt(RW_DELETE_ALL.ordinal()) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .skipDelayedAck(true) .coordinatorId(txAttributes.coordinatorId()) .build(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java index 4d213602b0b..862885fe686 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.sql.engine.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; @@ -65,15 +63,6 @@ public interface QueryStartRequest extends TimestampAware, ExecutionContextAware */ String timeZoneId(); - /** - * Time of the operation. - */ - long operationTimeLong(); - - /** - * Time of the operation as {@link HybridTimestamp}. - */ - default HybridTimestamp operationTime() { - return hybridTimestamp(operationTimeLong()); - } + /** Time of the operation. */ + HybridTimestamp operationTime(); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index 4cae51529a0..cf81093dec7 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -268,7 +268,7 @@ private ReadWriteSingleRowReplicaRequest getRequest(TablePartitionId tablePartit .groupId(toTablePartitionIdMessage(replicaMessageFactory, tablePartitionId)) .transactionId(TestTransactionIds.newTransactionId()) .commitPartitionId(tablePartitionId()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .schemaVersion(binaryRow.schemaVersion()) .binaryTuple(binaryRow.tupleSlice()) .requestTypeInt(RW_GET.ordinal()) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 332483c06ff..bf8792842f1 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.TestClockService; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.lang.SafeTimeReorderException; @@ -111,19 +112,19 @@ public void after() throws Exception { } } - private static long calculateSafeTime(ClockService clockService) { - return clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue(); + private static HybridTimestamp calculateSafeTime(ClockService clockService) { + return clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()); } private static void sendSafeTimeSyncCommand( RaftGroupService raftClient, - long safeTime, + HybridTimestamp safeTime, boolean expectSafeTimeReorderException ) { CompletableFuture safeTimeCommandFuture = raftClient.run( REPLICA_MESSAGES_FACTORY .safeTimeSyncCommand() - .safeTimeLong(safeTime) + .safeTime(safeTime) .build() ); @@ -159,7 +160,7 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception { assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); - long firstSafeTime = calculateSafeTime(someNode.clockService); + HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService); // Send command with safe time X. sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); @@ -181,7 +182,7 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception { RaftGroupService anotherClient = aliveNode.get().raftClient; // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. - sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + sendSafeTimeSyncCommand(anotherClient, firstSafeTime.subtractPhysicalTime(1), true); sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clockService), false); } @@ -222,7 +223,7 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception { assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); - long firstSafeTime = calculateSafeTime(someNode.clockService); + HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService); // Send command with safe time X. sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); @@ -238,7 +239,7 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception { // Send command with safe time less than previously applied to the leader before the restart // and verify that SafeTimeReorderException is thrown. - sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true); + sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime.subtractPhysicalTime(1), true); sendSafeTimeSyncCommand(someNode.raftClient, calculateSafeTime(someNode.clockService), false); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 5529df73cd4..fcffbd15607 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET_ALL; @@ -713,31 +712,31 @@ private CompletableFuture processOperationRequest( if (request instanceof ReadWriteSingleRowReplicaRequest) { var req = (ReadWriteSingleRowReplicaRequest) request; - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processSingleEntryAction(req, lst)); } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) { var req = (ReadWriteSingleRowPkReplicaRequest) request; - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processSingleEntryAction(req, lst)); } else if (request instanceof ReadWriteMultiRowReplicaRequest) { var req = (ReadWriteMultiRowReplicaRequest) request; - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processMultiEntryAction(req, lst)); } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) { var req = (ReadWriteMultiRowPkReplicaRequest) request; - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processMultiEntryAction(req, lst)); } else if (request instanceof ReadWriteSwapRowReplicaRequest) { var req = (ReadWriteSwapRowReplicaRequest) request; - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); return appendTxCommand(req.transactionId(), opId, req.requestType(), req.full(), () -> processTwoEntriesAction(req, lst)); } else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) { @@ -755,7 +754,7 @@ private CompletableFuture processOperationRequest( null )); - var opId = new OperationId(senderId, req.timestampLong()); + var opId = new OperationId(senderId, req.timestamp().longValue()); // Implicit RW scan can be committed locally on a last batch or error. return appendTxCommand(req.transactionId(), opId, RW_SCAN, false, () -> processScanRetrieveBatchAction(req)) @@ -1149,7 +1148,7 @@ private CompletableFuture processReplicaSafeTimeSyncRequest(ReplicaSafeTim CompletableFuture resultFuture = new CompletableFuture<>(); applyCmdWithRetryOnSafeTimeReorderException( - REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTimeLong(clockService.nowLong()).build(), + REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(clockService.now()).build(), resultFuture ); @@ -1804,12 +1803,12 @@ private CompletableFuture applyFinishCommand( FinishTxCommandBuilder finishTxCmdBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() .txId(transactionId) .commit(commit) - .safeTimeLong(clockService.nowLong()) + .safeTime(clockService.now()) .requiredCatalogVersion(catalogVersion) .partitionIds(partitionIds); if (commit) { - finishTxCmdBldr.commitTimestampLong(commitTimestamp.longValue()); + finishTxCmdBldr.commitTimestamp(commitTimestamp); } CompletableFuture resultFuture = new CompletableFuture<>(); @@ -1847,7 +1846,6 @@ private CompletableFuture processWriteIntentSwitchAction(WriteInt request.txId(), request.commit(), request.commitTimestamp(), - request.commitTimestampLong(), catalogVersion ); @@ -1903,14 +1901,13 @@ private CompletableFuture applyWriteIntentSwitc UUID transactionId, boolean commit, HybridTimestamp commitTimestamp, - long commitTimestampLong, int catalogVersion ) { WriteIntentSwitchCommand wiSwitchCmd = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(transactionId) .commit(commit) - .commitTimestampLong(commitTimestampLong) - .safeTimeLong(clockService.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(clockService.now()) .requiredCatalogVersion(catalogVersion) .build(); @@ -2291,7 +2288,7 @@ private CompletableFuture processMultiEntryAction(ReadWriteMultiR if (lockedRowId != null) { rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage() - .timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid()))) + .timestamp(lastCommitTimes.get(lockedRowId.uuid())) .build()); result.add(new NullBinaryRow()); @@ -2469,7 +2466,7 @@ private CompletableFuture processMultiEntryAction(ReadWriteMultiR TimedBinaryRowMessageBuilder timedBinaryRowMessageBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY .timedBinaryRowMessage() - .timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRow.uuid()))); + .timestamp(lastCommitTimes.get(lockedRow.uuid())); if (deleted == null || !deleted.get(i)) { timedBinaryRowMessageBuilder.binaryRowMessage(binaryRowMessage(searchRows.get(i))); @@ -2592,7 +2589,7 @@ private CompletableFuture processMultiEntryAction(ReadWriteMultiRowPkReplicaR if (lockedRowId != null) { rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage() - .timestamp(hybridTimestampToLong(lastCommitTimes.get(lockedRowId.uuid()))) + .timestamp(lastCommitTimes.get(lockedRowId.uuid())) .build()); rows.add(lockedRowId); @@ -2705,7 +2702,7 @@ private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, Comple SafeTimePropagatingCommand clonedSafeTimePropagatingCommand = (SafeTimePropagatingCommand) safeTimePropagatingCommand.clone(); - clonedSafeTimePropagatingCommand.safeTimeLong(safeTimeForRetry.longValue()); + clonedSafeTimePropagatingCommand.safeTime(safeTimeForRetry); applyCmdWithRetryOnSafeTimeReorderException(clonedSafeTimePropagatingCommand, resultFuture); } else { @@ -3843,7 +3840,7 @@ private UpdateCommand updateCommand( .rowUuid(rowUuid) .txId(txId) .full(full) - .safeTimeLong(safeTimeTimestamp.longValue()) + .safeTime(safeTimeTimestamp) .txCoordinatorId(txCoordinatorId) .requiredCatalogVersion(catalogVersion) .leaseStartTime(leaseStartTime); @@ -3852,7 +3849,7 @@ private UpdateCommand updateCommand( TimedBinaryRowMessageBuilder rowMsgBldr = PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage(); if (lastCommitTimestamp != null) { - rowMsgBldr.timestamp(lastCommitTimestamp.longValue()); + rowMsgBldr.timestamp(lastCommitTimestamp); } if (row != null) { @@ -3886,7 +3883,7 @@ private UpdateAllCommand updateAllCommand( .tablePartitionId(commitPartitionId) .messageRowsToUpdate(rowsToUpdate) .txId(transactionId) - .safeTimeLong(safeTimeTimestamp.longValue()) + .safeTime(safeTimeTimestamp) .full(full) .txCoordinatorId(txCoordinatorId) .requiredCatalogVersion(catalogVersion) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java index 97f82b19d3f..d8cbce1d905 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java @@ -117,7 +117,7 @@ public void start() { .thenAccept(txStateMeta -> { NetworkMessage response = TX_MESSAGES_FACTORY.txStateResponse() .txStateMeta(toTransactionMetaMessage(txStateMeta)) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .build(); messagingService.respond(sender, response, correlationId); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index d905d38b67b..bbcdced0f3f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -549,7 +549,7 @@ private CompletableFuture> enlistCursorInTx( Function mapFunc = (enlistmentConsistencyToken) -> TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(serializeTablePartitionId(partGroupId)) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .transactionId(tx.id()) .scanId(scanId) .indexToUse(indexId) @@ -894,7 +894,7 @@ public CompletableFuture get(BinaryRowEx keyRow, @Nullable InternalTr .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_GET.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(false) .coordinatorId(txo.coordinatorId()) .build(), @@ -916,7 +916,7 @@ public CompletableFuture get( .schemaVersion(keyRow.schemaVersion()) .primaryKey(keyRow.tupleSlice()) .requestTypeInt(RO_GET.ordinal()) - .readTimestampLong(readTimestamp.longValue()) + .readTimestamp(readTimestamp) .build() ); } @@ -997,7 +997,7 @@ public CompletableFuture> getAll( .schemaVersion(partitionRowBatch.getValue().requestedRows.get(0).schemaVersion()) .primaryKeys(serializeBinaryTuples(partitionRowBatch.getValue().requestedRows)) .requestTypeInt(RO_GET_ALL.ordinal()) - .readTimestampLong(readTimestamp.longValue()) + .readTimestamp(readTimestamp) .build(); partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request); @@ -1024,7 +1024,7 @@ private ReadWriteMultiRowPkReplicaRequest readWriteMultiRowPkReplicaRequest( .transactionId(tx.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(requestType.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(full) .coordinatorId(tx.coordinatorId()) .build(); @@ -1092,7 +1092,7 @@ public CompletableFuture upsert(BinaryRowEx row, @Nullable InternalTransac .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_UPSERT.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1176,7 +1176,7 @@ public CompletableFuture getAndUpsert(BinaryRowEx row, InternalTransa .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_GET_AND_UPSERT.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1198,7 +1198,7 @@ public CompletableFuture insert(BinaryRowEx row, InternalTransaction tx .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_INSERT.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1255,7 +1255,7 @@ private ReadWriteMultiRowReplicaRequest readWriteMultiRowReplicaRequest( .transactionId(tx.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(requestType.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(full) .coordinatorId(tx.coordinatorId()) .build(); @@ -1275,7 +1275,7 @@ public CompletableFuture replace(BinaryRowEx row, InternalTransaction t .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_REPLACE_IF_EXIST.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1301,7 +1301,7 @@ public CompletableFuture replace(BinaryRowEx oldRow, BinaryRowEx newRow .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_REPLACE.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1323,7 +1323,7 @@ public CompletableFuture getAndReplace(BinaryRowEx row, InternalTrans .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_GET_AND_REPLACE.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1345,7 +1345,7 @@ public CompletableFuture delete(BinaryRowEx keyRow, InternalTransaction .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_DELETE.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1367,7 +1367,7 @@ public CompletableFuture deleteExact(BinaryRowEx oldRow, InternalTransa .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_DELETE_EXACT.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1389,7 +1389,7 @@ public CompletableFuture getAndDelete(BinaryRowEx row, InternalTransa .transactionId(txo.id()) .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestTypeInt(RW_GET_AND_DELETE.ordinal()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .full(tx == null) .coordinatorId(txo.coordinatorId()) .build(), @@ -1582,7 +1582,7 @@ private Publisher readOnlyScan( (scanId, batchSize) -> { ReadOnlyScanRetrieveBatchReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(serializeTablePartitionId(tablePartitionId)) - .readTimestampLong(readTimestamp.longValue()) + .readTimestamp(readTimestamp) .transactionId(txId) .scanId(scanId) .batchSize(batchSize) @@ -1691,7 +1691,7 @@ private Publisher readWriteScan( (scanId, batchSize) -> { ReadWriteScanRetrieveBatchReplicaRequest request = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(serializeTablePartitionId(tablePartitionId)) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .transactionId(txId) .scanId(scanId) .indexToUse(indexId) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/UpdateCommandsMarshalingMicroBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/UpdateCommandsMarshalingMicroBenchmark.java index 8e4ba966c16..cbd6f4caa56 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/UpdateCommandsMarshalingMicroBenchmark.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/UpdateCommandsMarshalingMicroBenchmark.java @@ -17,11 +17,14 @@ package org.apache.ignite.internal.benchmarks; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; + import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.MessageSerializationRegistryImpl; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; @@ -93,7 +96,7 @@ public void setUp() { byte[] array = new byte[payloadSize]; UUID uuid = UUID.randomUUID(); - long timestamp = System.currentTimeMillis(); + HybridTimestamp timestamp = hybridTimestamp(System.currentTimeMillis()); TimedBinaryRowMessage timedBinaryRowMessage = PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage() .timestamp(timestamp) @@ -110,8 +113,8 @@ public void setUp() { } message = PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommand() .txId(uuid) - .leaseStartTime(timestamp) - .safeTimeLong(timestamp) + .leaseStartTime(timestamp.longValue()) + .safeTime(timestamp) .requiredCatalogVersion(10_000) .tablePartitionId(REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage() .partitionId(2048) @@ -123,8 +126,8 @@ public void setUp() { } else { message = PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommand() .txId(uuid) - .leaseStartTime(timestamp) - .safeTimeLong(timestamp) + .leaseStartTime(timestamp.longValue()) + .safeTime(timestamp) .rowUuid(uuid) .requiredCatalogVersion(10_000) .tablePartitionId(REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage() diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index 35e7ca967bd..b85cca5e8bc 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.table.distributed.command; -import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -32,6 +30,7 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand; @@ -138,7 +137,7 @@ public void testUpdateAllCommand() throws Exception { TestTransactionIds.newTransactionId(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage() .binaryRowMessage(binaryRowMessage(i)) - .timestamp(i % 2 == 0 ? clock.nowLong() : NULL_HYBRID_TIMESTAMP) + .timestamp(i % 2 == 0 ? clock.now() : null) .build() ); } @@ -166,8 +165,8 @@ public void testUpdateAllCommand() throws Exception { assertEquals(val, readVal); - var readTs = readCmd.rowsToUpdate().get(entry.getKey()).commitTimestamp(); - var ts = nullableHybridTimestamp(entry.getValue().timestamp()); + HybridTimestamp readTs = readCmd.rowsToUpdate().get(entry.getKey()).commitTimestamp(); + HybridTimestamp ts = entry.getValue().timestamp(); assertEquals(ts, readTs); } @@ -210,7 +209,7 @@ public void testTxCleanupCommand() throws Exception { WriteIntentSwitchCommand cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(UUID.randomUUID()) .commit(true) - .commitTimestampLong(clock.nowLong()) + .commitTimestamp(clock.now()) .build(); WriteIntentSwitchCommand readCmd = copyCommand(cmd); @@ -235,7 +234,7 @@ public void testFinishTxCommand() throws Exception { FinishTxCommand cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() .txId(UUID.randomUUID()) .commit(true) - .commitTimestampLong(clock.nowLong()) + .commitTimestamp(clock.now()) .partitionIds(grps) .build(); @@ -256,7 +255,7 @@ private T copyCommand(T cmd) { .txId(finishTxCommand.txId()) .commit(finishTxCommand.commit()) .partitionIds(finishTxCommand.partitionIds()) - .commitTimestampLong(finishTxCommand.commitTimestampLong()) + .commitTimestamp(finishTxCommand.commitTimestamp()) .build(); } else if (cmd instanceof WriteIntentSwitchCommand) { WriteIntentSwitchCommand writeIntentSwitchCommand = (WriteIntentSwitchCommand) cmd; @@ -264,7 +263,7 @@ private T copyCommand(T cmd) { return (T) PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(writeIntentSwitchCommand.txId()) .commit(writeIntentSwitchCommand.commit()) - .commitTimestampLong(writeIntentSwitchCommand.commitTimestampLong()) + .commitTimestamp(writeIntentSwitchCommand.commitTimestamp()) .build(); } else if (cmd instanceof UpdateCommand) { UpdateCommand updateCommand = (UpdateCommand) cmd; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 8b53ba5400a..9bf90b13be6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -517,7 +517,7 @@ void updatesLastAppliedForUpdateCommands(boolean stale) { .tablePartitionId(defaultPartitionIdMessage()) .txCoordinatorId(UUID.randomUUID().toString()) .txId(TestTransactionIds.newTransactionId()) - .safeTimeLong(staleOrFreshSafeTime(stale)) + .safeTime(staleOrFreshSafeTime(stale)) .build(); commandListener.onWrite(List.of( @@ -527,8 +527,8 @@ void updatesLastAppliedForUpdateCommands(boolean stale) { verify(mvPartitionStorage).lastApplied(3, 2); } - private long staleOrFreshSafeTime(boolean stale) { - return stale ? safeTimeTracker.current().subtractPhysicalTime(1).longValue() : hybridClock.nowLong(); + private HybridTimestamp staleOrFreshSafeTime(boolean stale) { + return stale ? safeTimeTracker.current().subtractPhysicalTime(1) : hybridClock.now(); } @ParameterizedTest @@ -544,7 +544,7 @@ void updatesLastAppliedForUpdateAllCommands(boolean stale) { .tablePartitionId(defaultPartitionIdMessage()) .txCoordinatorId(UUID.randomUUID().toString()) .txId(TestTransactionIds.newTransactionId()) - .safeTimeLong(staleOrFreshSafeTime(stale)) + .safeTime(staleOrFreshSafeTime(stale)) .build(); commandListener.onWrite(List.of( @@ -561,7 +561,7 @@ void updatesLastAppliedForFinishTxCommands(boolean stale) { FinishTxCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.finishTxCommand() .txId(TestTransactionIds.newTransactionId()) - .safeTimeLong(staleOrFreshSafeTime(stale)) + .safeTime(staleOrFreshSafeTime(stale)) .partitionIds(List.of()) .build(); @@ -577,7 +577,7 @@ void updatesLastAppliedForFinishTxCommands(boolean stale) { void locksOnCommandApplication() { SafeTimeSyncCommandBuilder safeTimeSyncCommand = new ReplicaMessagesFactory() .safeTimeSyncCommand() - .safeTimeLong(hybridClock.nowLong()); + .safeTime(hybridClock.now()); commandListener.onWrite(List.of( writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor) @@ -702,7 +702,7 @@ void updatesLastAppliedForWriteIntentSwitchCommands(boolean stale) { WriteIntentSwitchCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(TestTransactionIds.newTransactionId()) - .safeTimeLong(staleOrFreshSafeTime(stale)) + .safeTime(staleOrFreshSafeTime(stale)) .build(); commandListener.onWrite(List.of( @@ -719,7 +719,7 @@ void updatesLastAppliedForSafeTimeSyncCommands(boolean stale) { SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory() .safeTimeSyncCommand() - .safeTimeLong(staleOrFreshSafeTime(stale)) + .safeTime(staleOrFreshSafeTime(stale)) .build(); commandListener.onWrite(List.of( @@ -813,15 +813,15 @@ private void insertAll() { .tablePartitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartId)) .messageRowsToUpdate(rows) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp.longValue()) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build()); } @@ -851,15 +851,15 @@ private void updateAll(Function keyValueMapper) { .tablePartitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartId)) .messageRowsToUpdate(rows) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp.longValue()) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build()); } @@ -884,15 +884,15 @@ private void deleteAll() { .tablePartitionId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartId)) .messageRowsToUpdate(keyRows) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp.longValue()) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build()); } @@ -921,7 +921,7 @@ private void update(Function keyValueMapper) { .binaryRowMessage(row) .build()) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); @@ -937,8 +937,8 @@ private void update(Function keyValueMapper) { txIds.forEach(txId -> invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp.longValue()) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build())); } @@ -968,7 +968,7 @@ private void delete() { .tablePartitionId(defaultPartitionIdMessage()) .rowUuid(readResult.rowId().uuid()) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); @@ -984,8 +984,8 @@ private void delete() { txIds.forEach(txId -> invokeBatchedCommand(PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp.longValue()) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build())); } @@ -1041,7 +1041,7 @@ private void insert() { .binaryRowMessage(getTestRow(i, i)) .build()) .txId(txId) - .safeTimeLong(hybridClock.nowLong()) + .safeTime(hybridClock.now()) .txCoordinatorId(UUID.randomUUID().toString()) .build()); @@ -1052,14 +1052,14 @@ private void insert() { }).when(clo).result(any()); })); - long commitTimestamp = hybridClock.nowLong(); + HybridTimestamp commitTimestamp = hybridClock.now(); txIds.forEach(txId -> invokeBatchedCommand( PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommand() .txId(txId) .commit(true) - .commitTimestampLong(commitTimestamp) - .safeTimeLong(hybridClock.nowLong()) + .commitTimestamp(commitTimestamp) + .safeTime(hybridClock.now()) .build())); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index b8e96dde592..d344ceac121 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -344,6 +344,7 @@ void testReadWriteSingle(ReadWriteTestArg arg) { .primaryKey(testPk.tupleSlice()) .requestTypeInt(arg.type.ordinal()) .coordinatorId(localNode.id()) + .timestamp(CLOCK.now()) .build(); break; @@ -363,6 +364,7 @@ void testReadWriteSingle(ReadWriteTestArg arg) { .binaryTuple(testBinaryRow.tupleSlice()) .requestTypeInt(arg.type.ordinal()) .coordinatorId(localNode.id()) + .timestamp(CLOCK.now()) .build(); break; @@ -430,6 +432,7 @@ void testReadWriteMulti(ReadWriteTestArg arg) { .primaryKeys(pks.stream().map(BinaryRow::tupleSlice).collect(toList())) .requestTypeInt(arg.type.ordinal()) .coordinatorId(localNode.id()) + .timestamp(CLOCK.now()) .build(); break; @@ -446,6 +449,7 @@ void testReadWriteMulti(ReadWriteTestArg arg) { .binaryTuples(binaryRowsToBuffers(rows)) .requestTypeInt(arg.type.ordinal()) .coordinatorId(localNode.id()) + .timestamp(CLOCK.now()) .build(); break; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 8c0947fe1d6..2d1cd8018cb 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -25,7 +25,6 @@ import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET_ALL; import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_DELETE; @@ -802,7 +801,7 @@ private CompletableFuture doReadOnlySingleGet(BinaryRow pk) { private CompletableFuture doReadOnlySingleGet(BinaryRow pk, HybridTimestamp readTimestamp) { ReadOnlySingleRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) - .readTimestampLong(readTimestamp.longValue()) + .readTimestamp(readTimestamp) .schemaVersion(pk.schemaVersion()) .primaryKey(pk.tupleSlice()) .requestTypeInt(RO_GET.ordinal()) @@ -923,13 +922,14 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .scanId(1L) .indexToUse(sortedIndexId) .batchSize(4) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id()); List rows = (List) fut.get(1, TimeUnit.SECONDS).result(); @@ -941,13 +941,14 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .scanId(1L) .indexToUse(sortedIndexId) .batchSize(4) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id()); rows = (List) fut.get(1, TimeUnit.SECONDS).result(); @@ -959,7 +960,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .scanId(2L) .indexToUse(sortedIndexId) @@ -969,6 +970,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc .batchSize(5) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id()); rows = (List) fut.get(1, TimeUnit.SECONDS).result(); @@ -980,7 +982,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .scanId(2L) .indexToUse(sortedIndexId) @@ -988,6 +990,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc .batchSize(5) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id()); rows = (List) fut.get(1, TimeUnit.SECONDS).result(); @@ -999,7 +1002,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .timestampLong(clock.nowLong()) + .timestamp(clock.now()) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .scanId(2L) .indexToUse(sortedIndexId) @@ -1007,6 +1010,7 @@ public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exc .batchSize(5) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id()); rows = (List) fut.get(1, TimeUnit.SECONDS).result(); @@ -1041,7 +1045,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exce TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(1L) .indexToUse(sortedIndexId) .batchSize(4) @@ -1057,7 +1061,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exce fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(1L) .indexToUse(sortedIndexId) .batchSize(4) @@ -1073,7 +1077,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exce fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(2L) .indexToUse(sortedIndexId) .lowerBoundPrefix(toIndexBound(1)) @@ -1092,7 +1096,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exce fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(2L) .indexToUse(sortedIndexId) .lowerBoundPrefix(toIndexBound(5)) @@ -1109,7 +1113,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exce fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(2L) .indexToUse(sortedIndexId) .exactKey(toIndexKey(0)) @@ -1149,7 +1153,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Excepti TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(1L) .indexToUse(hashIndexId) .exactKey(toIndexKey(0)) @@ -1166,7 +1170,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Excepti fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(scanTxId) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(1L) .indexToUse(hashIndexId) .exactKey(toIndexKey(0)) @@ -1183,7 +1187,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Excepti fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(2L) .indexToUse(hashIndexId) .exactKey(toIndexKey(5)) @@ -1200,7 +1204,7 @@ public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Excepti fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .transactionId(newTxId()) - .readTimestampLong(clock.nowLong()) + .readTimestamp(clock.now()) .scanId(2L) .indexToUse(hashIndexId) .exactKey(toIndexKey(1)) @@ -1333,6 +1337,7 @@ private CompletableFuture doSingleRowRequest(UUID txId, BinaryRow binaryRow, .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) .full(full) + .timestamp(clock.now()) .build(), localNode.id() ); @@ -1353,6 +1358,7 @@ private CompletableFuture doSingleRowPkRequest(UUID txId, BinaryRow binaryRow .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) .full(full) + .timestamp(clock.now()) .build(), localNode.id() ); @@ -1384,6 +1390,7 @@ private CompletableFuture doMultiRowRequest(UUID txId, Collection .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) .full(full) + .timestamp(clock.now()) .build(), localNode.id() ); @@ -1408,6 +1415,7 @@ private CompletableFuture doMultiRowPkRequest(UUID txId, Collection checkRowInMvStorage(binaryRow(0), true) @@ -1461,6 +1470,7 @@ public void testWriteIntentOnPrimaryReplicaUpdateAll() { .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(); }, () -> checkRowInMvStorage(binaryRow(0), true) @@ -1534,7 +1544,7 @@ private void testWriteIntentOnPrimaryReplica( .groupId(tablePartitionIdMessage(grpId)) .txId(txId) .commit(true) - .commitTimestampLong(now.longValue()) + .commitTimestamp(now) .build(), localNode.id() ); @@ -1669,7 +1679,7 @@ void testReadOnlyGetAfterRowRewrite( assertThat(res, contains(matchers)); } else { - BinaryRow res = roGet(br1Pk, clock.nowLong()); + BinaryRow res = roGet(br1Pk, clock.now()); BinaryRow expected = committed ? (upsertAfterDelete ? br1 : null) : (insertFirst ? br1 : null); @@ -1766,7 +1776,7 @@ private CompletableFuture beginAndCommitTx() { .txId(txId) .groups(Map.of(tablePartitionIdMessage(grpId), localNode.name())) .commit(true) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .commitTimestamp(commitTimestamp) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .build(); @@ -1951,6 +1961,7 @@ private CompletableFuture doReplaceRequest(UUID targetTxId, BinaryRow oldRow, .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) .full(full) + .timestamp(clock.now()) .build(), localNode.id() ); @@ -1970,6 +1981,7 @@ public void failsWhenScanByExactMatchReadsTupleWithIncompatibleSchemaFromFuture( .batchSize(100) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id() ) @@ -1989,6 +2001,7 @@ public void failsWhenScanByIndexReadsTupleWithIncompatibleSchemaFromFuture() { .batchSize(100) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id() ) @@ -2013,6 +2026,7 @@ private CompletableFuture doRwScanRetrieveBatchRequest(UUID targetTxId) { .full(false) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(), localNode.id() ); @@ -2036,7 +2050,7 @@ private CompletableFuture doRoScanRetrieveBatchRequest(UUID targetTxId, Hybri .transactionId(targetTxId) .scanId(1) .batchSize(100) - .readTimestampLong(readTimestamp.longValue()) + .readTimestamp(readTimestamp) .coordinatorId(localNode.id()) .build(), localNode.id() @@ -2510,7 +2524,7 @@ private void testCommitRequestIfTableWasDropped( .txId(txId) .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .commit(true) - .commitTimestampLong(clock.nowLong()) + .commitTimestamp(clock.now()) .build(), localNode.id() ); @@ -2663,6 +2677,7 @@ private CompletableFuture upsertAsync(UUID txId, BinaryRow row, b .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) .full(full) + .timestamp(clock.now()) .build(); return partitionReplicaListener.invoke(message, localNode.id()); @@ -2678,12 +2693,13 @@ private void delete(UUID txId, BinaryRow row) { .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) .commitPartitionId(commitPartitionId()) .coordinatorId(localNode.id()) + .timestamp(clock.now()) .build(); assertThat(partitionReplicaListener.invoke(message, localNode.id()), willCompleteSuccessfully()); } - private BinaryRow roGet(BinaryRow row, long readTimestamp) { + private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) { CompletableFuture roGetAsync = roGetAsync(row, readTimestamp); assertThat(roGetAsync, willCompleteSuccessfully()); @@ -2691,11 +2707,11 @@ private BinaryRow roGet(BinaryRow row, long readTimestamp) { return roGetAsync.join(); } - private CompletableFuture roGetAsync(BinaryRow row, long readTimestamp) { + private CompletableFuture roGetAsync(BinaryRow row, HybridTimestamp readTimestamp) { ReadOnlySingleRowPkReplicaRequest message = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() .groupId(tablePartitionIdMessage(grpId)) .requestTypeInt(RO_GET.ordinal()) - .readTimestampLong(readTimestamp) + .readTimestamp(readTimestamp) .schemaVersion(row.schemaVersion()) .primaryKey(row.tupleSlice()) .build(); @@ -2715,7 +2731,7 @@ private CompletableFuture doReadOnlyMultiGet(Collection switchWriteIntents( primaryConsistentId, TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId)) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .txId(txId) .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .commitTimestamp(commitTimestamp) .build() ); } @@ -134,8 +133,8 @@ public CompletableFuture cleanup( TX_MESSAGES_FACTORY.txCleanupMessage() .txId(txId) .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) - .timestampLong(clockService.nowLong()) + .commitTimestamp(commitTimestamp) + .timestamp(clockService.now()) .groups(toTablePartitionIdMessages(replicationGroupIds)) .build(), transactionConfiguration.rpcTimeout().value()); @@ -166,11 +165,11 @@ public CompletableFuture finish( primaryConsistentId, TX_MESSAGES_FACTORY.txFinishReplicaRequest() .txId(txId) - .timestampLong(clockService.nowLong()) + .timestamp(clockService.now()) .groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition)) .groups(toTablePartitionIdMessages(replicationGroupIds)) .commit(commit) - .commitTimestampLong(hybridTimestampToLong(commitTimestamp)) + .commitTimestamp(commitTimestamp) .enlistmentConsistencyToken(consistencyToken) .build()); } @@ -215,7 +214,7 @@ public CompletableFuture resolveTxStateFromCoordinator( return messagingService.invoke( primaryConsistentId, TX_MESSAGES_FACTORY.txStateCoordinatorRequest() - .readTimestampLong(timestamp.longValue()) + .readTimestamp(timestamp) .txId(txId) .build(), transactionConfiguration.rpcTimeout().value()) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java index 934f5951499..0780ae455fa 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.tx.TransactionMeta; @@ -30,8 +28,8 @@ public interface TransactionMetaMessage extends NetworkMessage { /** Ordinal of {@link TxState} value. */ int txStateInt(); - /** Commit timestamp in primitive representation, {@link HybridTimestamp#NULL_HYBRID_TIMESTAMP} as {@code null}. */ - long commitTimestampLong(); + /** Commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); /** Transaction state. */ default TxState txState() { @@ -42,11 +40,6 @@ default TxState txState() { return state; } - /** Commit timestamp. */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } - /** Converts to {@link TransactionMeta}. */ default TransactionMeta asTransactionMeta() { throw new AssertionError("Must be implemented by heirs."); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java index 138d9be809a..b247285483c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.List; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -54,19 +52,6 @@ public interface TxCleanupMessage extends TimestampAware { */ boolean commit(); - /** - * Returns a transaction commit timestamp. - * - * @return Commit timestamp. - */ - long commitTimestampLong(); - - /** - * Returns a transaction commit timestamp. - * - * @return Commit timestamp. - */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } + /** Transaction commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java index abecb5159f3..ae39416d2eb 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -53,17 +51,8 @@ public interface TxFinishReplicaRequest extends PrimaryReplicaRequest, Timestamp */ boolean commit(); - /** - * Transaction commit timestamp. - */ - long commitTimestampLong(); - - /** - * Transaction commit timestamp. - */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } + /** Transaction commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); /** Enlisted partition groups aggregated by expected primary replica nodes. */ Map groups(); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java index 0ac49a11634..b7af2ce3629 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateCoordinatorRequest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; - import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.NetworkMessage; @@ -31,9 +29,5 @@ public interface TxStateCoordinatorRequest extends NetworkMessage { UUID txId(); - long readTimestampLong(); - - default HybridTimestamp readTimestamp() { - return hybridTimestamp(readTimestampLong()); - } + HybridTimestamp readTimestamp(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java index 4b7a361b478..782da146826 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.tx.message; -import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; - import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.annotations.Transferable; @@ -46,19 +44,6 @@ public interface WriteIntentSwitchReplicaRequest extends ReplicaRequest, Timesta */ boolean commit(); - /** - * Returns a transaction commit timestamp. - * - * @return Commit timestamp. - */ - long commitTimestampLong(); - - /** - * Returns a transaction commit timestamp. - * - * @return Commit timestamp. - */ - default @Nullable HybridTimestamp commitTimestamp() { - return nullableHybridTimestamp(commitTimestampLong()); - } + /** Transaction commit timestamp. */ + @Nullable HybridTimestamp commitTimestamp(); }