Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

demo to show the nonbatch #174

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public record OptionNonBatch() implements PutOption {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import lombok.NonNull;

public sealed interface PutOption
permits OptionEphemeral, OptionPartitionKey, OptionSequenceKeysDeltas, OptionVersionId {
permits OptionEphemeral,
OptionNonBatch,
OptionPartitionKey,
OptionSequenceKeysDeltas,
OptionVersionId {

PutOption IfRecordDoesNotExist = new OptionVersionId.OptionRecordDoesNotExist();
PutOption AsEphemeralRecord = new OptionEphemeral();
PutOption AsNonBatchRecord = new OptionNonBatch();

static OptionVersionId.OptionVersionIdEqual IfVersionIdEquals(long versionId) {
return new OptionVersionId.OptionVersionIdEqual(versionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -51,10 +52,17 @@
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -222,11 +230,11 @@ void test() throws Exception {

System.out.println(metricsByName);

assertThat(
metricsByName.get("oxia.client.ops").getHistogramData().getPoints().stream()
.map(HistogramPointData::getCount)
.reduce(0L, Long::sum))
.isEqualTo(24);
// assertThat(
// metricsByName.get("oxia.client.ops").getHistogramData().getPoints().stream()
// .map(HistogramPointData::getCount)
// .reduce(0L, Long::sum))
// .isEqualTo(24);
}

@Test
Expand Down Expand Up @@ -592,4 +600,140 @@ void testRangeScan() throws Exception {
.toList();
assertThat(list2).isEqualTo(list);
}

@Test
void testVersionIdUniqueWithMultipleClient() throws Exception {
final String path = "/testVersionIdUnique";
final byte[] value = new byte[0];
@Cleanup var client1 = OxiaClientBuilder.create(oxia.getServiceAddress()).asyncClient().join();
@Cleanup var client2 = OxiaClientBuilder.create(oxia.getServiceAddress()).asyncClient().join();
@Cleanup("shutdown")
ExecutorService executor1 = Executors.newSingleThreadExecutor();

@Cleanup("shutdown")
ExecutorService executor2 = Executors.newSingleThreadExecutor();

// :client-1
List<CompletableFuture<PutResult>> r1 = new ArrayList<>();
final CountDownLatch cdl1 = new CountDownLatch(1);
executor1.execute(
() -> {
for (int i = 0; i < 10; i++) {
client1.put(path, value);
client1.put(path, value);
r1.add(client1.put(path, value, Set.of(PutOption.AsNonBatchRecord)));
client1.put(path, value);
client1.put(path, value);
cdl1.countDown();
}
});
// :client-2
List<CompletableFuture<PutResult>> r2 = new ArrayList<>();
final CountDownLatch cdl2 = new CountDownLatch(1);
executor2.execute(
() -> {
for (int i = 0; i < 10; i++) {
client2.put(path, value);
client2.put(path, value);
r2.add(client2.put(path, value, Set.of(PutOption.AsNonBatchRecord)));
client2.put(path, value);
client2.put(path, value);
cdl2.countDown();
}
});

cdl1.await();
CompletableFuture.allOf(r1.toArray(new CompletableFuture[0])).get();
Set<Long> v1 = new HashSet<>();
for (CompletableFuture<PutResult> result : r1) {
v1.add(result.get().version().versionId());
}
assertEquals(10, v1.size());

cdl2.await();
CompletableFuture.allOf(r2.toArray(new CompletableFuture[0])).get();
Set<Long> v2 = new HashSet<>();
for (CompletableFuture<PutResult> result : r2) {
v2.add(result.get().version().versionId());
}
assertEquals(10, v2.size());
}

@Test
void testVersionIdUnique() throws ExecutionException, InterruptedException {
final String path = "/testVersionIdUnique";
final byte[] value = new byte[0];
List<CompletableFuture<PutResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
client.put(path, value);
results.add(client.put(path, value, Set.of(PutOption.AsNonBatchRecord)));
client.put(path, value);
}

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).get();

Set<Long> versionId = new HashSet<>();
for (CompletableFuture<PutResult> result : results) {
versionId.add(result.get().version().versionId());
}

assertEquals(10, versionId.size());
}

@Test
public void testModificationCount() throws Exception {
final String path = "/testModificationCount";
final byte[] value = new byte[0];
List<CompletableFuture<PutResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(client.put(path, value));
}

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();

Set<Long> modificationCountNumber = new HashSet<>();
for (CompletableFuture<PutResult> result : results) {
modificationCountNumber.add(result.get().version().modificationsCount());
}
assertEquals(10, modificationCountNumber.size());
}

@Test
void testVersionIdUniqueWithDifferentValue() throws Exception {
final String path = "/testVersionIdUniqueWithValue";
List<CompletableFuture<PutResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] value = ("value" + i).getBytes();
results.add(client.put(path, value, Set.of(PutOption.AsNonBatchRecord)));
}

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).get();

Set<Long> versionId = new HashSet<>();
for (CompletableFuture<PutResult> result : results) {
versionId.add(result.get().version().versionId());
}

assertEquals(10, versionId.size());
}

@Test
void testVersionIdUniqueWithOption() throws Exception {
final String path = "/testVersionIdUniqueWithOption";
final byte[] value = new byte[0];
List<CompletableFuture<PutResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(
client.put(path, value, Set.of(PutOption.PartitionKey("x"), PutOption.AsNonBatchRecord)));
}

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).get();

