Skip to content

Commit

Permalink
[#11158] Refactor GrpcDataSender
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jul 1, 2024
1 parent f5b0e72 commit 087e5a7
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ public EnhancedDataSender<MetaDataType, ResponseMessage> get() {
final ChannelFactoryBuilder channelFactoryBuilder = newChannelFactoryBuilder(sslEnable, clientRetryEnable);

final ChannelFactory channelFactory = channelFactoryBuilder.build();
final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();

if (clientRetryEnable) {
return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, channelFactory);
return new MetadataGrpcHedgingDataSender<>(collectorIp, collectorPort, messageConverter, channelFactory);
}

final int senderExecutorQueueSize = grpcTransportConfig.getMetadataSenderExecutorQueueSize();
final int retryMaxCount = grpcTransportConfig.getMetadataRetryMaxCount();
final int retryDelayMillis = grpcTransportConfig.getMetadataRetryDelayMillis();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.sender.grpc;

import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
import com.navercorp.pinpoint.common.profiler.message.DataSender;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.ManagedChannelUtils;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

/**
* @author Woonduk Kang(emeroad)
*/
public abstract class AbstractGrpcDataSender<T> implements DataSender<T> {
protected final Logger logger = LogManager.getLogger(this.getClass());
protected final boolean isDebug = logger.isDebugEnabled();
protected final ThrottledLogger tLogger;

protected final String name;
protected final String host;
protected final int port;

protected final ManagedChannel managedChannel;
protected final long logId;

// not thread safe
protected final MessageConverter<T, GeneratedMessageV3> messageConverter;

protected final ChannelFactory channelFactory;

protected volatile boolean shutdown;


public AbstractGrpcDataSender(String host, int port,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
this.tLogger = ThrottledLogger.getLogger(logger, 100);

this.channelFactory = Objects.requireNonNull(channelFactory, "channelFactory");

this.name = Objects.requireNonNull(channelFactory.getFactoryName(), "channelFactory.name");
this.host = Objects.requireNonNull(host, "host");
this.port = port;

this.messageConverter = Objects.requireNonNull(messageConverter, "messageConverter");

this.managedChannel = channelFactory.build(host, port);
this.logId = ManagedChannelUtils.getLogId(managedChannel);

final ConnectivityState state = managedChannel.getState(false);
this.managedChannel.notifyWhenStateChanged(state, new ConnectivityStateMonitor(state));

}

public long getLogId() {
return logId;
}

private class ConnectivityStateMonitor implements Runnable {
private final ConnectivityState before;

public ConnectivityStateMonitor(ConnectivityState before) {
this.before = Objects.requireNonNull(before, "before");
}

@Override
public void run() {
final ConnectivityState change = managedChannel.getState(false);
logger.info("ConnectivityState changed before:{}, change:{}", before, change);
if (change == ConnectivityState.TRANSIENT_FAILURE) {
logger.info("Failed to connect to collector server {} {}/{}", name, host, port);
}
managedChannel.notifyWhenStateChanged(change, new ConnectivityStateMonitor(change));
}
}



protected void releaseChannel() {
final ManagedChannel managedChannel = this.managedChannel;
if (managedChannel != null) {
ManagedChannelUtils.shutdownManagedChannel(name, managedChannel);
}
final ChannelFactory channelFactory = this.channelFactory;
if (channelFactory != null) {
channelFactory.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,10 @@
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory;
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
import com.navercorp.pinpoint.common.profiler.message.DataSender;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.ExecutorUtils;
import com.navercorp.pinpoint.grpc.ManagedChannelUtils;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -39,77 +31,21 @@
/**
* @author Woonduk Kang(emeroad)
*/
public abstract class GrpcDataSender<T> implements DataSender<T> {
protected final Logger logger = LogManager.getLogger(this.getClass());
protected final boolean isDebug = logger.isDebugEnabled();

protected final String name;
protected final String host;
protected final int port;

protected final ManagedChannel managedChannel;
protected final long logId;

// not thread safe
protected final MessageConverter<T, GeneratedMessageV3> messageConverter;
public abstract class GrpcDataSender<T> extends AbstractGrpcDataSender<T> {

protected final ExecutorService executor;

protected final ChannelFactory channelFactory;

protected volatile boolean shutdown;

protected final BlockingQueue<T> queue;
protected final ThrottledLogger tLogger;


public GrpcDataSender(String host, int port,
int executorQueueSize,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
this.channelFactory = Objects.requireNonNull(channelFactory, "channelFactory");

this.name = Objects.requireNonNull(channelFactory.getFactoryName(), "channelFactory.name");
this.host = Objects.requireNonNull(host, "host");
this.port = port;

this.messageConverter = Objects.requireNonNull(messageConverter, "messageConverter");
super(host, port, messageConverter, channelFactory);

this.executor = newExecutorService(name + "-Executor", executorQueueSize);

this.managedChannel = channelFactory.build(host, port);
this.logId = ManagedChannelUtils.getLogId(managedChannel);

final ConnectivityState state = managedChannel.getState(false);
this.managedChannel.notifyWhenStateChanged(state, new ConnectivityStateMonitor(state));


this.tLogger = ThrottledLogger.getLogger(logger, 100);
this.queue = new LinkedBlockingQueue<>(executorQueueSize);
}

public long getLogId() {
return logId;
}

private class ConnectivityStateMonitor implements Runnable {
private final ConnectivityState before;

public ConnectivityStateMonitor(ConnectivityState before) {
this.before = Objects.requireNonNull(before, "before");
}

@Override
public void run() {
final ConnectivityState change = managedChannel.getState(false);
logger.info("ConnectivityState changed before:{}, change:{}", before, change);
if (change == ConnectivityState.TRANSIENT_FAILURE) {
logger.info("Failed to connect to collector server {} {}/{}", name, host, port);
}
managedChannel.notifyWhenStateChanged(change, new ConnectivityStateMonitor(change));
}
}

protected ExecutorService newExecutorService(String name, int senderExecutorQueueSize) {
ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true);
return ExecutorFactory.newFixedThreadPool(1, senderExecutorQueueSize, threadFactory);
Expand All @@ -133,13 +69,6 @@ public boolean send(final T data) {

protected void release() {
ExecutorUtils.shutdownExecutorService(name, executor);
final ManagedChannel managedChannel = this.managedChannel;
if (managedChannel != null) {
ManagedChannelUtils.shutdownManagedChannel(name, managedChannel);
}
final ChannelFactory channelFactory = this.channelFactory;
if (channelFactory != null) {
channelFactory.close();
}
super.releaseChannel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
public class LogResponseStreamObserver<ResT> implements StreamObserver<ResT> {
private final Logger logger;
private final String name;
private final long requestCount;
private final long requestId;

public LogResponseStreamObserver(Logger logger, String name, long requestCount) {
public LogResponseStreamObserver(Logger logger, String name, long requestId) {
this.logger = Objects.requireNonNull(logger, "logger");
this.name = Objects.requireNonNull(name, "name");
this.requestCount = requestCount;
this.requestId = requestId;
}

@Override
Expand All @@ -49,12 +49,12 @@ public void onError(Throwable throwable) {
if (logger.isInfoEnabled()) {
final StatusError statusError = StatusErrors.throwable(throwable);
if (statusError.isSimpleError()) {
logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage());
logger.info("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage());
} else {
if (logger.isDebugEnabled()) {
logger.debug("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage(), statusError.getThrowable());
logger.debug("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage(), statusError.getThrowable());
} else {
logger.info("{} Error. requestCount={}, cause={}", name, requestCount, statusError.getMessage());
logger.info("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage());
}

}
Expand All @@ -64,7 +64,7 @@ public void onError(Throwable throwable) {
@Override
public void onCompleted() {
if (logger.isDebugEnabled()) {
logger.debug("{} onCompleted. requestCount={}", requestCount, name);
logger.debug("{} onCompleted. requestCount={}", requestId, name);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void scheduleNextRetry(final GeneratedMessageV3 message, final int remai
}
if (remainingRetryCount <= 0) {
if (isDebug) {
logger.debug("Request drop. remainingRetryCount={}, request={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
logger.debug("Request drop. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@

/**
*/
public class MetadataGrpcHedgingDataSender<T> extends GrpcDataSender<T> implements EnhancedDataSender<T, ResponseMessage> {
//
public class MetadataGrpcHedgingDataSender<T> extends AbstractGrpcDataSender<T> implements EnhancedDataSender<T, ResponseMessage> {

private final MetadataGrpc.MetadataStub metadataStub;

private final AtomicLong requestCount = new AtomicLong(0);
private final AtomicLong requestIdGen = new AtomicLong(0);

public MetadataGrpcHedgingDataSender(String host, int port, int executorQueueSize,
public MetadataGrpcHedgingDataSender(String host, int port,
MessageConverter<T, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
super(host, port, executorQueueSize, messageConverter, channelFactory);
super(host, port, messageConverter, channelFactory);

this.metadataStub = MetadataGrpc.newStub(managedChannel);
}
Expand Down Expand Up @@ -100,8 +100,8 @@ public boolean request(final T data) {

private StreamObserver<PResult> newLogStreamObserver(GeneratedMessageV3 message) {
String type = message.getClass().getSimpleName();
long requestCount = this.requestCount.incrementAndGet();
return new LogResponseStreamObserver<>(logger, type, requestCount);
long requestId = this.requestIdGen.incrementAndGet();
return new LogResponseStreamObserver<>(logger, type, requestId);
}

@Override
Expand All @@ -111,6 +111,6 @@ public void stop() {
}
this.shutdown = true;

super.release();
super.releaseChannel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,6 @@ public void close() {
}
};

return new MetadataGrpcHedgingDataSender<>("localhost", 1234, 1,
converter, factory);
return new MetadataGrpcHedgingDataSender<>("localhost", 1234, converter, factory);
}
}

0 comments on commit 087e5a7

Please sign in to comment.