Skip to content

Commit

Permalink
Added get() with floor/ceiling/lower/higher comparison (#159)
Browse files Browse the repository at this point in the history
* Added get() with floor/ceiling/lower/higher comparison

* Fixed test
  • Loading branch information
merlimat authored May 15, 2024
1 parent b8cbc58 commit b606d0c
Show file tree
Hide file tree
Showing 20 changed files with 489 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ public interface AsyncOxiaClient extends AutoCloseable {
@NonNull
CompletableFuture<GetResult> get(String key);

/**
* Returns the record associated with the specified key. The returned value includes the value,
* and other metadata.
*
* @param key The key associated with the record to be fetched.
* @param options Set {@link GetOption options} for the get operation.
* @return The value associated with the supplied key, or {@code null} if the key did not exist.
* Supplied via a future returning a {@link GetResult}.
*/
@NonNull
CompletableFuture<GetResult> get(String key, Set<GetOption> options);

/**
* Lists any existing keys within the specified range. For more information on how keys are
* sorted, check the relevant section in the <a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public sealed interface GetOption permits OptionComparisonType {

/** ComparisonEqual sets the Get() operation to compare the stored key for equality. */
GetOption ComparisonEqual = new OptionComparisonType(OptionComparisonType.ComparisonType.Equal);

/**
* ComparisonFloor option will make the get operation to search for the record whose key is the
* highest key <= to the supplied key.
*/
GetOption ComparisonFloor = new OptionComparisonType(OptionComparisonType.ComparisonType.Floor);

/**
* ComparisonCeiling option will make the get operation to search for the record whose key is the
* lowest key >= to the supplied key.
*/
GetOption ComparisonCeiling =
new OptionComparisonType(OptionComparisonType.ComparisonType.Ceiling);

/**
* ComparisonLower option will make the get operation to search for the record whose key is
* strictly < to the supplied key.
*/
GetOption ComparisonLower = new OptionComparisonType(OptionComparisonType.ComparisonType.Lower);

/**
* ComparisonHigher option will make the get operation to search for the record whose key is
* strictly > to the supplied key.
*/
GetOption ComparisonHigher = new OptionComparisonType(OptionComparisonType.ComparisonType.Higher);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
/** The result of a client get request. */
@Value
public class GetResult {

/** The key associated with the record. */
@NonNull String key;

/** The value associated with the key specified in the call. */
byte @NonNull [] value;
@NonNull byte[] value;

/** Metadata for the record associated with the key specified in the call. */
@NonNull Version version;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public record OptionComparisonType(ComparisonType comparisonType) implements GetOption {

public enum ComparisonType {
Equal,
Floor,
Ceiling,
Lower,
Higher,
}

public ComparisonType getComparisonType() {
return comparisonType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ boolean delete(@NonNull String key, @NonNull Set<DeleteOption> options)
*/
GetResult get(@NonNull String key);

/**
* Returns the record associated with the specified key. The returned value includes the value,
* and other metadata.
*
* @param key The key associated with the record to be fetched.
* @param options Set {@link GetOption options} for the get operation.
* @return The value associated with the supplied key, or {@code null} if the key did not exist.
*/
GetResult get(@NonNull String key, @NonNull Set<GetOption> options);

/**
* Lists any existing keys within the specified range. For more information on how keys are
* sorted, check the relevant section in the <a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public record Version(

/** Represents the state where a versionId of a record (and thus the record) does not exist. */
public Version {
requireValidVersionId(versionId);
requireValidTimestamp(createdTimestamp);
requireValidTimestamp(modifiedTimestamp);
requireValidModificationsCount(modificationsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,24 @@
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.GetOption;
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.Notification.KeyCreated;
import io.streamnative.oxia.client.api.Notification.KeyDeleted;
import io.streamnative.oxia.client.api.Notification.KeyModified;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.SyncOxiaClient;
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -217,4 +222,167 @@ void test() throws Exception {
.reduce(0L, Long::sum))
.isEqualTo(24);
}

@Test
void testGetFloorCeiling() throws Exception {
@Cleanup
SyncOxiaClient client = OxiaClientBuilder.create(oxia.getServiceAddress()).syncClient();

client.put("a", "0".getBytes());
// client.put("b", "1".getBytes()); // Skipped intentionally
client.put("c", "2".getBytes());
client.put("d", "3".getBytes());
client.put("e", "4".getBytes());
// client.put("f", "5".getBytes()); // Skipped intentionally
client.put("g", "6".getBytes());

GetResult gr = client.get("a");
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("a", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("a", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("a", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("a", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr).isNull();

gr = client.get("a", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

// ------------------------------------------------------------------------------------------------

gr = client.get("b");
assertThat(gr).isNull();

gr = client.get("b", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr).isNull();

gr = client.get("b", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("b", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("b", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("b", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

// ------------------------------------------------------------------------------------------------

gr = client.get("c");
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("c", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("c", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("c", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("c", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr.getKey()).isEqualTo("a");
assertThat(gr.getValue()).isEqualTo("0".getBytes());

gr = client.get("c", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

// ------------------------------------------------------------------------------------------------

gr = client.get("d");
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

gr = client.get("d", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

gr = client.get("d", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

gr = client.get("d", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

gr = client.get("d", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr.getKey()).isEqualTo("c");
assertThat(gr.getValue()).isEqualTo("2".getBytes());

gr = client.get("d", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

// ------------------------------------------------------------------------------------------------

gr = client.get("e");
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("e", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("e", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("e", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("e", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr.getKey()).isEqualTo("d");
assertThat(gr.getValue()).isEqualTo("3".getBytes());

gr = client.get("e", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("g");
assertThat(gr.getValue()).isEqualTo("6".getBytes());

// ------------------------------------------------------------------------------------------------

gr = client.get("f");
assertThat(gr).isNull();

gr = client.get("f", Collections.singleton(GetOption.ComparisonEqual));
assertThat(gr).isNull();

gr = client.get("f", Collections.singleton(GetOption.ComparisonFloor));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("f", Collections.singleton(GetOption.ComparisonCeiling));
assertThat(gr.getKey()).isEqualTo("g");
assertThat(gr.getValue()).isEqualTo("6".getBytes());

gr = client.get("f", Collections.singleton(GetOption.ComparisonLower));
assertThat(gr.getKey()).isEqualTo("e");
assertThat(gr.getValue()).isEqualTo("4".getBytes());

gr = client.get("f", Collections.singleton(GetOption.ComparisonHigher));
assertThat(gr.getKey()).isEqualTo("g");
assertThat(gr.getValue()).isEqualTo("6".getBytes());
}
}
Loading

0 comments on commit b606d0c

Please sign in to comment.