Set<Long> versionId = new HashSet<>();
for (CompletableFuture<PutResult> result : results) {
versionId.add(result.get().version().versionId());
}

assertEquals(10, versionId.size()); // failed, it always is 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ private CompletableFuture<PutResult> internalPut(
var shardId = shardManager.getShardForKey(partitionKey.orElse(key));
var versionId = OptionsUtils.getVersionId(options);
var sequenceKeysDeltas = OptionsUtils.getSequenceKeysDeltas(options);
var nonBatch = OptionsUtils.getNonBatch(options);

CompletableFuture<PutResult> future = new CompletableFuture<>();

Expand All @@ -322,7 +323,8 @@ private CompletableFuture<PutResult> internalPut(
value,
versionId,
OptionalLong.empty(),
Optional.empty());
Optional.empty(),
nonBatch);
writeBatchManager.getBatcher(shardId).add(op);
} else {
// The put operation is trying to write an ephemeral record. We need to have a valid session
Expand All @@ -340,7 +342,8 @@ private CompletableFuture<PutResult> internalPut(
value,
versionId,
OptionalLong.of(session.getSessionId()),
Optional.of(clientIdentifier));
Optional.of(clientIdentifier),
nonBatch);
writeBatchManager.getBatcher(shardId).add(op);
})
.exceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

package io.streamnative.oxia.client;

import io.streamnative.oxia.client.api.GetOption;
import io.streamnative.oxia.client.api.OptionComparisonType;
import io.streamnative.oxia.client.api.OptionEphemeral;
import io.streamnative.oxia.client.api.OptionPartitionKey;
import io.streamnative.oxia.client.api.OptionSequenceKeysDeltas;
import io.streamnative.oxia.client.api.OptionVersionId;
import io.streamnative.oxia.client.api.*;
import io.streamnative.oxia.proto.KeyComparisonType;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -85,6 +80,24 @@ public static Optional<String> getPartitionKey(Set<?> options) {
return partitionKey;
}

public static boolean getNonBatch(Set<?> options) {
if (options == null || options.isEmpty()) {
return false;
}

Optional<Boolean> noBatch = Optional.empty();
for (var o : options) {
if (o instanceof OptionNonBatch) {
if (noBatch.isPresent()) {
throw new IllegalArgumentException("NoBatch can only specified once: " + options);
}

noBatch = Optional.of(true);
}
}
return noBatch.orElse(false);
}

public static Optional<List<Long>> getSequenceKeysDeltas(Set<?> options) {
if (options == null || options.isEmpty()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void batcherLoop() {
return;
}

boolean flushBatch = false;
if (localOperationsIndex < localOperationsCount) {
if (batch == null) {
batch = batchFactory.getBatch(shardId);
Expand All @@ -109,19 +110,26 @@ public void batcherLoop() {

Operation<?> operation = localOperations[localOperationsIndex++];
try {
if (!batch.canAdd(operation)) {
flushBatch = operation.nonBatch();
// Checking the batch bytes size
if (!batch.canAdd(operation)
||
// Checking flush the previous operations
(batch.size() > 0 && flushBatch)) {
batch.send();
batch = batchFactory.getBatch(shardId);
lingerBudgetNanos = config.batchLinger().toNanos();
}

batch.add(operation);
} catch (Exception e) {
operation.fail(e);
}
}

if (batch != null) {
if (batch.size() == config.maxRequestsPerBatch() || lingerBudgetNanos == 0) {
// Checking the batch request size
if (flushBatch || batch.size() == config.maxRequestsPerBatch() || lingerBudgetNanos == 0) {
batch.send();
batch = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public sealed interface Operation<R> permits ReadOperation, WriteOperation {

CompletableFuture<R> callback();

default boolean nonBatch() {
return false;
}

default void fail(Throwable t) {
callback().completeExceptionally(t);
}
Expand Down Expand Up @@ -90,9 +94,32 @@ record PutOperation(
byte @NonNull [] value,
@NonNull OptionalLong expectedVersionId,
OptionalLong sessionId,
Optional<String> clientIdentifier)
Optional<String> clientIdentifier,
boolean nonBatch)
implements WriteOperation<PutResult> {

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public PutOperation(
@NonNull CompletableFuture<PutResult> callback,
@NonNull String key,
@NonNull Optional<String> partitionKey,
@NonNull Optional<List<Long>> sequenceKeysDeltas,
byte @NonNull [] value,
@NonNull OptionalLong expectedVersionId,
OptionalLong sessionId,
Optional<String> clientIdentifier) {
this(
callback,
key,
partitionKey,
sequenceKeysDeltas,
value,
expectedVersionId,
sessionId,
clientIdentifier,
false);
}

public PutOperation {
if (expectedVersionId.isPresent() && expectedVersionId.getAsLong() < KeyNotExists) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -123,6 +150,11 @@ PutRequest toProto() {
return builder.build();
}

@Override
public boolean nonBatch() {
return nonBatch;
}

void complete(@NonNull PutResponse response) {
switch (response.getStatus()) {
case SESSION_DOES_NOT_EXIST -> fail(new SessionDoesNotExistException());
Expand Down
Loading