Skip to content

Commit

Permalink
[#9371] Add agent load-balancer which periodically change collector s…
Browse files Browse the repository at this point in the history
…erver
  • Loading branch information
youngjin.kim2 authored and smilu97 committed Nov 16, 2022
1 parent cbbd42d commit 2813dde
Show file tree
Hide file tree
Showing 11 changed files with 678 additions and 9 deletions.
8 changes: 8 additions & 0 deletions agent/src/main/resources/pinpoint-root.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ profiler.transport.module=GRPC
###########################################################
profiler.transport.grpc.collector.ip=127.0.0.1

# Subconnection-expiring load balancer: renewal period in milliseconds (the default, 3153600000000, effectively disables renewal)
profiler.transport.grpc.loadbalancer.renew.period.millis=3153600000000

# placeHolder support "${key}"
# Agent
profiler.transport.grpc.agent.collector.ip=${profiler.transport.grpc.collector.ip}
Expand All @@ -28,6 +31,7 @@ profiler.transport.grpc.agent.sender.channel.executor.queue.size=1000
profiler.transport.grpc.agent.sender.request.timeout.millis=6000
profiler.transport.grpc.agent.sender.keepalive.time.millis=30000
profiler.transport.grpc.agent.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.agent.sender.loadbalancer=pick_first
profiler.transport.grpc.agent.sender.connect.timeout.millis=3000
profiler.transport.grpc.agent.sender.headers.size.max=8K
profiler.transport.grpc.agent.sender.message.inbound.size.max=4M
Expand All @@ -49,6 +53,7 @@ profiler.transport.grpc.metadata.sender.retry.max.count=3
profiler.transport.grpc.metadata.sender.retry.delay.millis=1000
profiler.transport.grpc.metadata.sender.keepalive.time.millis=30000
profiler.transport.grpc.metadata.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.metadata.sender.loadbalancer=pick_first
profiler.transport.grpc.metadata.sender.connect.timeout.millis=3000
profiler.transport.grpc.metadata.sender.headers.size.max=8K
profiler.transport.grpc.metadata.sender.message.inbound.size.max=4M
Expand All @@ -68,6 +73,7 @@ profiler.transport.grpc.stat.sender.channel.executor.queue.size=1000
profiler.transport.grpc.stat.sender.request.timeout.millis=6000
profiler.transport.grpc.stat.sender.keepalive.time.millis=30000
profiler.transport.grpc.stat.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.stat.sender.loadbalancer=pick_first
profiler.transport.grpc.stat.sender.connect.timeout.millis=3000
profiler.transport.grpc.stat.sender.headers.size.max=8K
profiler.transport.grpc.stat.sender.message.inbound.size.max=4M
Expand All @@ -87,6 +93,7 @@ profiler.transport.grpc.span.sender.channel.executor.queue.size=1000
profiler.transport.grpc.span.sender.request.timeout.millis=6000
profiler.transport.grpc.span.sender.keepalive.time.millis=30000
profiler.transport.grpc.span.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.span.sender.loadbalancer=pick_first
profiler.transport.grpc.span.sender.connect.timeout.millis=3000
profiler.transport.grpc.span.sender.headers.size.max=8K
profiler.transport.grpc.span.sender.message.inbound.size.max=4M
Expand All @@ -97,6 +104,7 @@ profiler.transport.grpc.span.sender.discardpolicy.logger.discard.ratelimit=1
profiler.transport.grpc.span.sender.discardpolicy.maxpendingthreshold=1024
profiler.transport.grpc.span.sender.discardpolicy.discard-count-for-reconnect=1000
profiler.transport.grpc.span.sender.discardpolicy.not-ready-timeout-millis=300000
profiler.transport.grpc.span.sender.rpc.age.max.millis=3153600000000
## AUTO, NIO, EPOLL
profiler.transport.grpc.span.sender.channel-type=AUTO
profiler.transport.grpc.span.sender.maxtraceevent=8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import com.navercorp.pinpoint.grpc.client.config.ClientOption;
import com.navercorp.pinpoint.grpc.security.SslClientConfig;
import com.navercorp.pinpoint.grpc.security.SslContextFactory;

import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.NameResolverProvider;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -38,8 +36,8 @@
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.net.ssl.SSLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -142,7 +140,6 @@ public ManagedChannel build(String channelName, String host, int port) {
channelBuilder.eventLoopGroup(eventLoopGroup);

setupInternal(channelBuilder);
channelBuilder.defaultLoadBalancingPolicy(GrpcUtil.DEFAULT_LB_POLICY);

addHeader(channelBuilder);
addClientInterceptor(channelBuilder);
Expand Down Expand Up @@ -207,6 +204,7 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) {
channelBuilder.maxInboundMessageSize(clientOption.getMaxInboundMessageSize());
channelBuilder.flowControlWindow(clientOption.getFlowControlWindow());
channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS);
channelBuilder.defaultLoadBalancingPolicy(clientOption.getDefaultLoadBalancer());

// ChannelOption
channelBuilder.withOption(ChannelOption.TCP_NODELAY, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.navercorp.pinpoint.bootstrap.module.JavaModule;
import com.navercorp.pinpoint.common.util.ByteSizeUnit;
import com.navercorp.pinpoint.grpc.ChannelTypeEnum;
import io.grpc.internal.GrpcUtil;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class ClientOption {
public static final int DEFAULT_LIMIT_COUNT = 100;
public static final int DEFAULT_LIMIT_TIME = 60 * 1000;

public static final String DEFAULT_LOAD_BALANCER = GrpcUtil.DEFAULT_LB_POLICY;

@Value("${keepalive.time.millis}")
private long keepAliveTime = DEFAULT_KEEPALIVE_TIME;
@Value("${keepalive.timeout.millis}")
Expand Down Expand Up @@ -76,6 +79,9 @@ public class ClientOption {
@Value("${limittime}")
private long limitTime;

@Value("${loadbalancer}")
private String defaultLoadBalancer = DEFAULT_LOAD_BALANCER;

public ClientOption() {
}

Expand Down Expand Up @@ -159,6 +165,10 @@ public long getLimitTime() {
return limitTime;
}

/**
 * Returns the configured default load-balancing policy name.
 *
 * The backing field is bound to the {@code loadbalancer} property and is
 * initialized to {@link #DEFAULT_LOAD_BALANCER}.
 *
 * @return the load-balancing policy name
 */
public String getDefaultLoadBalancer() {
    return this.defaultLoadBalancer;
}

@Value("${headers.size.max}")
void setMaxHeaderListSize(String maxHeaderListSize) {
this.maxHeaderListSize = (int) ByteSizeUnit.getByteSize(maxHeaderListSize, DEFAULT_MAX_HEADER_LIST_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class GrpcTransportConfig {
private static final long DEFAULT_DISCARD_MAX_PENDING_THRESHOLD = 1024;
private static final long DEFAULT_DISCARD_COUNT_FOR_RECONNECT = 1000;
private static final long DEFAULT_NOT_READY_TIMEOUT_MILLIS = 5 * 60 * 1000;
private static final long DEFAULT_RPC_MAX_AGE_MILLIS = 3153600000000L; // Disabled
private static final long DEFAULT_RENEW_TRANSPORT_PERIOD_MILLIS = 3153600000000L; // Disabled

private static final int DEFAULT_METADATA_RETRY_MAX_COUNT = 3;
private static final int DEFAULT_METADATA_RETRY_DELAY_MILLIS = 1000;
Expand Down Expand Up @@ -139,6 +141,11 @@ public class GrpcTransportConfig {
private long spanDiscardCountForReconnect = DEFAULT_DISCARD_COUNT_FOR_RECONNECT;
@Value("${profiler.transport.grpc.span.sender.discardpolicy.not-ready-timeout-millis}")
private long spanNotReadyTimeoutMillis = DEFAULT_NOT_READY_TIMEOUT_MILLIS;
@Value("${profiler.transport.grpc.span.sender.rpc.age.max.millis}")
private long spanRpcMaxAgeMillis = DEFAULT_RPC_MAX_AGE_MILLIS;

@Value("${profiler.transport.grpc.loadbalancer.renew.period.millis}")
private long renewTransportPeriodMillis = DEFAULT_RENEW_TRANSPORT_PERIOD_MILLIS;

@Value("${" + KEY_PROFILER_CONFIG_NETTY_TRY_REFLECTION_SET_ACCESSIBLE + "}")
private boolean nettySystemPropertyTryReflectiveSetAccessible = DEFAULT_NETTY_SYSTEM_PROPERTY_TRY_REFLECTIVE_SET_ACCESSIBLE;
Expand Down Expand Up @@ -292,6 +299,12 @@ public long getSpanDiscardCountForReconnect() {
/**
 * Returns the value bound to
 * {@code profiler.transport.grpc.span.sender.discardpolicy.not-ready-timeout-millis}.
 *
 * @return the span sender not-ready timeout, in milliseconds
 */
public long getSpanNotReadyTimeoutMillis() {
    return this.spanNotReadyTimeoutMillis;
}
/**
 * Returns the value bound to
 * {@code profiler.transport.grpc.span.sender.rpc.age.max.millis}.
 *
 * @return the maximum age of a span RPC, in milliseconds
 */
public long getSpanRpcMaxAgeMillis() {
    return this.spanRpcMaxAgeMillis;
}
/**
 * Returns the value bound to
 * {@code profiler.transport.grpc.loadbalancer.renew.period.millis}.
 *
 * @return the transport renewal period, in milliseconds
 */
public long getRenewTransportPeriodMillis() {
    return this.renewTransportPeriodMillis;
}

public long getAgentRequestTimeout() {
return agentRequestTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
import com.navercorp.pinpoint.profiler.sender.ResultResponse;
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import com.navercorp.pinpoint.profiler.sender.grpc.SubconnectionExpiringLoadBalancerProvider;
import com.navercorp.pinpoint.profiler.sender.grpc.metric.ChannelzScheduledReporter;
import com.navercorp.pinpoint.profiler.sender.grpc.metric.DefaultChannelzScheduledReporter;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolverProvider;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -90,6 +92,8 @@ protected void configure() {

bind(ScheduledExecutorService.class).toProvider(ReconnectSchedulerProvider.class).in(Scopes.SINGLETON);

registerGrpcProviders(grpcTransportConfig);

// not singleton
bind(ReconnectExecutor.class).toProvider(ReconnectExecutorProvider.class);

Expand All @@ -107,6 +111,10 @@ protected void configure() {
nettyPlatformDependent.setup();
}

/**
 * Registers a {@link SubconnectionExpiringLoadBalancerProvider}, built from the
 * given transport configuration, with gRPC's default load-balancer registry.
 *
 * @param grpcTransportConfig configuration handed to the provider
 */
private void registerGrpcProviders(GrpcTransportConfig grpcTransportConfig) {
    final LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry();
    final SubconnectionExpiringLoadBalancerProvider expiringProvider =
            new SubconnectionExpiringLoadBalancerProvider(grpcTransportConfig);
    defaultRegistry.register(expiringProvider);
}

private void bindAgentDataSender() {
// Agent
TypeLiteral<MessageConverter<MetaDataType, GeneratedMessageV3>> metadataMessageConverter = new TypeLiteral<MessageConverter<MetaDataType, GeneratedMessageV3>>() {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import com.navercorp.pinpoint.profiler.sender.grpc.metric.DefaultChannelzReporter;
import io.grpc.ClientInterceptor;
import io.grpc.NameResolverProvider;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -106,7 +106,8 @@ public DataSender<SpanType> get() {

final SpanGrpcDataSender spanGrpcDataSender = new SpanGrpcDataSender(collectorIp, collectorPort,
senderExecutorQueueSize, messageConverter,
reconnectExecutor, channelFactory, failState);
reconnectExecutor, channelFactory, failState, grpcTransportConfig.getSpanRpcMaxAgeMillis());


registerChannelzReporter(spanGrpcDataSender);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessageV3;
import java.util.Objects;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.trace.PSpan;
import com.navercorp.pinpoint.grpc.trace.PSpanChunk;
Expand All @@ -35,6 +34,11 @@
import io.grpc.ManagedChannel;
import io.grpc.stub.ClientCallStreamObserver;

import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.navercorp.pinpoint.grpc.MessageFormatUtils.debugLog;

/**
Expand All @@ -53,6 +57,10 @@ public class SpanGrpcDataSender extends GrpcDataSender<SpanType> {

private final ClientStreamingService<PSpanMessage, Empty> clientStreamService;

private final long maxRpcAgeMillis;
private final AtomicLong rpcExpiredAt;
private final Random random = new Random();

public final MessageDispatcher<SpanType, PSpanMessage> dispatcher = new MessageDispatcher<SpanType, PSpanMessage>() {
@Override
public void onDispatch(ClientCallStreamObserver<PSpanMessage> stream, SpanType data) {
Expand All @@ -64,12 +72,14 @@ public void onDispatch(ClientCallStreamObserver<PSpanMessage> stream, SpanType d
final PSpanChunk spanChunk = (PSpanChunk) message;
final PSpanMessage spanMessage = PSpanMessage.newBuilder().setSpanChunk(spanChunk).build();
stream.onNext(spanMessage);
attemptRenew();
return;
}
if (message instanceof PSpan) {
final PSpan pSpan = (PSpan) message;
final PSpanMessage spanMessage = PSpanMessage.newBuilder().setSpan(pSpan).build();
stream.onNext(spanMessage);
attemptRenew();
return;
}
throw new IllegalStateException("unsupported message " + data);
Expand All @@ -82,9 +92,13 @@ public SpanGrpcDataSender(String host, int port,
MessageConverter<SpanType, GeneratedMessageV3> messageConverter,
ReconnectExecutor reconnectExecutor,
ChannelFactory channelFactory,
StreamState failState) {
StreamState failState,
long maxRpcAgeMillis) {
super(host, port, executorQueueSize, messageConverter, channelFactory);

this.maxRpcAgeMillis = maxRpcAgeMillis;
this.rpcExpiredAt = new AtomicLong(System.currentTimeMillis() + jitter(maxRpcAgeMillis));

this.reconnectExecutor = Objects.requireNonNull(reconnectExecutor, "reconnectExecutor");
final Runnable reconnectJob = new NamedRunnable(this.id) {
@Override
Expand Down Expand Up @@ -113,6 +127,33 @@ public ClientCallStreamObserver<PSpanMessage> newStream(ResponseStreamObserver<P
reconnectJob.run();
}

/**
 * Renews the span stream once the current RPC has outlived its jittered deadline.
 *
 * Does nothing when {@code maxRpcAgeMillis} is 365 days or more. Otherwise, if
 * the wall-clock time has passed {@code rpcExpiredAt}, a new jittered deadline
 * is installed with a compare-and-set, and only the caller whose CAS succeeds
 * goes on to call {@link #renewStream()}.
 */
private void attemptRenew() {
    final long disabledThresholdMillis = TimeUnit.DAYS.toMillis(365);
    if (maxRpcAgeMillis >= disabledThresholdMillis) {
        return;
    }

    final long currentDeadline = rpcExpiredAt.get();
    final long nowMillis = System.currentTimeMillis();
    if (nowMillis <= currentDeadline) {
        // Deadline not reached yet; nothing to renew.
        return;
    }

    final long nextDeadline = nowMillis + jitter(maxRpcAgeMillis);
    final boolean wonRace = rpcExpiredAt.compareAndSet(currentDeadline, nextDeadline);
    if (wonRace) {
        renewStream();
    }
}

/**
 * Scales {@code x} by a random factor drawn from the range [0.8, 1.2).
 *
 * @param x the base value to randomize
 * @return {@code x} multiplied by the random factor, truncated to a {@code long}
 */
private long jitter(long x) {
    final double factor = random.nextDouble() * 0.4 + 0.8;
    final double scaled = (double) x * factor;
    return (long) scaled;
}

/**
 * Stops the current stream task, if there is one, logging the abort first.
 *
 * NOTE(review): presumably the stopped stream is re-established by the
 * reconnect logic elsewhere in this class; confirm against the stream
 * lifecycle code.
 */
private void renewStream() {
    // Read the field once: it may be replaced or cleared by another thread
    // between the null check and the stop() call, so check and act on the
    // same snapshot.
    final StreamTask<SpanType, PSpanMessage> streamTask = this.currentStreamTask;
    if (streamTask != null) {
        logger.info("Aborting Span RPC to renew");
        streamTask.stop();
    }
}

private void startStream() {
try {
StreamTask<SpanType, PSpanMessage> streamTask = new DefaultStreamTask<>(id, clientStreamService,
Expand Down
Loading

0 comments on commit 2813dde

Please sign in to comment.