Skip to content

Commit

Permalink
[#9371] Periodically renew SendSpan with transport
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Nov 4, 2022
1 parent f2cbad7 commit 452eb76
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 3 deletions.
4 changes: 4 additions & 0 deletions agent/src/main/resources/pinpoint-root.config
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ profiler.transport.grpc.agent.sender.channel.executor.queue.size=1000
profiler.transport.grpc.agent.sender.request.timeout.millis=6000
profiler.transport.grpc.agent.sender.keepalive.time.millis=30000
profiler.transport.grpc.agent.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.agent.sender.loadbalancer=pick_first
profiler.transport.grpc.agent.sender.connect.timeout.millis=3000
profiler.transport.grpc.agent.sender.headers.size.max=8K
profiler.transport.grpc.agent.sender.message.inbound.size.max=4M
Expand All @@ -49,6 +50,7 @@ profiler.transport.grpc.metadata.sender.retry.max.count=3
profiler.transport.grpc.metadata.sender.retry.delay.millis=1000
profiler.transport.grpc.metadata.sender.keepalive.time.millis=30000
profiler.transport.grpc.metadata.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.metadata.sender.loadbalancer=pick_first
profiler.transport.grpc.metadata.sender.connect.timeout.millis=3000
profiler.transport.grpc.metadata.sender.headers.size.max=8K
profiler.transport.grpc.metadata.sender.message.inbound.size.max=4M
Expand All @@ -68,6 +70,7 @@ profiler.transport.grpc.stat.sender.channel.executor.queue.size=1000
profiler.transport.grpc.stat.sender.request.timeout.millis=6000
profiler.transport.grpc.stat.sender.keepalive.time.millis=30000
profiler.transport.grpc.stat.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.stat.sender.loadbalancer=pick_first
profiler.transport.grpc.stat.sender.connect.timeout.millis=3000
profiler.transport.grpc.stat.sender.headers.size.max=8K
profiler.transport.grpc.stat.sender.message.inbound.size.max=4M
Expand All @@ -87,6 +90,7 @@ profiler.transport.grpc.span.sender.channel.executor.queue.size=1000
profiler.transport.grpc.span.sender.request.timeout.millis=6000
profiler.transport.grpc.span.sender.keepalive.time.millis=30000
profiler.transport.grpc.span.sender.keepalive.timeout.millis=60000
profiler.transport.grpc.span.sender.loadbalancer=subconnection_expiring_pick_first
profiler.transport.grpc.span.sender.connect.timeout.millis=3000
profiler.transport.grpc.span.sender.headers.size.max=8K
profiler.transport.grpc.span.sender.message.inbound.size.max=4M
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.NameResolverProvider;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
Expand Down Expand Up @@ -142,7 +141,6 @@ public ManagedChannel build(String channelName, String host, int port) {
channelBuilder.eventLoopGroup(eventLoopGroup);

setupInternal(channelBuilder);
channelBuilder.defaultLoadBalancingPolicy(GrpcUtil.DEFAULT_LB_POLICY);

addHeader(channelBuilder);
addClientInterceptor(channelBuilder);
Expand Down Expand Up @@ -207,6 +205,7 @@ private void setupClientOption(final NettyChannelBuilder channelBuilder) {
channelBuilder.maxInboundMessageSize(clientOption.getMaxInboundMessageSize());
channelBuilder.flowControlWindow(clientOption.getFlowControlWindow());
channelBuilder.idleTimeout(clientOption.getIdleTimeoutMillis(), TimeUnit.MILLISECONDS);
channelBuilder.defaultLoadBalancingPolicy(clientOption.getDefaultLoadBalancer());

// ChannelOption
channelBuilder.withOption(ChannelOption.TCP_NODELAY, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.navercorp.pinpoint.bootstrap.module.JavaModule;
import com.navercorp.pinpoint.common.util.ByteSizeUnit;
import com.navercorp.pinpoint.grpc.ChannelTypeEnum;
import io.grpc.internal.GrpcUtil;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class ClientOption {
public static final int DEFAULT_LIMIT_COUNT = 100;
public static final int DEFAULT_LIMIT_TIME = 60 * 1000;

public static final String DEFAULT_LOAD_BALANCER = GrpcUtil.DEFAULT_LB_POLICY;

@Value("${keepalive.time.millis}")
private long keepAliveTime = DEFAULT_KEEPALIVE_TIME;
@Value("${keepalive.timeout.millis}")
Expand Down Expand Up @@ -76,6 +79,9 @@ public class ClientOption {
@Value("${limittime}")
private long limitTime;

@Value("${loadbalancer}")
private String defaultLoadBalancer = DEFAULT_LOAD_BALANCER;

public ClientOption() {
}

Expand Down Expand Up @@ -159,6 +165,10 @@ public long getLimitTime() {
return limitTime;
}

public String getDefaultLoadBalancer() {
return defaultLoadBalancer;
}

@Value("${headers.size.max}")
void setMaxHeaderListSize(String maxHeaderListSize) {
this.maxHeaderListSize = (int) ByteSizeUnit.getByteSize(maxHeaderListSize, DEFAULT_MAX_HEADER_LIST_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import com.navercorp.pinpoint.profiler.sender.grpc.metric.ChannelzScheduledReporter;
import com.navercorp.pinpoint.profiler.sender.grpc.metric.DefaultChannelzScheduledReporter;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolverProvider;
import com.navercorp.pinpoint.profiler.grpc.SubconnectionExpiringLoadBalancerProvider;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -90,6 +92,8 @@ protected void configure() {

bind(ScheduledExecutorService.class).toProvider(ReconnectSchedulerProvider.class).in(Scopes.SINGLETON);

LoadBalancerRegistry.getDefaultRegistry().register(new SubconnectionExpiringLoadBalancerProvider());

// not singleton
bind(ReconnectExecutor.class).toProvider(ReconnectExecutorProvider.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.profiler.grpc;

import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static io.grpc.ConnectivityState.*;

/**
* @author youngjin.kim2
*/
public class SubconnectionExpiringLoadBalancer extends LoadBalancer {
private static final long subchannelMaxAge = TimeUnit.MINUTES.toNanos(5);

private final Helper helper;
private List<EquivalentAddressGroup> currentAddresses;
private Subchannel readySubchannel;
private Subchannel failureSubchannel;
private Subchannel connectingSubchannel;

private Status failureStatus;

private boolean initialized = false;

SubconnectionExpiringLoadBalancer(Helper helper) {
this.helper = helper;
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
final List<EquivalentAddressGroup> addresses = resolvedAddresses.getAddresses();
this.updateAddresses(addresses);
this.currentAddresses = addresses;

if (!initialized) {
initialized = true;
this.createSubchannel();
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
}
}

private void updateAddresses(List<EquivalentAddressGroup> addresses) {
if (this.readySubchannel != null) {
this.readySubchannel.updateAddresses(addresses);
}
if (this.failureSubchannel != null) {
this.failureSubchannel.updateAddresses(addresses);
}
if (this.connectingSubchannel != null) {
this.connectingSubchannel.updateAddresses(addresses);
}
}

private void createSubchannel() {
Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(this.currentAddresses)
.setAttributes(
Attributes.newBuilder()
.set(ATTR_PICK_PROGRESS, new AtomicReference<>(PickProgress.NOT_PICKED_YET))
.set(ATTR_CREATED_AT, System.nanoTime())
.build()
)
.build()
);

subchannel.start(stateInfo -> {
final ConnectivityState state = stateInfo.getState();
if (state == SHUTDOWN) {
return;
}
if (state == TRANSIENT_FAILURE || state == IDLE) {
helper.refreshNameResolution();
}

moveTo(subchannel, stateInfo);
updateBalancingState();
});

subchannel.requestConnection();
}

private void moveTo(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
if (this.readySubchannel == subchannel) {
this.readySubchannel = null;
}
if (this.failureSubchannel == subchannel) {
this.failureSubchannel = null;
}
if (this.connectingSubchannel == subchannel) {
this.connectingSubchannel = null;
}

final ConnectivityState position = stateInfo.getState();

if (position == READY) {
if (this.readySubchannel != null) {
this.readySubchannel.shutdown();
}
this.readySubchannel = subchannel;
} else if (position == TRANSIENT_FAILURE) {
if (this.failureSubchannel != null) {
subchannel.shutdown();
} else {
this.failureSubchannel = subchannel;
this.failureStatus = stateInfo.getStatus();
}
} else if (position == CONNECTING) {
if (this.connectingSubchannel != null) {
subchannel.shutdown();
} else {
this.connectingSubchannel = subchannel;
}
} else if (position == IDLE) {
subchannel.requestConnection();
}
}

private void updateBalancingState() {
if (this.readySubchannel != null) {
helper.updateBalancingState(READY, new Picker(PickResult.withSubchannel(this.readySubchannel), args -> requestSuccessor(this.readySubchannel)));
return;
}

if (this.connectingSubchannel != null) {
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
return;
}

if (this.failureSubchannel != null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(failureStatus)));
return;
}

helper.updateBalancingState(IDLE, new Picker(PickResult.withNoResult()));
}

private void requestSuccessor(Subchannel subchannel) {
final long createdAt = getCreatedAt(subchannel);
if (createdAt < System.nanoTime() - subchannelMaxAge) {
final AtomicReference<PickProgress> progress = getPickProgress(subchannel);
if (progress.compareAndSet(PickProgress.NOT_PICKED_YET, PickProgress.PICKED)) {
helper.getSynchronizationContext().execute(this::createSubchannel);
}
}
}

@Override
public void requestConnection() {
if (this.readySubchannel != null) {
return;
}
this.createSubchannel();
}

@Override
public void handleNameResolutionError(Status error) {
clear();
updateBalancingState();
}

@Override
public void shutdown() {
clear();
}

private void clear() {
if (readySubchannel != null) {
readySubchannel.shutdown();
readySubchannel = null;
}
if (connectingSubchannel != null) {
connectingSubchannel.shutdown();
connectingSubchannel = null;
}
if (failureSubchannel != null) {
failureSubchannel.shutdown();
failureSubchannel = null;
}
}

private static final class Picker extends SubchannelPicker {
private final PickResult result;
private final Consumer<PickSubchannelArgs> beforePick;

Picker(PickResult result) {
this(result, null);
}

Picker(PickResult result, Consumer<PickSubchannelArgs> beforePick) {
this.result = result;
this.beforePick = beforePick;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (beforePick != null) {
beforePick.accept(args);
}
return result;
}
}

private enum PickProgress {
NOT_PICKED_YET,
PICKED,
}

static final Attributes.Key<AtomicReference<PickProgress>> ATTR_PICK_PROGRESS =
Attributes.Key.create("pick_progress");
static final Attributes.Key<Long> ATTR_CREATED_AT =
Attributes.Key.create("created_at");

private AtomicReference<PickProgress> getPickProgress(Subchannel subchannel) {
return Objects.requireNonNull(subchannel.getAttributes().get(ATTR_PICK_PROGRESS));
}

private long getCreatedAt(Subchannel subchannel) {
final Long createdAt = subchannel.getAttributes().get(ATTR_CREATED_AT);
if (createdAt == null) {
return 0L;
}
return createdAt;
}
}
Loading

0 comments on commit 452eb76

Please sign in to comment.