Skip to content

Commit

Permalink
IGNITE-22715 Support HybridTimestamp in Network serialization (#4078)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkalkirill authored Jul 15, 2024
1 parent 8f510c4 commit bc6c427
Show file tree
Hide file tree
Showing 56 changed files with 300 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.plugin.extensions.communication;

import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -87,7 +88,10 @@ public enum MessageCollectionItemType {
IGNITE_UUID,

/** Message. */
MSG;
MSG,

/** {@link HybridTimestamp}. */
HYBRID_TIMESTAMP;

/** Enum values. */
private static final MessageCollectionItemType[] VALS = values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
Expand Down Expand Up @@ -206,7 +207,7 @@ public void setUp() {
Command cmd = invocationClose.getArgument(0);

if (cmd instanceof MetaStorageWriteCommand) {
((MetaStorageWriteCommand) cmd).safeTimeLong(10);
((MetaStorageWriteCommand) cmd).safeTime(hybridTimestamp(10));
}

long commandIndex = raftIndex.incrementAndGet();
Expand Down Expand Up @@ -254,7 +255,7 @@ public void result(@Nullable Serializable r) {
MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand()
.iif(iif)
.id(commandIdGenerator.newId())
.initiatorTimeLong(clusterTime.nowLong())
.initiatorTime(clusterTime.now())
.build();

return metaStorageService.run(multiInvokeCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.affinity.Assignments.toBytes;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.TestClockService.TEST_MAX_CLOCK_SKEW_MILLIS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -154,7 +155,7 @@ public void setUp() {
long commandIndex = raftIndex.incrementAndGet();

if (cmd instanceof MetaStorageWriteCommand) {
((MetaStorageWriteCommand) cmd).safeTimeLong(10);
((MetaStorageWriteCommand) cmd).safeTime(hybridTimestamp(10));
}

CompletableFuture<Serializable> res = new CompletableFuture<>();
Expand Down Expand Up @@ -200,7 +201,7 @@ public void result(@Nullable Serializable r) {
MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand()
.iif(iif)
.id(commandIdGenerator.newId())
.initiatorTimeLong(clusterTime.nowLong())
.initiatorTime(clusterTime.now())
.build();

return metaStorageService.run(multiInvokeCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ private static IdempotentCommand buildKeyNotExistsInvokeCommand(
.condition(notExists(testKey))
.success(List.of(put(testKey, testValue)))
.failure(List.of(put(testKey, anotherValue)))
.initiatorTimeLong(clock.nowLong())
.initiatorTime(clock.now())
.id(commandIdGenerator.newId())
.build();
}
Expand All @@ -504,8 +504,8 @@ private static IdempotentCommand buildKeyNotExistsMultiInvokeCommand(
return CMD_FACTORY.multiInvokeCommand()
.id(commandIdGenerator.newId())
.iif(iif)
.safeTimeLong(clock.now().longValue())
.initiatorTimeLong(clock.now().longValue())
.safeTime(clock.now())
.initiatorTime(clock.now())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,18 @@

package org.apache.ignite.internal.metastorage.command;

import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;

import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.annotations.WithSetter;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.jetbrains.annotations.Nullable;

/** Base meta storage write command. */
public interface MetaStorageWriteCommand extends WriteCommand {
/**
* Returns time on the initiator node.
*/
long initiatorTimeLong();

/**
* Returns time on the initiator node.
*/
default HybridTimestamp initiatorTime() {
return hybridTimestamp(initiatorTimeLong());
}
/** Time on the initiator node. */
HybridTimestamp initiatorTime();

/**
* This is a dirty hack. This time is set by the leader node to disseminate new safe time across
Expand All @@ -47,19 +37,12 @@ default HybridTimestamp initiatorTime() {
* command is saved into the Raft log (see {@link BeforeApplyHandler#onBeforeApply(Command)}.
*/
@WithSetter
long safeTimeLong();
@Nullable HybridTimestamp safeTime();

/**
* Setter for the safeTime field.
*/
default void safeTimeLong(long safeTime) {
default void safeTime(HybridTimestamp safeTime) {
// No-op.
}

/**
* Convenient getter for {@link #safeTimeLong()}.
*/
default HybridTimestamp safeTime() {
return hybridTimestamp(safeTimeLong());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public CompletableFuture<Void> put(ByteArray key, byte[] value) {
PutCommand putCommand = context.commandsFactory().putCommand()
.key(ByteBuffer.wrap(key.bytes()))
.value(ByteBuffer.wrap(value))
.initiatorTimeLong(clusterTime.nowLong())
.initiatorTime(clusterTime.now())
.build();

return context.raftService().run(putCommand);
Expand All @@ -148,7 +148,7 @@ public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
@Override
public CompletableFuture<Void> remove(ByteArray key) {
RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(ByteBuffer.wrap(key.bytes()))
.initiatorTimeLong(clusterTime.nowLong()).build();
.initiatorTime(clusterTime.now()).build();

return context.raftService().run(removeCommand);
}
Expand All @@ -175,7 +175,7 @@ public CompletableFuture<Boolean> invoke(
.condition(condition)
.success(success)
.failure(failure)
.initiatorTimeLong(clusterTime.nowLong())
.initiatorTime(clusterTime.now())
.id(commandIdGenerator.newId())
.build();

Expand All @@ -186,7 +186,7 @@ public CompletableFuture<Boolean> invoke(
public CompletableFuture<StatementResult> invoke(Iif iif) {
MultiInvokeCommand multiInvokeCommand = context.commandsFactory().multiInvokeCommand()
.iif(iif)
.initiatorTimeLong(clusterTime.nowLong())
.initiatorTime(clusterTime.now())
.id(commandIdGenerator.newId())
.build();

Expand Down Expand Up @@ -248,7 +248,7 @@ public Publisher<Entry> prefix(ByteArray prefix, long revUpperBound) {
*/
public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) {
SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand()
.initiatorTimeLong(safeTime.longValue())
.initiatorTime(safeTime)
.initiatorTerm(term)
.build();

Expand Down Expand Up @@ -312,7 +312,7 @@ private PutAllCommand putAllCommand(MetaStorageCommandsFactory commandsFactory,
return commandsFactory.putAllCommand()
.keys(keys)
.values(values)
.initiatorTimeLong(ts.longValue())
.initiatorTime(ts)
.build();
}

Expand All @@ -330,6 +330,6 @@ private RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory commandsFac
list.add(ByteBuffer.wrap(key.bytes()));
}

return commandsFactory.removeAllCommand().keys(list).initiatorTimeLong(ts.longValue()).build();
return commandsFactory.removeAllCommand().keys(list).initiatorTime(ts).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ boolean beforeApply(Command command) {

clusterTime.adjust(writeCommand.initiatorTime());

writeCommand.safeTimeLong(clusterTime.nowLong());
writeCommand.safeTime(clusterTime.now());

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public void testIdempotentInvoke() {
.condition(notExists(testKey))
.success(List.of(put(testKey, testValue.bytes())))
.failure(List.of(put(testKey, anotherValue.bytes())))
.safeTimeLong(clock.now().longValue())
.initiatorTimeLong(clock.now().longValue())
.safeTime(clock.now())
.initiatorTime(clock.now())
.build();

metaStorageListener.onWrite(commandIterator(command));
Expand Down Expand Up @@ -138,8 +138,8 @@ public void testIdempotentMultiInvoke() {
MultiInvokeCommand command = CMD_FACTORY.multiInvokeCommand()
.id(commandIdGenerator.newId())
.iif(iif)
.safeTimeLong(clock.now().longValue())
.initiatorTimeLong(clock.now().longValue())
.safeTime(clock.now())
.initiatorTime(clock.now())
.build();

metaStorageListener.onWrite(commandIterator(command));
Expand All @@ -166,8 +166,8 @@ public void testNonIdempotentCommand() {
PutCommand command0 = CMD_FACTORY.putCommand()
.key(ByteBuffer.wrap(testKey.bytes()))
.value(ByteBuffer.wrap(testValue0.bytes()))
.safeTimeLong(clock.now().longValue())
.initiatorTimeLong(clock.now().longValue())
.safeTime(clock.now())
.initiatorTime(clock.now())
.build();

metaStorageListener.onWrite(commandIterator(command0));
Expand All @@ -183,8 +183,8 @@ public void testNonIdempotentCommand() {
PutCommand command1 = CMD_FACTORY.putCommand()
.key(ByteBuffer.wrap(testKey.bytes()))
.value(ByteBuffer.wrap(testValue1.bytes()))
.safeTimeLong(clock.now().longValue())
.initiatorTimeLong(clock.now().longValue())
.safeTime(clock.now())
.initiatorTime(clock.now())
.build();

metaStorageListener.onWrite(commandIterator(command1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.lang.model.type.PrimitiveType;
import javax.lang.model.type.TypeKind;
import javax.lang.model.type.TypeMirror;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.processor.ProcessingException;
Expand Down Expand Up @@ -140,8 +141,10 @@ private String resolveReferenceMethodName(DeclaredType parameterType) {
return "Map";
} else if (typeUtils.isSameType(parameterType, ByteBuffer.class)) {
return "ByteBuffer";
} else {
throw new ProcessingException("Unsupported reference type for message (de-)serialization: " + parameterType);
} else if (typeUtils.isSameType(parameterType, HybridTimestamp.class)) {
return "HybridTimestamp";
}

throw new ProcessingException("Unsupported reference type for message (de-)serialization: " + parameterType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.lang.model.type.DeclaredType;
import javax.lang.model.type.PrimitiveType;
import javax.lang.model.type.TypeMirror;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.processor.ProcessingException;
Expand Down Expand Up @@ -128,8 +129,10 @@ private MessageCollectionItemType fromDeclaredType(DeclaredType parameterType) {
return MessageCollectionItemType.BIT_SET;
} else if (typeUtils.isSameType(parameterType, ByteBuffer.class)) {
return MessageCollectionItemType.BYTE_BUFFER;
} else {
throw new ProcessingException("Unsupported MessageCollectionItemType: " + parameterType);
} else if (typeUtils.isSameType(parameterType, HybridTimestamp.class)) {
return MessageCollectionItemType.HYBRID_TIMESTAMP;
}

throw new ProcessingException("Unsupported MessageCollectionItemType: " + parameterType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.serialization.MessageDeserializer;
Expand Down Expand Up @@ -72,6 +73,7 @@
* <li>Array of primitive types, corresponding boxed types or other directly marshallable types;</li>
* <li>{@code Collection} of boxed primitive types or other directly marshallable types;</li>
* <li>{@code Map} where both keys and values can be of a directly marshallable type.</li>
* <li>{@link HybridTimestamp}.</li>
* </ol>
*
* <p>After all marked interfaces in a module have been processed, the processor will use the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
Expand Down Expand Up @@ -304,6 +305,13 @@ public interface MessageReader {
*/
IgniteUuid readIgniteUuid(String name);

/**
* Reads an {@link HybridTimestamp}.
*
* @param name Field name.
*/
@Nullable HybridTimestamp readHybridTimestamp(String name);

/**
* Reads a group type or a message type of the message. Unlike regular {@link #readShort(String)}, this method never accepts a field
* name, because there are no names assigned to the header values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteUuid;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
Expand Down Expand Up @@ -339,6 +340,15 @@ public interface MessageWriter {
*/
boolean writeIgniteUuid(String name, IgniteUuid val);

/**
* Writes an {@link HybridTimestamp}.
*
* @param name Field name.
* @param val {@link HybridTimestamp}.
* @return Whether a value was fully written.
*/
boolean writeHybridTimestamp(String name, @Nullable HybridTimestamp val);

/**
* Writes a nested message.
*
Expand Down
Loading

0 comments on commit bc6c427

Please sign in to comment.