Skip to content

Commit

Permalink
feat(stub): support multiple connection per node (#190)
Browse files Browse the repository at this point in the history
* feat(stub): support multiple connection per node

* fix license check spotless

* fix test

* fix spotless

* remove useless file
  • Loading branch information
mattisonchao authored Nov 14, 2024
1 parent 83cf721 commit 1436335
Show file tree
Hide file tree
Showing 21 changed files with 157 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ static OxiaClientBuilder create(String serviceAddress) {

OxiaClientBuilder connectionBackoff(Duration minDelay, Duration maxDelay);

OxiaClientBuilder maxConnectionPerNode(int connections);

/**
* Configure the authentication plugin and its parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -73,6 +74,7 @@ static void beforeAll() {

client =
OxiaClientBuilder.create(oxia.getServiceAddress())
.maxConnectionPerNode(ThreadLocalRandom.current().nextInt(10) + 1)
.openTelemetry(openTelemetry)
.asyncClient()
.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.streamnative.oxia.client.shard.NamespaceNotFoundException;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -41,6 +42,7 @@ public void testWrongNamespace() {
try {
OxiaClientBuilder.create(oxia.getServiceAddress())
.namespace("my-ns-does-not-exist")
.maxConnectionPerNode(ThreadLocalRandom.current().nextInt(10) + 1)
.asyncClient()
.join();
Assertions.fail("Unexpected behaviour!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ static void beforeAll() {
client =
OxiaClientBuilder.create(oxia.getServiceAddress())
.openTelemetry(openTelemetry)
.maxConnectionPerNode(10)
.asyncClient()
.join();
client.notifications(notifications::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
config.connectionBackoffMinDelay(), config.connectionBackoffMaxDelay());
var stubManager =
new OxiaStubManager(
config.namespace(), config.authentication(), config.enableTls(), oxiaBackoffProvider);
config.authentication(),
config.enableTls(),
oxiaBackoffProvider,
config.maxConnectionPerNode());

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
Expand All @@ -89,7 +92,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
var notificationManager =
new NotificationManager(executor, stubManager, shardManager, instrumentProvider);

OxiaStubProvider stubProvider = new OxiaStubProvider(stubManager, shardManager);
OxiaStubProvider stubProvider =
new OxiaStubProvider(config.namespace(), stubManager, shardManager);

shardManager.addCallback(notificationManager);
var readBatchManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ public record ClientConfig(
@Nullable Authentication authentication,
boolean enableTls,
@NonNull Duration connectionBackoffMinDelay,
@NonNull Duration connectionBackoffMaxDelay) {}
@NonNull Duration connectionBackoffMaxDelay,
int maxConnectionPerNode) {}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class OxiaClientBuilderImpl implements OxiaClientBuilder {
public static final Duration DefaultSessionTimeout = Duration.ofSeconds(15);
public static final String DefaultNamespace = "default";
public static final boolean DefaultEnableTls = false;
public static final int DefaultMaxConnectionPerNode = 1;

@NonNull protected final String serviceAddress;
@NonNull protected Duration requestTimeout = DefaultRequestTimeout;
Expand All @@ -70,6 +71,8 @@ public class OxiaClientBuilderImpl implements OxiaClientBuilder {
@NonNull protected Duration connectionBackoffMinDelay = Duration.ofMillis(100);
@NonNull protected Duration connectionBackoffMaxDelay = Duration.ofSeconds(30);

protected int maxConnectionsPerNode = DefaultMaxConnectionPerNode;

@Override
public @NonNull OxiaClientBuilder requestTimeout(@NonNull Duration requestTimeout) {
if (requestTimeout.isNegative() || requestTimeout.equals(ZERO)) {
Expand Down Expand Up @@ -151,6 +154,16 @@ public OxiaClientBuilder connectionBackoff(Duration minDelay, Duration maxDelay)
return this;
}

@Override
public OxiaClientBuilder maxConnectionPerNode(int connections) {
if (connections <= 0) {
throw new IllegalArgumentException(
"maxConnectionPerNode must be greater than zero: " + connections);
}
this.maxConnectionsPerNode = connections;
return this;
}

@Override
public OxiaClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
Expand Down Expand Up @@ -239,7 +252,8 @@ public OxiaClientBuilder loadConfig(Properties properties) {
authentication,
enableTls,
connectionBackoffMinDelay,
connectionBackoffMaxDelay);
connectionBackoffMaxDelay,
maxConnectionsPerNode);
return AsyncOxiaClientImpl.newInstance(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubProvider;
import io.streamnative.oxia.client.grpc.WriteStreamWrapper;
import lombok.Getter;
import lombok.NonNull;

Expand All @@ -34,4 +35,8 @@ abstract class BatchBase {
protected OxiaStub getStub() {
return stubProvider.getStubForShard(shardId);
}

protected WriteStreamWrapper getWriteStream() {
return stubProvider.getWriteStreamForShard(shardId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public int size() {
public void send() {
startSendTimeNanos = System.nanoTime();
try {
getStub()
.writeStream(getShardId())
getWriteStream()
.send(toProto())
.thenAccept(
response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.MetadataUtils;
import io.streamnative.oxia.client.api.Authentication;
import io.streamnative.oxia.client.batch.WriteStreamWrapper;
import io.streamnative.oxia.proto.OxiaClientGrpc;

import java.lang.reflect.Field;
Expand All @@ -42,13 +41,10 @@
@Slf4j
public class OxiaStub implements AutoCloseable {
private final ManagedChannel channel;
private final String namespace;
private final @NonNull OxiaClientGrpc.OxiaClientStub asyncStub;
private final Map<Long, WriteStreamWrapper> writeStreams = new ConcurrentHashMap<>();

public OxiaStub(
String address,
String namespace,
@Nullable Authentication authentication,
boolean enableTls,
@Nullable BackoffPolicy.Provider backoffProvider) {
Expand All @@ -59,16 +55,14 @@ public OxiaStub(
: InsecureChannelCredentials.create())
.directExecutor()
.build(),
namespace,
authentication, backoffProvider);
}

public OxiaStub(ManagedChannel channel, String namespace) {
this(channel, namespace, null, OxiaBackoffProvider.DEFAULT);
public OxiaStub(ManagedChannel channel) {
this(channel, null, OxiaBackoffProvider.DEFAULT);
}

public OxiaStub(ManagedChannel channel, String namespace,
@Nullable final Authentication authentication,
public OxiaStub(ManagedChannel channel, @Nullable final Authentication authentication,
@Nullable BackoffPolicy.Provider oxiaBackoffPolicyProvider) {
/*
The GRPC default backoff is from 2s to 120s, which is very long for time sensitive usage.
Expand All @@ -79,7 +73,6 @@ public OxiaStub(ManagedChannel channel, String namespace,
if (oxiaBackoffPolicyProvider != null) {
configureBackoffPolicyIfPossible(channel, oxiaBackoffPolicyProvider);
}
this.namespace = namespace;
this.channel = channel;
if (authentication != null) {
this.asyncStub =
Expand Down Expand Up @@ -123,28 +116,6 @@ public OxiaClientGrpc.OxiaClientStub async() {
return asyncStub;
}

private static final Metadata.Key<String> NAMESPACE_KEY =
Metadata.Key.of("namespace", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> SHARD_ID_KEY =
Metadata.Key.of("shard-id", Metadata.ASCII_STRING_MARSHALLER);

public WriteStreamWrapper writeStream(long streamId) {
return writeStreams.compute(
streamId,
(key, stream) -> {
if (stream == null || !stream.isValid()) {
Metadata headers = new Metadata();
headers.put(NAMESPACE_KEY, namespace);
headers.put(SHARD_ID_KEY, String.format("%d", streamId));

OxiaClientGrpc.OxiaClientStub stub =
asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
return new WriteStreamWrapper(stub);
}
return stream;
});
}

@Override
public void close() throws Exception {
channel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,43 @@
*/
package io.streamnative.oxia.client.grpc;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.api.Authentication;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

public class OxiaStubManager implements AutoCloseable {
private final Map<String, OxiaStub> stubs = new ConcurrentHashMap<>();
@VisibleForTesting final Map<Key, OxiaStub> stubs = new ConcurrentHashMap<>();

private final String namespace;
@Nullable private final Authentication authentication;
private final boolean enableTls;
@Nullable private final BackoffPolicy.Provider backoffProvider;

private final int maxConnectionPerNode;

public OxiaStubManager(
String namespace,
@Nullable Authentication authentication,
boolean enableTls,
@Nullable BackoffPolicy.Provider backoffProvider) {
this.namespace = namespace;
@Nullable BackoffPolicy.Provider backoffProvider,
int maxConnectionPerNode) {
this.authentication = authentication;
this.enableTls = enableTls;
this.backoffProvider = backoffProvider;
this.maxConnectionPerNode = maxConnectionPerNode;
}

public OxiaStub getStub(String address) {
final var random = ThreadLocalRandom.current().nextInt();
var modKey = random % maxConnectionPerNode;
if (modKey < 0) {
modKey += maxConnectionPerNode;
}
return stubs.computeIfAbsent(
address, addr -> new OxiaStub(addr, namespace, authentication, enableTls, backoffProvider));
new Key(address, modKey),
key -> new OxiaStub(key.address, authentication, enableTls, backoffProvider));
}

@Override
Expand All @@ -51,4 +60,6 @@ public void close() throws Exception {
stub.close();
}
}

record Key(String address, int random) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@
package io.streamnative.oxia.client.grpc;

import io.streamnative.oxia.client.shard.ShardManager;
import lombok.Getter;

public class OxiaStubProvider {

@Getter private final String namespace;
private final OxiaStubManager stubManager;
private final ShardManager shardManager;
private final OxiaWriteStreamManager writeStreamManager;

public OxiaStubProvider(OxiaStubManager stubManager, ShardManager shardManager) {
public OxiaStubProvider(
String namespace, OxiaStubManager stubManager, ShardManager shardManager) {
this.namespace = namespace;
this.stubManager = stubManager;
this.shardManager = shardManager;
this.writeStreamManager = new OxiaWriteStreamManager(this);
}

public OxiaStub getStubForShard(long shardId) {
String leader = shardManager.leader(shardId);
return stubManager.getStub(leader);
}

public WriteStreamWrapper getWriteStreamForShard(long shardId) {
return writeStreamManager.getWriteStream(shardId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.grpc;

import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class OxiaWriteStreamManager {
private final Map<Long, WriteStreamWrapper> writeStreams = new ConcurrentHashMap<>();
private final OxiaStubProvider provider;

public OxiaWriteStreamManager(OxiaStubProvider provider) {
this.provider = provider;
}

private static final Metadata.Key<String> NAMESPACE_KEY =
Metadata.Key.of("namespace", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> SHARD_ID_KEY =
Metadata.Key.of("shard-id", Metadata.ASCII_STRING_MARSHALLER);

public WriteStreamWrapper getWriteStream(long shardId) {
return writeStreams.compute(
shardId,
(key, stream) -> {
if (stream == null || !stream.isValid()) {
Metadata headers = new Metadata();
headers.put(NAMESPACE_KEY, provider.getNamespace());
headers.put(SHARD_ID_KEY, String.format("%d", shardId));
final var asyncStub = provider.getStubForShard(shardId).async();
return new WriteStreamWrapper(
asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers)));
}
return stream;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.streamnative.oxia.client.batch;
package io.streamnative.oxia.client.grpc;

import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.proto.OxiaClientGrpc;
Expand Down
Loading

0 comments on commit 1436335

Please sign in to comment.