Skip to content

Commit

Permalink
Use ArrayBlockingQueue instead of PriorityQueue in batcher (#140)
Browse files Browse the repository at this point in the history
* Use ArrayBlockingQueue instead of PriorityQueue in batcher

* Spotless
  • Loading branch information
merlimat authored May 2, 2024
1 parent d03623c commit bc4178c
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.NonNull;
Expand Down Expand Up @@ -93,7 +92,6 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
private final @NonNull BatchManager readBatchManager;
private final @NonNull BatchManager writeBatchManager;
private final @NonNull SessionManager sessionManager;
private final AtomicLong sequence = new AtomicLong();
private volatile boolean closed;

private final Counter counterPutBytes;
Expand Down Expand Up @@ -236,12 +234,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
var versionId = PutOption.toVersionId(validatedOptions);
var op =
new PutOperation(
sequence.getAndIncrement(),
callback,
key,
value,
versionId,
PutOption.toEphemeral(validatedOptions));
callback, key, value, versionId, PutOption.toEphemeral(validatedOptions));
writeBatchManager.getBatcher(shardId).add(op);
} catch (RuntimeException e) {
callback.completeExceptionally(e);
Expand Down Expand Up @@ -273,9 +266,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
var validatedOptions = DeleteOption.validate(options);
var shardId = shardManager.get(key);
var versionId = DeleteOption.toVersionId(validatedOptions);
writeBatchManager
.getBatcher(shardId)
.add(new DeleteOperation(sequence.getAndIncrement(), callback, key, versionId));
writeBatchManager.getBatcher(shardId).add(new DeleteOperation(callback, key, versionId));
} catch (RuntimeException e) {
callback.completeExceptionally(e);
}
Expand Down Expand Up @@ -308,10 +299,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
var shardCallback = new CompletableFuture<Void>();
b.add(
new DeleteRangeOperation(
sequence.getAndIncrement(),
shardCallback,
startKeyInclusive,
endKeyExclusive));
shardCallback, startKeyInclusive, endKeyExclusive));
return shardCallback;
})
.collect(toList())
Expand Down Expand Up @@ -340,9 +328,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
checkIfClosed();
Objects.requireNonNull(key);
var shardId = shardManager.get(key);
readBatchManager
.getBatcher(shardId)
.add(new GetOperation(sequence.getAndIncrement(), callback, key));
readBatchManager.getBatcher(shardId).add(new GetOperation(callback, key));
} catch (RuntimeException e) {
callback.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class OxiaClientBuilder {
public static final int DefaultMaxBatchSize = 128 * 1024;
public static final Duration DefaultRequestTimeout = Duration.ofSeconds(30);
public static final Duration DefaultSessionTimeout = Duration.ofSeconds(15);
public static final int DefaultRecordCacheCapacity = 10_000;
public static final int DefaultRecordCacheCapacity = 0;
public static final String DefaultNamespace = "default";

@NonNull private final String serviceAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@
*/
package io.streamnative.oxia.client.batch;

import static java.util.concurrent.TimeUnit.SECONDS;
import static lombok.AccessLevel.PACKAGE;

import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.session.SessionManager;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -36,8 +32,6 @@
@RequiredArgsConstructor
public class BatchManager implements AutoCloseable {

private final ExecutorService executor =
Executors.newCachedThreadPool(new DefaultThreadFactory("batch-manager"));
private final ConcurrentMap<Long, Batcher> batchersByShardId = new ConcurrentHashMap<>();
private final @NonNull Function<Long, Batcher> batcherFactory;
private volatile boolean closed;
Expand All @@ -50,9 +44,7 @@ public Batcher getBatcher(long shardId) {
}

private Batcher createAndStartBatcher(long shardId) {
Batcher batcher = batcherFactory.apply(shardId);
executor.execute(batcher);
return batcher;
return batcherFactory.apply(shardId);
}

@Override
Expand All @@ -62,18 +54,6 @@ public void close() throws Exception {
}
closed = true;
batchersByShardId.values().forEach(Batcher::close);
shutdownExecutor();
}

private void shutdownExecutor() {
executor.shutdown();
try {
if (!executor.awaitTermination(1, SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}

@RequiredArgsConstructor(access = PACKAGE)
Expand Down
88 changes: 50 additions & 38 deletions client/src/main/java/io/streamnative/oxia/client/batch/Batcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@
package io.streamnative.oxia.client.batch;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static lombok.AccessLevel.PACKAGE;

import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.batch.Operation.CloseOperation;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.session.SessionManager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Function;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

@RequiredArgsConstructor(access = PACKAGE)
public class Batcher implements Runnable, AutoCloseable {
public class Batcher implements AutoCloseable {

private static final int DEFAULT_INITIAL_QUEUE_CAPACITY = 1_000;

Expand All @@ -40,12 +37,26 @@ public class Batcher implements Runnable, AutoCloseable {
@NonNull private final BatchFactory batchFactory;
@NonNull private final BlockingQueue<Operation<?>> operations;

private final Thread thread;

Batcher(@NonNull ClientConfig config, long shardId, @NonNull BatchFactory batchFactory) {
this(
config,
shardId,
batchFactory,
new PriorityBlockingQueue<>(DEFAULT_INITIAL_QUEUE_CAPACITY, Operation.PriorityComparator));
this(config, shardId, batchFactory, new ArrayBlockingQueue<>(DEFAULT_INITIAL_QUEUE_CAPACITY));
}

Batcher(
@NonNull ClientConfig config,
long shardId,
@NonNull BatchFactory batchFactory,
@NonNull BlockingQueue<Operation<?>> operations) {
this.config = config;
this.shardId = shardId;
this.batchFactory = batchFactory;
this.operations = operations;

this.thread =
new DefaultThreadFactory(String.format("batcher-shard-%d", shardId))
.newThread(this::batcherLoop);
this.thread.start();
}

@SneakyThrows
Expand All @@ -58,51 +69,47 @@ public <R> void add(@NonNull Operation<R> operation) {
}
}

@Override
public void run() {
public void batcherLoop() {
Batch batch = null;
long lingerBudgetNanos = -1L;
while (true) {
Operation<?> operation;

try {
Operation<?> operation;
if (batch == null) {
operation = operations.take();
} else {
operation = operations.poll(lingerBudgetNanos, NANOSECONDS);
long spentLingerBudgetNanos = Math.max(0, System.nanoTime() - batch.getStartTimeNanos());
lingerBudgetNanos = Math.max(0L, lingerBudgetNanos - spentLingerBudgetNanos);
}
} catch (InterruptedException e) {
// Exiting thread
return;
}

if (operation == CloseOperation.INSTANCE) {
break;
if (operation != null) {
if (batch == null) {
batch = batchFactory.getBatch(shardId);
lingerBudgetNanos = config.batchLinger().toNanos();
}

if (operation != null) {
if (batch == null) {
try {
if (!batch.canAdd(operation)) {
batch.send();
batch = batchFactory.getBatch(shardId);
lingerBudgetNanos = config.batchLinger().toNanos();
}
try {
if (!batch.canAdd(operation)) {
batch.send();
batch = batchFactory.getBatch(shardId);
lingerBudgetNanos = config.batchLinger().toNanos();
}
batch.add(operation);
} catch (Exception e) {
operation.fail(e);
}
batch.add(operation);
} catch (Exception e) {
operation.fail(e);
}
}

if (batch != null) {
if (batch.size() == config.maxRequestsPerBatch() || lingerBudgetNanos == 0) {
batch.send();
batch = null;
}
if (batch != null) {
if (batch.size() == config.maxRequestsPerBatch() || lingerBudgetNanos == 0) {
batch.send();
batch = null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
Expand All @@ -129,6 +136,11 @@ public void run() {

@Override
public void close() {
operations.add(CloseOperation.INSTANCE);
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.streamnative.oxia.client.batch;

import static io.streamnative.oxia.client.api.Version.KeyNotExists;
import static io.streamnative.oxia.client.batch.Operation.CloseOperation;
import static io.streamnative.oxia.client.batch.Operation.ReadOperation;
import static io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation;
import static io.streamnative.oxia.client.batch.Operation.WriteOperation;
Expand All @@ -40,25 +39,21 @@
import io.streamnative.oxia.proto.PutResponse;
import io.streamnative.oxia.proto.Status;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;

public sealed interface Operation<R> permits CloseOperation, ReadOperation, WriteOperation {
public sealed interface Operation<R> permits ReadOperation, WriteOperation {

CompletableFuture<R> callback();

long sequence();

default void fail(Throwable t) {
callback().completeExceptionally(t);
}

sealed interface ReadOperation<R> extends Operation<R> permits GetOperation {
record GetOperation(
long sequence, @NonNull CompletableFuture<GetResult> callback, @NonNull String key)
record GetOperation(@NonNull CompletableFuture<GetResult> callback, @NonNull String key)
implements ReadOperation<GetResult> {
GetRequest toProto() {
return GetRequest.newBuilder().setKey(key).setIncludeValue(true).build();
Expand All @@ -77,7 +72,6 @@ void complete(@NonNull GetResponse response) {
sealed interface WriteOperation<R> extends Operation<R>
permits PutOperation, DeleteOperation, DeleteRangeOperation {
record PutOperation(
long sequence,
@NonNull CompletableFuture<PutResult> callback,
@NonNull String key,
byte @NonNull [] value,
Expand Down Expand Up @@ -147,7 +141,6 @@ record SessionInfo(long sessionId, @NonNull String clientIdentifier) {}
}

record DeleteOperation(
long sequence,
@NonNull CompletableFuture<Boolean> callback,
@NonNull String key,
@NonNull Optional<Long> expectedVersionId)
Expand Down Expand Up @@ -176,14 +169,12 @@ void complete(@NonNull DeleteResponse response) {
}
}

public DeleteOperation(
long sequence, @NonNull CompletableFuture<Boolean> callback, @NonNull String key) {
this(sequence, callback, key, Optional.empty());
public DeleteOperation(@NonNull CompletableFuture<Boolean> callback, @NonNull String key) {
this(callback, key, Optional.empty());
}
}

record DeleteRangeOperation(
long sequence,
@NonNull CompletableFuture<Void> callback,
@NonNull String startKeyInclusive,
@NonNull String endKeyExclusive)
Expand All @@ -204,29 +195,4 @@ void complete(@NonNull DeleteRangeResponse response) {
}
}
}

enum CloseOperation implements Operation<Void> {
INSTANCE {
@Override
public long sequence() {
return Long.MIN_VALUE;
}
};

@Override
public CompletableFuture<Void> callback() {
return null;
}
}

Comparator<Operation> PriorityComparator =
(o1, o2) -> {
if (o1 == CloseOperation.INSTANCE) {
return -1;
} else if (o2 == CloseOperation.INSTANCE) {
return +1;
} else {
return Long.compare(o1.sequence(), o2.sequence());
}
};
}
Loading

0 comments on commit bc4178c

Please sign in to comment.