diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java b/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java index 5b608aee..6452fddf 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java @@ -108,6 +108,18 @@ public interface AsyncOxiaClient extends AutoCloseable { @NonNull CompletableFuture get(String key); + /** + * Returns the record associated with the specified key. The returned value includes the value, + * and other metadata. + * + * @param key The key associated with the record to be fetched. + * @param options Set {@link GetOption options} for the get operation. + * @return The value associated with the supplied key, or {@code null} if the key did not exist. + * Supplied via a future returning a {@link GetResult}. + */ + @NonNull + CompletableFuture get(String key, Set options); + /** * Lists any existing keys within the specified range. For more information on how keys are * sorted, check the relevant section in the = to the supplied key. + */ + GetOption ComparisonCeiling = + new OptionComparisonType(OptionComparisonType.ComparisonType.Ceiling); + + /** + * ComparisonLower option will make the get operation to search for the record whose key is + * strictly < to the supplied key. + */ + GetOption ComparisonLower = new OptionComparisonType(OptionComparisonType.ComparisonType.Lower); + + /** + * ComparisonHigher option will make the get operation to search for the record whose key is + * strictly > to the supplied key. + */ + GetOption ComparisonHigher = new OptionComparisonType(OptionComparisonType.ComparisonType.Higher); +} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/GetResult.java b/client-api/src/main/java/io/streamnative/oxia/client/api/GetResult.java index ca9b4b67..6ee521e0 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/GetResult.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/GetResult.java @@ -21,8 +21,12 @@ /** The result of a client get request. */ @Value public class GetResult { + + /** The key associated with the record. */ + @NonNull String key; + /** The value associated with the key specified in the call. */ - byte @NonNull [] value; + @NonNull byte[] value; /** Metadata for the record associated with the key specified in the call. */ @NonNull Version version; diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionComparisonType.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionComparisonType.java new file mode 100644 index 00000000..f484c45f --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionComparisonType.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.api; + +public record OptionComparisonType(ComparisonType comparisonType) implements GetOption { + + public enum ComparisonType { + Equal, + Floor, + Ceiling, + Lower, + Higher, + } + + public ComparisonType getComparisonType() { + return comparisonType; + } +} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java b/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java index d89df28d..6a1a9767 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java @@ -96,6 +96,16 @@ boolean delete(@NonNull String key, @NonNull Set options) */ GetResult get(@NonNull String key); + /** + * Returns the record associated with the specified key. The returned value includes the value, + * and other metadata. + * + * @param key The key associated with the record to be fetched. + * @param options Set {@link GetOption options} for the get operation. + * @return The value associated with the supplied key, or {@code null} if the key did not exist. + */ + GetResult get(@NonNull String key, @NonNull Set options); + /** * Lists any existing keys within the specified range. For more information on how keys are * sorted, check the relevant section in the internalPut( @Override public @NonNull CompletableFuture get(String key) { + return get(key, Collections.emptySet()); + } + + @Override + public @NonNull CompletableFuture get(String key, Set options) { long startTime = System.nanoTime(); gaugePendingGetRequests.increment(); var callback = new CompletableFuture(); try { checkIfClosed(); Objects.requireNonNull(key); - var shardId = shardManager.getShardForKey(key); - readBatchManager.getBatcher(shardId).add(new GetOperation(callback, key)); + internalGet(key, options, callback); } catch (RuntimeException e) { callback.completeExceptionally(e); } @@ -404,6 +410,65 @@ private CompletableFuture internalPut( }); } + private void internalGet( + String key, Set options, CompletableFuture result) { + KeyComparisonType comparisonType = GetOptionsUtil.getComparisonType(options); + if (comparisonType == KeyComparisonType.EQUAL) { + // Normal equality get operation + long shardId = shardManager.getShardForKey(key); + readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, comparisonType)); + } else { + internalGetFloorCeiling(key, comparisonType, result); + } + } + + private void internalGetFloorCeiling( + String key, KeyComparisonType comparisonType, CompletableFuture result) { + // We need check on all the shards for a floor/ceiling query + List> futures = new ArrayList<>(); + for (long shardId : shardManager.allShardIds()) { + CompletableFuture f = new CompletableFuture<>(); + readBatchManager.getBatcher(shardId).add(new GetOperation(f, key, comparisonType)); + futures.add(f); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete( + (v, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + return; + } + + try { + List results = + futures.stream() + .map(CompletableFuture::join) + .filter(Objects::nonNull) + .sorted( + (o1, o2) -> CompareWithSlash.INSTANCE.compare(o1.getKey(), o2.getKey())) + .toList(); + if (results.isEmpty()) { + result.complete(null); + return; + } + + GetResult gr = + switch (comparisonType) { + case EQUAL, + UNRECOGNIZED -> null; // This would be handled withing context of single + // shard + case FLOOR, LOWER -> results.get(results.size() - 1); + case CEILING, HIGHER -> results.get(0); + }; + + result.complete(gr); + } catch (Throwable t) { + result.completeExceptionally(t); + } + }); + } + @Override public @NonNull CompletableFuture> list( String startKeyInclusive, String endKeyExclusive) { diff --git a/client/src/main/java/io/streamnative/oxia/client/GetOptionsUtil.java b/client/src/main/java/io/streamnative/oxia/client/GetOptionsUtil.java new file mode 100644 index 00000000..0c414275 --- /dev/null +++ b/client/src/main/java/io/streamnative/oxia/client/GetOptionsUtil.java @@ -0,0 +1,57 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamnative.oxia.client; + +import io.streamnative.oxia.client.api.GetOption; +import io.streamnative.oxia.client.api.OptionComparisonType; +import io.streamnative.oxia.proto.KeyComparisonType; +import java.util.Set; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class GetOptionsUtil { + + public static KeyComparisonType getComparisonType(Set options) { + if (options == null || options.isEmpty()) { + return KeyComparisonType.EQUAL; + } + + boolean alreadyHasComparisonType = false; + KeyComparisonType comparisonType = KeyComparisonType.EQUAL; + for (GetOption o : options) { + if (o instanceof OptionComparisonType e) { + + if (alreadyHasComparisonType) { + throw new IllegalArgumentException( + "Incompatible " + GetOption.class.getSimpleName() + "s: " + options); + } + + comparisonType = + switch (e.comparisonType()) { + case Equal -> KeyComparisonType.EQUAL; + case Floor -> KeyComparisonType.FLOOR; + case Ceiling -> KeyComparisonType.CEILING; + case Lower -> KeyComparisonType.LOWER; + case Higher -> KeyComparisonType.HIGHER; + }; + alreadyHasComparisonType = true; + } + } + + return comparisonType; + } +} diff --git a/client/src/main/java/io/streamnative/oxia/client/ProtoUtil.java b/client/src/main/java/io/streamnative/oxia/client/ProtoUtil.java index 63f7110a..3ac1064c 100644 --- a/client/src/main/java/io/streamnative/oxia/client/ProtoUtil.java +++ b/client/src/main/java/io/streamnative/oxia/client/ProtoUtil.java @@ -40,9 +40,14 @@ public static long uint32ToLong(int n) { return new PutResult(getVersionFromProto(response.getVersion())); } - public static @NonNull GetResult getResultFromProto(@NonNull GetResponse response) { + public static @NonNull GetResult getResultFromProto( + @NonNull String originalKey, @NonNull GetResponse response) { + String key = originalKey; + if (response.hasKey()) { + key = response.getKey(); + } return new GetResult( - response.getValue().toByteArray(), getVersionFromProto(response.getVersion())); + key, response.getValue().toByteArray(), getVersionFromProto(response.getVersion())); } public static @NonNull Version getVersionFromProto( diff --git a/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java index a8732b28..b3bbc75e 100644 --- a/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java @@ -17,6 +17,7 @@ import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.DeleteOption; +import io.streamnative.oxia.client.api.GetOption; import io.streamnative.oxia.client.api.GetResult; import io.streamnative.oxia.client.api.Notification; import io.streamnative.oxia.client.api.PutOption; @@ -92,8 +93,14 @@ public void deleteRange(@NonNull String startKeyInclusive, @NonNull String endKe @SneakyThrows @Override public GetResult get(@NonNull String key) { + return get(key, Collections.emptySet()); + } + + @SneakyThrows + @Override + public GetResult get(@NonNull String key, @NonNull Set options) { try { - return asyncClient.get(key).get(); + return asyncClient.get(key, options).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java index 4d1d852e..28350d1e 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java @@ -36,6 +36,7 @@ import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetRequest; import io.streamnative.oxia.proto.GetResponse; +import io.streamnative.oxia.proto.KeyComparisonType; import io.streamnative.oxia.proto.PutRequest; import io.streamnative.oxia.proto.PutResponse; import io.streamnative.oxia.proto.Status; @@ -55,16 +56,23 @@ default void fail(Throwable t) { } sealed interface ReadOperation extends Operation permits GetOperation { - record GetOperation(@NonNull CompletableFuture callback, @NonNull String key) + record GetOperation( + @NonNull CompletableFuture callback, + @NonNull String key, + KeyComparisonType comparisonType) implements ReadOperation { GetRequest toProto() { - return GetRequest.newBuilder().setKey(key).setIncludeValue(true).build(); + return GetRequest.newBuilder() + .setKey(key) + .setComparisonType(comparisonType) + .setIncludeValue(true) + .build(); } void complete(@NonNull GetResponse response) { switch (response.getStatus()) { case KEY_NOT_FOUND -> callback.complete(null); - case OK -> callback.complete(ProtoUtil.getResultFromProto(response)); + case OK -> callback.complete(ProtoUtil.getResultFromProto(key, response)); default -> fail(new IllegalStateException("GRPC.Status: " + response.getStatus().name())); } } diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java index 1649523f..348e08bc 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java @@ -15,8 +15,6 @@ */ package io.streamnative.oxia.client.batch; -import static java.util.stream.Collectors.toList; - import com.google.common.annotations.VisibleForTesting; import io.grpc.stub.StreamObserver; import io.streamnative.oxia.client.grpc.OxiaStubProvider; @@ -92,8 +90,7 @@ public void onCompleted() { ReadRequest toProto() { return ReadRequest.newBuilder() .setShardId(getShardId()) - .addAllGets( - gets.stream().map(Operation.ReadOperation.GetOperation::toProto).collect(toList())) + .addAllGets(gets.stream().map(Operation.ReadOperation.GetOperation::toProto).toList()) .build(); } } diff --git a/client/src/main/proto/io/streamnative/oxia/client.proto b/client/src/main/proto/io/streamnative/oxia/client.proto index cc184a02..c5e12fa8 100644 --- a/client/src/main/proto/io/streamnative/oxia/client.proto +++ b/client/src/main/proto/io/streamnative/oxia/client.proto @@ -258,6 +258,22 @@ message DeleteResponse { Status status = 1; } +/** + * The type of key comparison to apply in a get() request + */ +enum KeyComparisonType { + // The stored key must be equal to the requested key + EQUAL = 0; + // Search for a key that is the highest key that is <= to the requested key + FLOOR = 1; + // Search for a key that is the lowest key that is >= to the requested key + CEILING = 2; + // Search for a key that is the highest key that is < to the requested key + LOWER = 3; + // Search for a key that is the lowest key that is > to the requested key + HIGHER = 4; +} + /** * A get request. Gets the stat and optionally the value for the specified * key. @@ -267,6 +283,8 @@ message GetRequest { string key = 1; // Specifies whether the response should include the value bool include_value = 2; + + KeyComparisonType comparison_type = 3; } /** @@ -279,6 +297,9 @@ message GetResponse { Version version = 2; // The value, if it was requested and there was no error optional bytes value = 3; + // In case of non-exact queries (eg. floor, ceiling) the found key will be + // returned in the GetResponse. + optional string key = 4; } /** diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index 8d61bb71..c94c0f42 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -333,7 +333,8 @@ void get() { .satisfies( o -> { assertThat(o.key()).isEqualTo(key); - var getResult = new GetResult(new byte[1], new Version(1, 2, 3, 4, empty(), empty())); + var getResult = + new GetResult(key, new byte[1], new Version(1, 2, 3, 4, empty(), empty())); o.callback().complete(getResult); }); } diff --git a/client/src/test/java/io/streamnative/oxia/client/api/GetResultTest.java b/client/src/test/java/io/streamnative/oxia/client/api/GetResultTest.java index df2a11ef..26aad5d7 100644 --- a/client/src/test/java/io/streamnative/oxia/client/api/GetResultTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/api/GetResultTest.java @@ -32,6 +32,7 @@ void fromProto() { var payload = "hello".getBytes(UTF_8); assertThat( ProtoUtil.getResultFromProto( + "original-key", GetResponse.newBuilder() .setValue(ByteString.copyFrom(payload)) .setVersion( @@ -44,6 +45,32 @@ void fromProto() { .build())) .isEqualTo( new GetResult( + "original-key", + payload, + new io.streamnative.oxia.client.api.Version( + 1L, 2L, 3L, 4L, Optional.empty(), Optional.empty()))); + } + + @Test + void fromProtoWithOverride() { + var payload = "hello".getBytes(UTF_8); + assertThat( + ProtoUtil.getResultFromProto( + "original-key", + GetResponse.newBuilder() + .setKey("new-key") + .setValue(ByteString.copyFrom(payload)) + .setVersion( + Version.newBuilder() + .setVersionId(1L) + .setCreatedTimestamp(2L) + .setModifiedTimestamp(3L) + .setModificationsCount(4L) + .build()) + .build())) + .isEqualTo( + new GetResult( + "new-key", payload, new io.streamnative.oxia.client.api.Version( 1L, 2L, 3L, 4L, Optional.empty(), Optional.empty()))); diff --git a/client/src/test/java/io/streamnative/oxia/client/api/VersionTest.java b/client/src/test/java/io/streamnative/oxia/client/api/VersionTest.java index b1c00ff1..d82c7262 100644 --- a/client/src/test/java/io/streamnative/oxia/client/api/VersionTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/api/VersionTest.java @@ -41,12 +41,6 @@ void invalidModified() { .isInstanceOf(IllegalArgumentException.class); } - @Test - void invalidVersionId() { - assertThatThrownBy(() -> new Version(-1, 0, 0, 0, Optional.empty(), Optional.empty())) - .isInstanceOf(IllegalArgumentException.class); - } - @Test void invalidModificationsCount() { assertThatThrownBy(() -> new Version(0, 0, 0, -1, Optional.empty(), Optional.empty())) diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index 4604b092..64cc030e 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -52,6 +52,7 @@ import io.streamnative.oxia.proto.DeleteRangeResponse; import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetResponse; +import io.streamnative.oxia.proto.KeyComparisonType; import io.streamnative.oxia.proto.PutResponse; import io.streamnative.oxia.proto.ReadRequest; import io.streamnative.oxia.proto.ReadResponse; @@ -328,7 +329,7 @@ public void shardId() { class ReadBatchTests { ReadBatch batch; CompletableFuture getCallable = new CompletableFuture<>(); - GetOperation get = new GetOperation(getCallable, ""); + GetOperation get = new GetOperation(getCallable, "", KeyComparisonType.EQUAL); @BeforeEach void setup() { diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java index 41431e9f..acddb931 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java @@ -31,6 +31,7 @@ import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation; import io.streamnative.oxia.client.util.BatchedArrayBlockingQueue; +import io.streamnative.oxia.proto.KeyComparisonType; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Optional; @@ -78,7 +79,7 @@ void teardown() { @Test void createBatchAndAdd() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -90,7 +91,7 @@ void createBatchAndAdd() throws Exception { @Test void sendBatchOnFull() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch()); when(batch.canAdd(any())).thenReturn(true); @@ -124,7 +125,7 @@ void addWhenNextDoesNotFit() { @Test void sendBatchOnFullThenNewBatch() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1); when(batch.canAdd(any())).thenReturn(true); @@ -142,7 +143,7 @@ void sendBatchOnFullThenNewBatch() throws Exception { @Test void sendBatchOnLingerExpiration() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -155,7 +156,7 @@ void sendBatchOnLingerExpiration() throws Exception { @Test void sendBatchOnLingerExpirationMulti() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -171,7 +172,7 @@ void sendBatchOnLingerExpirationMulti() throws Exception { @Test void unboundedTakeAtStart() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key"); + Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java index ca4687d9..8f3973c7 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java @@ -38,6 +38,7 @@ import io.streamnative.oxia.proto.DeleteRangeResponse; import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetResponse; +import io.streamnative.oxia.proto.KeyComparisonType; import io.streamnative.oxia.proto.PutResponse; import io.streamnative.oxia.proto.Version; import java.util.Objects; @@ -59,7 +60,7 @@ class OperationTest { class GetOperationTests { CompletableFuture callback = new CompletableFuture<>(); - GetOperation op = new GetOperation(callback, "key"); + GetOperation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); @Test void toProto() { @@ -79,6 +80,7 @@ void completeOk() { var payload = "hello".getBytes(UTF_8); var response = GetResponse.newBuilder() + .setKey("my-key") .setStatus(OK) .setValue(ByteString.copyFrom(payload)) .setVersion( @@ -93,6 +95,7 @@ void completeOk() { assertThat(callback) .isCompletedWithValue( new GetResult( + "my-key", payload, new io.streamnative.oxia.client.api.Version( 1L, 2L, 3L, 4L, Optional.empty(), Optional.empty()))); @@ -103,6 +106,7 @@ void completeOkEphemeral() { var payload = "hello".getBytes(UTF_8); var response = GetResponse.newBuilder() + .setKey("my-key") .setStatus(OK) .setValue(ByteString.copyFrom(payload)) .setVersion( @@ -119,6 +123,7 @@ void completeOkEphemeral() { assertThat(callback) .isCompletedWithValue( new GetResult( + "my-key", payload, new io.streamnative.oxia.client.api.Version( 1L, 2L, 3L, 4L, Optional.of(5L), Optional.of("client-id"))));