Skip to content

Commit

Permalink
[pinpoint-apm#9323] GRPC server distinguishes critical methods
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Oct 25, 2022
1 parent 0db9607 commit 0dc1459
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,17 @@
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.security.SslServerConfig;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory;
import com.navercorp.pinpoint.grpc.server.TransportMetadataServerInterceptor;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import org.apache.logging.log4j.Logger;
import com.navercorp.pinpoint.grpc.server.*;
import io.grpc.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.NestedExceptionUtils;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.net.BindException;
import java.util.ArrayList;
Expand Down Expand Up @@ -68,6 +61,7 @@ public class GrpcReceiver implements InitializingBean, DisposableBean, BeanNameA

private List<ServerInterceptor> serverInterceptorList;
private List<ServerTransportFilter> transportFilterList;
private List<MethodDescriptor<?, ?>> criticalMethodList;

private ServerOption serverOption;
private GrpcSslConfiguration grpcSslConfiguration;
Expand Down Expand Up @@ -126,6 +120,12 @@ public void afterPropertiesSet() throws Exception {
// Add service
addService();

if (CollectionUtils.hasLength(criticalMethodList)) {
for (MethodDescriptor<?, ?> method : criticalMethodList) {
this.serverFactory.addCriticalMethod(method);
}
}

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
logger.info("Start {} server {}", this.beanName, this.server);
Expand Down Expand Up @@ -185,7 +185,7 @@ void blockUntilShutdown() throws InterruptedException {
}

@Override
public void setBeanName(final String beanName) {
public void setBeanName(@Nonnull final String beanName) {
this.beanName = beanName;
}

Expand Down Expand Up @@ -238,6 +238,11 @@ public void setTransportFilterList(List<ServerTransportFilter> transportFilterLi
this.transportFilterList = transportFilterList;
}

public void setCriticalMethodList(List<MethodDescriptor<?, ?>> methods) {
Objects.requireNonNull(methods, "critical methods");
this.criticalMethodList = methods;
}

@Autowired
public void setServerInterceptorList(List<ServerInterceptor> serverInterceptorList) {
this.serverInterceptorList = serverInterceptorList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
<property name="serverInterceptorList" ref="agentInterceptorList"/>
<property name="enable" value="#{grpcAgentReceiverConfig.enable}"/>
<property name="serverOption" value="#{grpcAgentReceiverConfig.serverOption}"/>
<property name="criticalMethodList">
<list>
<value>#{T(com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc).getHandleCommandMethod()}</value>
<value>#{T(com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc).getHandleCommandV2Method()}</value>
</list>
</property>
</bean>

<bean id="grpcAgentSslReceiver" class="com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiver">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.security.SslContextFactory;
import com.navercorp.pinpoint.grpc.security.SslServerConfig;

import io.grpc.BindableService;
import io.grpc.InternalWithLogId;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.*;
import io.grpc.netty.LogIdServerListenerDelegator;
import io.grpc.netty.PinpointNettyServerBuilder;
import io.netty.channel.ChannelOption;
Expand All @@ -37,19 +31,14 @@
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.Nonnull;
import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;

/**
* @author Woonduk Kang(emeroad)
Expand All @@ -59,7 +48,7 @@ public class ServerFactory {

private final String name;

private String hostname;
private final String hostname;
private final int port;

private final Class<? extends ServerChannel> channelType;
Expand All @@ -69,11 +58,13 @@ public class ServerFactory {
private final ExecutorService workerExecutor;
private final EventLoopGroup workerEventLoopGroup;

private final Executor criticalExecutor;
private final Executor serverExecutor;

private final List<Object> bindableServices = new ArrayList<>();
private final List<ServerTransportFilter> serverTransportFilters = new ArrayList<>();
private final List<ServerInterceptor> serverInterceptors = new ArrayList<>();
private final List<MethodDescriptor<?, ?>> criticalMethods = new ArrayList<>();

private final ServerOption serverOption;
private final SslServerConfig sslServerConfig;
Expand All @@ -98,6 +89,13 @@ public ServerFactory(String name, String hostname, int port, Executor serverExec
this.workerExecutor = newExecutor(name + "-Channel-Worker");
this.workerEventLoopGroup = serverChannelType.newEventLoopGroup(CpuUtils.cpuCount(), workerExecutor);

this.criticalExecutor = new ThreadPoolExecutor(
4,
Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name + "-Critical", true)
);
this.serverExecutor = Objects.requireNonNull(serverExecutor, "executor");
}

Expand Down Expand Up @@ -136,6 +134,11 @@ public void addInterceptor(ServerInterceptor serverInterceptor) {
this.serverInterceptors.add(serverInterceptor);
}

public void addCriticalMethod(MethodDescriptor<?, ?> method) {
Objects.requireNonNull(method, "critical method");
this.criticalMethods.add(method);
}

public Server build() throws SSLException {
InetSocketAddress bindAddress = new InetSocketAddress(this.hostname, this.port);
PinpointNettyServerBuilder serverBuilder = PinpointNettyServerBuilder.forAddress(bindAddress);
Expand Down Expand Up @@ -168,7 +171,9 @@ public Server build() throws SSLException {
serverBuilder.intercept(serverInterceptor);
}

serverBuilder.executor(serverExecutor);
serverBuilder.executor(this.criticalExecutor);
serverBuilder.callExecutor(new GrpcCallExecutorSupplier(this.serverExecutor, this.criticalExecutor, this.criticalMethods));

setupServerOption(serverBuilder);

if (sslServerConfig.isEnable()) {
Expand All @@ -190,6 +195,35 @@ public Server build() throws SSLException {
return server;
}

private static class GrpcCallExecutorSupplier implements ServerCallExecutorSupplier {
private final Set<MethodDescriptor<?, ?>> criticalMethods;
private final Executor criticalExecutor;
private final Executor serverExecutor;

public GrpcCallExecutorSupplier(Executor serverExecutor, Executor criticalExecutor, Collection<MethodDescriptor<?, ?>> criticalMethods) {
this.serverExecutor = Objects.requireNonNull(serverExecutor, "serverExecutor");
this.criticalExecutor = Objects.requireNonNull(criticalExecutor, "criticalExecutor");

Objects.requireNonNull(criticalMethods, "criticalMethods");
this.criticalMethods = Collections.unmodifiableSet(new HashSet<>(criticalMethods));
}

@Nonnull
@Override
public <ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata) {
if (isCritical(call)) {
return this.criticalExecutor;
} else {
return this.serverExecutor;
}
}

private <ReqT, RespT> boolean isCritical(ServerCall<ReqT, RespT> call) {
final MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
return criticalMethods.contains(method);
}
}


private void setupInternal(PinpointNettyServerBuilder serverBuilder) {

Expand Down

0 comments on commit 0dc1459

Please sign in to comment.