Skip to content

Commit

Permalink
[pinpoint-apm#10420] Removed PinpointNettyServerBuilder, Tracing gRPC…
Browse files Browse the repository at this point in the history
… logId
  • Loading branch information
youngjin.kim2 committed Oct 22, 2023
1 parent 0601f71 commit ded5cfe
Show file tree
Hide file tree
Showing 26 changed files with 357 additions and 1,340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public String helloworld() {
public String async() {
return "async " + testService.getHello() + " world";
}

@GetMapping(value = "/sleep")
public String sleep() throws InterruptedException {
Thread.sleep(5000);
return "sleep " + 5000 + "ms";
}
}
Original file line number Diff line number Diff line change
@@ -1,134 +1,89 @@
package com.navercorp.pinpoint.collector.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiverNames;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils;
import io.grpc.InternalChannelz;
import io.grpc.InternalInstrumented;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import com.navercorp.pinpoint.collector.service.ChannelzService;
import com.navercorp.pinpoint.collector.service.ChannelzService.ServerStatsWithId;
import com.navercorp.pinpoint.collector.service.ChannelzService.SocketStatsWithId;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Objects;
import java.util.Set;


@RestController
@RequestMapping("/channelz")
public class ChannelzController {

private final ChannelzRegistry channelzRegistry;
private final InternalChannelz channelz = InternalChannelz.instance();
private final ObjectMapper mapper;
private final ChannelzService channelzService;

public ChannelzController(ChannelzRegistry channelzRegistry, ObjectMapper objectMapper) {
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry");
this.mapper = Objects.requireNonNull(objectMapper, "objectMapper");
public ChannelzController(ChannelzService channelzService) {
this.channelzService = Objects.requireNonNull(channelzService, "channelzService");
}

@GetMapping("/getSocket")
public String getSocket(long logId) throws JsonProcessingException {
InternalChannelz.SocketStats stats = getSocket0(logId);

return mapper.writeValueAsString(stats);
@GetMapping(value = "/sockets/{logId}")
public SocketStatsWithId findSocketStatsByLogId(@PathVariable long logId) {
return this.channelzService.getSocketStats(logId);
}

@GetMapping("/html/getSocket")
public String getSocketToHtml(long logId) {
InternalChannelz.SocketStats stats = getSocket0(logId);

return new HTMLBuilder().build(stats);
@GetMapping(value = "/sockets/{logId}", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatsByLogIdInHtml(@PathVariable long logId) {
return buildHtml(this.findSocketStatsByLogId(logId));
}

private InternalChannelz.SocketStats getSocket0(long logId) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket == null) {
return null;
}
return ChannelzUtils.getResult("Socket", socket);
@GetMapping(value = "/sockets")
public List<SocketStatsWithId> findSocketStats(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) throws Exception {
return this.channelzService.getSocketStatsList(remoteAddress, localPort);
}

@GetMapping("/findSocket")
public String findSocket(String remoteAddress, int localPort) throws JsonProcessingException {

ChannelzRegistry.AddressId addressId = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);
List<InternalChannelz.SocketStats> stats = findSocket(addressId);
if (stats == null) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}

return mapper.writeValueAsString(stats);
@GetMapping(value = "/sockets", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatInHtml(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) throws Exception {
return buildHtml(this.findSocketStats(remoteAddress, localPort));
}

@GetMapping("/html/findSocket")
public String findSocketStatToHtml(String remoteAddress, int localPort) {

ChannelzRegistry.AddressId targetAddress = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);

List<InternalChannelz.SocketStats> stats = findSocket(targetAddress);
if (stats.isEmpty()) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}

return buildHtml(stats);
@GetMapping(value = "/servers")
public List<ServerStatsWithId> getAllServerStats() {
return this.channelzService.getAllServers();
}

@GetMapping(value = "/servers", produces = MediaType.TEXT_HTML_VALUE)
public String getAllServerStatsInHtml() {
return buildHtml(this.getAllServerStats());
}

private List<InternalChannelz.SocketStats> findSocket(ChannelzRegistry.AddressId targetAddress) {
Set<Long> logIdSet = channelzRegistry.getSocketLogId(targetAddress);
@GetMapping(value = "/servers/{name}")
public ServerStatsWithId getServerStat(@PathVariable("name") String name) {
return this.channelzService.getServer(name);
}

List<InternalInstrumented<InternalChannelz.SocketStats>> result = new ArrayList<>();
for (Long logId : logIdSet) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket != null) {
result.add(socket);
}
}
return ChannelzUtils.getResults("Socket", result);
@GetMapping(value = "/servers/{name}", produces = MediaType.TEXT_HTML_VALUE)
public String getServerStatInHtml(@PathVariable("name") String name) {
return buildHtml(this.getServerStat(name));
}

@GetMapping("/html/getServer")
public String getServerStatToHtml(String serverName) {
List<InternalChannelz.ServerStats> stats = getServer(serverName);
if (stats == null) {
return notFound("serverName=" + serverName);
private static <T> String buildHtml(List<T> stats) {
if (stats == null || stats.isEmpty()) {
return "Empty";
}
return buildHtml(stats);
}

private <T> String buildHtml(List<T> stats) {
StringBuilder buffer = new StringBuilder();
for (T stat : stats) {
String html = new HTMLBuilder().build(stat);
buffer.append(html);
buffer.append(buildHtml(stat));
buffer.append("<br>");
}
return buffer.toString();
}


@GetMapping("/html/getSpanReceiver")
public String getSpanReceiverl() {
return getServerStatToHtml(GrpcReceiverNames.SPAN);
}


private List<InternalChannelz.ServerStats> getServer(String serverName) {
Long logId = channelzRegistry.getServerLogId(serverName);

InternalChannelz.ServerList serverList = channelz.getServers(logId, 10000);

return ChannelzUtils.getResults("ServerStats", serverList.servers);
}


private String notFound(String target) {
return target + " not Found";
private static <T> String buildHtml(T stats) {
if (stats == null) {
return "Null";
}
return new HTMLBuilder().build(stats);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.navercorp.pinpoint.collector.grpc.config.GrpcReceiverProperties;
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiver;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry;
import com.navercorp.pinpoint.grpc.security.SslContextFactory;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerInterceptor;
Expand Down Expand Up @@ -36,10 +36,10 @@ public GrpcReceiver grpcAgentSslReceiver(@Qualifier("grpcAgentSslReceiverPropert
AddressFilter addressFilter,
@Qualifier("agentServiceList") List<?> serviceList,
@Qualifier("agentInterceptorList")List<ServerInterceptor> serverInterceptorList,
ChannelzRegistry channelzRegistry,
GrpcServerRegistry grpcServerRegistry,
@Qualifier("grpcAgentServerExecutor") Executor executor,
@Qualifier("grpcAgentServerCallExecutorSupplier") ServerCallExecutorSupplier serverCallExecutorSupplier) throws SSLException {
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor);
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor);
receiver.setServerCallExecutorSupplier(serverCallExecutorSupplier);

return receiver;
Expand All @@ -51,10 +51,10 @@ public GrpcReceiver grpcSpanSslReceiver(@Qualifier("grpcSpanSslReceiverPropertie
AddressFilter addressFilter,
@Qualifier("spanServiceList") List<ServerServiceDefinition> serviceList,
@Qualifier("spanInterceptorList") List<ServerInterceptor> serverInterceptorList,
ChannelzRegistry channelzRegistry,
GrpcServerRegistry grpcServerRegistry,
@Qualifier("grpcSpanServerExecutor") Executor executor,
@Qualifier("serverTransportFilterList") List<ServerTransportFilter> transportFilterList) throws SSLException {
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor);
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor);
receiver.setTransportFilterList(transportFilterList);
return receiver;
}
Expand All @@ -65,10 +65,10 @@ public GrpcReceiver grpcStatSslReceiver(@Qualifier("grpcStatSslReceiverPropertie
AddressFilter addressFilter,
@Qualifier("statServiceList") List<ServerServiceDefinition> serviceList,
@Qualifier("statInterceptorList") List<ServerInterceptor> serverInterceptorList,
ChannelzRegistry channelzRegistry,
GrpcServerRegistry grpcServerRegistry,
@Qualifier("grpcStatServerExecutor") Executor executor,
@Qualifier("serverTransportFilterList") List<ServerTransportFilter> transportFilterList) throws SSLException {
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor);
GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor);
receiver.setTransportFilterList(transportFilterList);
return receiver;
}
Expand All @@ -78,7 +78,7 @@ private GrpcReceiver createReceiver(GrpcSslReceiverProperties properties,
AddressFilter addressFilter,
List<?> serviceList,
List<ServerInterceptor> serverInterceptorList,
ChannelzRegistry channelzRegistry,
GrpcServerRegistry grpcServerRegistry,
Executor executor) throws SSLException {
GrpcReceiver receiver = new GrpcReceiver();
receiver.setBindAddress(properties.getBindAddress());
Expand All @@ -90,7 +90,7 @@ private GrpcReceiver createReceiver(GrpcSslReceiverProperties properties,
receiver.setAddressFilter(addressFilter);
receiver.setBindableServiceList(serviceList);
receiver.setServerInterceptorList(serverInterceptorList);
receiver.setChannelzRegistry(channelzRegistry);
receiver.setGrpcServerRegistry(grpcServerRegistry);

SslContext sslContext = newSslContext(properties.getGrpcSslProperties());
receiver.setSslContext(sslContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.ServerOption;
Expand Down Expand Up @@ -77,7 +77,7 @@ public class GrpcReceiver implements InitializingBean, DisposableBean, BeanNameA
private SslContext sslContext;

private Server server;
private ChannelzRegistry channelzRegistry;
private GrpcServerRegistry grpcServerRegistry;


@Override
Expand Down Expand Up @@ -122,8 +122,8 @@ public void afterPropertiesSet() throws Exception {
this.serverFactory.addInterceptor(serverInterceptor);
}
}
if (channelzRegistry != null) {
this.serverFactory.setChannelzRegistry(channelzRegistry);
if (grpcServerRegistry != null) {
this.serverFactory.setChannelzRegistry(grpcServerRegistry);
}

// Add service
Expand Down Expand Up @@ -276,8 +276,8 @@ public void setServerInterceptorList(List<ServerInterceptor> serverInterceptorLi
this.serverInterceptorList = serverInterceptorList;
}

public void setChannelzRegistry(ChannelzRegistry channelzRegistry) {
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry");
public void setGrpcServerRegistry(GrpcServerRegistry grpcServerRegistry) {
this.grpcServerRegistry = Objects.requireNonNull(grpcServerRegistry, "grpcServerRegistry");
}

}
Loading

0 comments on commit ded5cfe

Please sign in to comment.