Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid the need of dedicated thread to fetch shards assignments #151

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
var shardManager = new ShardManager(serviceAddrStub, instrumentProvider, config.namespace());
var shardManager =
new ShardManager(executor, serviceAddrStub, instrumentProvider, config.namespace());
var notificationManager =
new NotificationManager(executor, stubManager, shardManager, instrumentProvider);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,54 @@
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.grpc.CustomStatusCode;
import io.streamnative.oxia.client.grpc.GrpcResponseStream;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;
import io.streamnative.oxia.proto.ShardAssignments;
import io.streamnative.oxia.proto.ShardAssignmentsRequest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

@Slf4j
public class ShardManager extends GrpcResponseStream implements AutoCloseable {
public class ShardManager implements AutoCloseable, StreamObserver<ShardAssignments> {
private final ScheduledExecutorService executor;
private final OxiaStub stub;
private final @NonNull ShardAssignmentsContainer assignments;
private final @NonNull CompositeConsumer<ShardAssignmentChanges> callbacks;

private final Counter shardAssignmentsEvents;

private final Scheduler scheduler;
private final Backoff backoff = new Backoff();
private volatile boolean closed;

private final CompletableFuture<Void> initialAssignmentsFuture = new CompletableFuture<>();

@VisibleForTesting
ShardManager(
@NonNull ScheduledExecutorService executor,
@NonNull OxiaStub stub,
@NonNull ShardAssignmentsContainer assignments,
@NonNull CompositeConsumer<ShardAssignmentChanges> callbacks,
@NonNull InstrumentProvider instrumentProvider) {
super(stub);
this.stub = stub;
this.executor = executor;
this.assignments = assignments;
this.callbacks = callbacks;
this.scheduler = Schedulers.newSingle("shard-assignments");

this.shardAssignmentsEvents =
instrumentProvider.newCounter(
Expand All @@ -81,10 +83,12 @@ public class ShardManager extends GrpcResponseStream implements AutoCloseable {
}

public ShardManager(
ScheduledExecutorService executor,
@NonNull OxiaStub stub,
@NonNull InstrumentProvider instrumentProvider,
@NonNull String namespace) {
this(
executor,
stub,
new ShardAssignmentsContainer(Xxh332HashRangeShardStrategy, namespace),
new CompositeConsumer<>(),
Expand All @@ -93,45 +97,32 @@ public ShardManager(

@Override
public void close() {
super.close();
scheduler.dispose();
closed = true;
}

public CompletableFuture<Void> start() {
var req = ShardAssignmentsRequest.newBuilder().setNamespace(assignments.getNamespace()).build();

stub.async().getShardAssignments(req, this);
return initialAssignmentsFuture;
}

@Override
protected CompletableFuture<Void> start(OxiaStub stub, Consumer<Disposable> consumer) {
RetryBackoffSpec retrySpec =
Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.filter(this::isErrorRetryable)
.doBeforeRetry(signal -> log.warn("Retrying receiving shard assignments: {}", signal));
var assignmentsFlux =
Flux.defer(
() ->
stub.reactor()
.getShardAssignments(
ShardAssignmentsRequest.newBuilder()
.setNamespace(assignments.getNamespace())
.build()))
.doOnError(this::processError)
.retryWhen(retrySpec)
.repeat()
.publishOn(scheduler)
.doOnNext(this::updateAssignments)
.doOnEach(x -> shardAssignmentsEvents.increment())
.publish();
// Complete after the first response has been processed
var future = Mono.from(assignmentsFlux).then().toFuture();
var disposable = assignmentsFlux.connect();
consumer.accept(disposable);
return future;
}

/**
* Process errors of repeated flux, it will auto suppress unknown status #{@link
* StatusRuntimeException}.
*
* @param error Error from flux
*/
private void processError(@NonNull Throwable error) {
public void onNext(ShardAssignments assignments) {
shardAssignmentsEvents.increment();
updateAssignments(assignments);
backoff.reset();
if (!initialAssignmentsFuture.isDone()) {
initialAssignmentsFuture.complete(null);
}
}

@Override
public void onError(Throwable error) {
if (closed) {
return;
}

if (error instanceof StatusRuntimeException statusError) {
var status = statusError.getStatus();
if (status.getCode() == Status.Code.UNKNOWN) {
Expand All @@ -140,14 +131,49 @@ private void processError(@NonNull Throwable error) {
if (description != null) {
var customStatusCode = CustomStatusCode.fromDescription(description);
if (customStatusCode == CustomStatusCode.ErrorNamespaceNotFound) {
final var ex = new NamespaceNotFoundException(assignments.getNamespace());
log.error("Failed receiving shard assignments", ex);
throw ex;
log.error("Namespace not found: {}", assignments.getNamespace());
if (!initialAssignmentsFuture.isDone()) {
if (initialAssignmentsFuture.completeExceptionally(
new NamespaceNotFoundException(assignments.getNamespace()))) {
close();
}
}
}
}
}
}
log.warn("Failed receiving shard assignments", error);
log.warn("Failed receiving shard assignments: {}", error.getMessage());
executor.schedule(
() -> {
if (!closed) {
log.info(
"Retry creating stream for shard assignments namespace={}",
assignments.getNamespace());
start();
}
},
backoff.nextDelayMillis(),
TimeUnit.MILLISECONDS);
}

@Override
public void onCompleted() {
if (closed) {
return;
}

log.warn("Stream closed while receiving shard assignments");
executor.schedule(
() -> {
if (!closed) {
log.info(
"Retry creating stream for shard assignments after stream closed namespace={}",
assignments.getNamespace());
start();
}
},
backoff.nextDelayMillis(),
TimeUnit.MILLISECONDS);
}

private void updateAssignments(io.streamnative.oxia.proto.ShardAssignments shardAssignments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -70,8 +72,11 @@ public Flux<ShardAssignments> getShardAssignments(Mono<ShardAssignmentsRequest>

@Mock OxiaStub stub;

ScheduledExecutorService executor;

@BeforeEach
void beforeEach() throws Exception {
executor = Executors.newSingleThreadScheduledExecutor();
requests.set(0);
responses.clear();
server =
Expand All @@ -88,6 +93,7 @@ void beforeEach() throws Exception {
void afterEach() throws Exception {
stub.close();
server.shutdownNow();
executor.shutdownNow();
}

@Test
Expand All @@ -99,7 +105,8 @@ void start() {
NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build())
.build();
responses.offer(Flux.just(assignments).concatWith(Flux.never()));
try (var shardManager = new ShardManager(stub, InstrumentProvider.NOOP, DefaultNamespace)) {
try (var shardManager =
new ShardManager(executor, stub, InstrumentProvider.NOOP, DefaultNamespace)) {
assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1));
assertThat(shardManager.allShardIds()).containsExactlyInAnyOrder(0L);
assertThat(shardManager.leader(0)).isEqualTo("leader0");
Expand All @@ -109,7 +116,8 @@ void start() {
@Test
void neverStarts() {
responses.offer(Flux.never());
try (var shardManager = new ShardManager(stub, InstrumentProvider.NOOP, DefaultNamespace)) {
try (var shardManager =
new ShardManager(executor, stub, InstrumentProvider.NOOP, DefaultNamespace)) {
assertThatThrownBy(() -> shardManager.start().get(1, SECONDS))
.isInstanceOf(TimeoutException.class);
assertThat(shardManager.allShardIds()).isEmpty();
Expand All @@ -134,7 +142,8 @@ void update() {
.build())
.build();
responses.offer(Flux.just(assignments0, assignments1).concatWith(Flux.never()));
try (var shardManager = new ShardManager(stub, InstrumentProvider.NOOP, DefaultNamespace)) {
try (var shardManager =
new ShardManager(executor, stub, InstrumentProvider.NOOP, DefaultNamespace)) {
shardManager.start().join();
await()
.untilAsserted(
Expand All @@ -156,7 +165,8 @@ public void recoveryFromError() {
NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build())
.build();
responses.offer(Flux.just(assignments).concatWith(Flux.never()));
try (var shardManager = new ShardManager(stub, InstrumentProvider.NOOP, DefaultNamespace)) {
try (var shardManager =
new ShardManager(executor, stub, InstrumentProvider.NOOP, DefaultNamespace)) {
assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1));
assertThat(shardManager.allShardIds()).containsExactlyInAnyOrder(0L);
assertThat(shardManager.leader(0)).isEqualTo("leader0");
Expand All @@ -174,7 +184,8 @@ public void recoveryFromEndOfStream() {
NamespaceShardsAssignment.newBuilder().addAssignments(assignment(0, 0, 3)).build())
.build();
responses.offer(Flux.just(assignments).concatWith(Flux.never()));
try (var shardManager = new ShardManager(stub, InstrumentProvider.NOOP, DefaultNamespace)) {
try (var shardManager =
new ShardManager(executor, stub, InstrumentProvider.NOOP, DefaultNamespace)) {
assertThat(shardManager.start()).succeedsWithin(Duration.ofSeconds(1));
assertThat(shardManager.allShardIds()).containsExactlyInAnyOrder(0L);
assertThat(shardManager.leader(0)).isEqualTo("leader0");
Expand Down
Loading
Loading