From db2d12186f887ce328d50497c217789ba22e0c59 Mon Sep 17 00:00:00 2001 From: emeroad Date: Sat, 16 Mar 2024 17:06:34 +0900 Subject: [PATCH] [#10701] Refactor SpanGrpcDataSender --- .../grpc/SpanGrpcDataSenderProvider.java | 18 +- .../sender/grpc/ExpireStreamState.java | 54 ++++++ .../profiler/sender/grpc/FinishState.java | 37 ++++ .../grpc/FinishStateResponseObserver.java | 37 ++++ .../sender/grpc/SimpleSpanGrpcDataSender.java | 177 ++++++++++++++++++ .../sender/grpc/SimpleStreamState.java | 13 +- .../profiler/sender/grpc/StreamState.java | 2 + .../sender/grpc/ExpireStreamStateTest.java | 18 ++ web/pom.xml | 2 +- 9 files changed, 349 insertions(+), 9 deletions(-) create mode 100644 agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamState.java create mode 100644 agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishState.java create mode 100644 agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishStateResponseObserver.java create mode 100644 agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleSpanGrpcDataSender.java create mode 100644 agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamStateTest.java diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java index cd29756afff9..3cb075deb414 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java @@ -33,7 +33,9 @@ import com.navercorp.pinpoint.profiler.context.SpanType; import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig; import com.navercorp.pinpoint.profiler.context.module.SpanDataSender; +import com.navercorp.pinpoint.profiler.sender.grpc.ExpireStreamState; import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor; +import com.navercorp.pinpoint.profiler.sender.grpc.SimpleSpanGrpcDataSender; import com.navercorp.pinpoint.profiler.sender.grpc.SimpleStreamState; import com.navercorp.pinpoint.profiler.sender.grpc.SpanGrpcDataSender; import com.navercorp.pinpoint.profiler.sender.grpc.StreamState; @@ -108,13 +110,17 @@ public DataSender get() { final StreamState failState = new SimpleStreamState(spanClientOption.getLimitCount(), spanClientOption.getLimitTime()); logger.info("failState:{}", failState); - final SpanGrpcDataSender spanGrpcDataSender = new SpanGrpcDataSender(collectorIp, collectorPort, - senderExecutorQueueSize, messageConverter, - reconnectExecutor, channelFactory, failState, grpcTransportConfig.getSpanRpcMaxAgeMillis()); +// final SpanGrpcDataSender spanGrpcDataSender = new SpanGrpcDataSender(collectorIp, collectorPort, +// senderExecutorQueueSize, messageConverter, +// reconnectExecutor, channelFactory, failState, grpcTransportConfig.getSpanRpcMaxAgeMillis()); +// if (grpcTransportConfig.isSpanEnableStatLogging()) { +// registerChannelzReporter(spanGrpcDataSender); +// } - if (grpcTransportConfig.isSpanEnableStatLogging()) { - registerChannelzReporter(spanGrpcDataSender); - } + SimpleSpanGrpcDataSender spanGrpcDataSender = new SimpleSpanGrpcDataSender(collectorIp, collectorPort, + senderExecutorQueueSize, messageConverter, + channelFactory, new ExpireStreamState(6000 * 10) + , grpcTransportConfig.getSpanRpcMaxAgeMillis()); return spanGrpcDataSender; } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamState.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamState.java new file mode 100644 index 000000000000..bdd871d98871 --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamState.java @@ -0,0 +1,54 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import io.grpc.internal.ExponentialBackoffPolicy; + +public class ExpireStreamState implements StreamState { + + private long checkTime; + private final long maxAge; + private final ExponentialBackoffPolicy policy = new ExponentialBackoffPolicy(); + private int failCount; + + public ExpireStreamState(long maxAge) { + this.checkTime = currentTimeMillis(); + this.maxAge = maxAge; + } + + + long currentTimeMillis() { + return System.currentTimeMillis(); + } + + @Override + public void fail() { + this.failCount++; + } + + @Override + public long getFailCount() { + return this.failCount; + } + + @Override + public boolean isFailure() { + final long currentTimeMillis = currentTimeMillis(); + final long expireTime = checkTime + maxAge; + if (currentTimeMillis > expireTime) { + checkTime = currentTimeMillis; + return true; + } + + return false; + } + + @Override + public void success() { + this.failCount = 0; + } + + @Override + public String toString() { + return "SimpleStreamState{"; + + } +} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishState.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishState.java new file mode 100644 index 000000000000..9ef75d15c369 --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishState.java @@ -0,0 +1,37 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +public class FinishState { + + private volatile State state = State.RUN; + + public enum State { + RUN, + COMPLETED, + ERROR; + + public boolean isRun() { + return this == RUN; + } + + public boolean isFinish() { + return this != RUN; + } + } + + public void completed() { + state = State.COMPLETED; + } + + public void error() { + state = State.ERROR; + } + + public State current() { + return state; + } + + public boolean isRun() { + return state.isRun(); + } + +} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishStateResponseObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishStateResponseObserver.java new file mode 100644 index 000000000000..626d8ee1ccfd --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/FinishStateResponseObserver.java @@ -0,0 +1,37 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; + +public class FinishStateResponseObserver implements StreamObserver { + private final Logger logger; + + private final FinishState state = new FinishState(); + + public FinishStateResponseObserver(Logger logger) { + this.logger = Objects.requireNonNull(logger, "logger"); + } + + @Override + public void onNext(V value) { + logger.debug("onNext {}", value); + } + + @Override + public void onError(Throwable t) { + logger.info("onError", t); + this.state.error(); + } + + @Override + public void onCompleted() { + logger.debug("onCompleted"); + this.state.completed(); + } + + public FinishState getState() { + return state; + } +}; diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleSpanGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleSpanGrpcDataSender.java new file mode 100644 index 000000000000..e52dc8ed39bc --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleSpanGrpcDataSender.java @@ -0,0 +1,177 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.profiler.sender.grpc; + + +import com.google.protobuf.Empty; +import com.google.protobuf.GeneratedMessageV3; +import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.grpc.client.ChannelFactory; +import com.navercorp.pinpoint.grpc.trace.PSpan; +import com.navercorp.pinpoint.grpc.trace.PSpanChunk; +import com.navercorp.pinpoint.grpc.trace.PSpanMessage; +import com.navercorp.pinpoint.grpc.trace.SpanGrpc; +import com.navercorp.pinpoint.profiler.context.SpanType; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.StreamObserver; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static com.navercorp.pinpoint.grpc.MessageFormatUtils.debugLog; + +/** + * @author jaehong.kim + */ +public class SimpleSpanGrpcDataSender extends GrpcDataSender { + + private final static String id = "SimpleSpanStream"; + private final AtomicLong streamId = new AtomicLong(0); + private final Thread dispathThread; + private final StreamState state; + + public SimpleSpanGrpcDataSender(String host, int port, + int executorQueueSize, + MessageConverter messageConverter, + ChannelFactory channelFactory, + StreamState state, + long maxRpcAgeMillis) { + super(host, port, executorQueueSize, messageConverter, channelFactory); + this.state = Objects.requireNonNull(state, "state"); + + this.dispathThread = new Thread(this::dispatch, "Pinpoint grpc-span-dispatch"); + this.dispathThread.start(); + } + + private void dispatch() { + FinishStateResponseObserver response = new FinishStateResponseObserver<>(logger); + ClientCallStreamObserver callStream = newStream(response); + while (!shutdown) { + try { + final SpanType message = queue.poll(3000, TimeUnit.MILLISECONDS); + if (message != null) { + logger.debug("--------------dispatch:{}", message); + boolean ready = callStream.isReady(); + boolean runState = response.getState().isRun(); + if (ready && runState) { + try { + onDispatch(callStream, message); + } catch (Throwable th) { + logger.warn("Unexpected onDispatch error error", th); + } + state.success(); + } else { + logger.info("dispatch failed isReady:{}, runState:{}", ready, runState); + state.fail(); + } + } + if (state.isFailure()) { + logger.info("renewStream: {}", this); + long failCount = state.getFailCount(); + if (failCount == 0) { + callStream.onCompleted(); + } else { + String errorMessage = "failState detected failCount" + failCount; + callStream.cancel(errorMessage, new Exception(errorMessage)); + } + response = new FinishStateResponseObserver<>(logger); + callStream = newStream(response); + } + continue; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // terminate signal + logger.debug("Dispatch thread interrupted {}", Thread.currentThread().getName()); + break; + } catch (Throwable th) { + logger.error("Unexpected outer onDispatch error", th); + } + logger.info("dispatch finished"); +// callStream.onCompleted(); + } + } + + public ClientCallStreamObserver newStream(StreamObserver response) { + final ManagedChannel managedChannel = this.managedChannel; + String authority = managedChannel.authority(); + final ConnectivityState state = managedChannel.getState(false); + this.logger.info("newStream {}/{} state:{} isShutdown:{} isTerminated:{}", id, authority, state, managedChannel.isShutdown(), managedChannel.isTerminated()); + + SpanGrpc.SpanStub spanStub = SpanGrpc.newStub(managedChannel); + return (ClientCallStreamObserver) spanStub.sendSpan(response); + } + + public void onDispatch(ClientCallStreamObserver stream, SpanType data) { + final GeneratedMessageV3 message = this.messageConverter.toMessage(data); + if (isDebug) { + logger.debug("Send message={}", debugLog(message)); + } + final PSpanMessage.Builder builder = PSpanMessage.newBuilder(); + if (message instanceof PSpanChunk) { + final PSpanChunk spanChunk = (PSpanChunk) message; + final PSpanMessage spanMessage = builder.setSpanChunk(spanChunk).build(); + stream.onNext(spanMessage); + return; + } + if (message instanceof PSpan) { + final PSpan pSpan = (PSpan) message; + final PSpanMessage spanMessage = builder.setSpan(pSpan).build(); + stream.onNext(spanMessage); + return; + } + throw new IllegalStateException("unsupported message " + data); + } + + @Override + public boolean send(SpanType data) { + return super.send(data); + } + + + @Override + public void stop() { + if (shutdown) { + return; + } + this.shutdown = true; + + try { + this.dispathThread.interrupt(); + this.dispathThread.join(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logger.info("Stop {}, channel={}", name, managedChannel); + +// StreamUtils.close(this.stream); + release(); + } + + @Override + public String toString() { + return "SpanGrpcDataSender{" + + "name='" + name + '\'' + + ", host='" + host + '\'' + + ", port=" + port + + "} " + super.toString(); + } + + +} \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleStreamState.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleStreamState.java index fb02a9935329..75fd816fb6e0 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleStreamState.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SimpleStreamState.java @@ -15,17 +15,26 @@ public SimpleStreamState(int limitCount, long limitTime) { @Override public void fail() { if (failureTime == 0) { - failureTime = System.currentTimeMillis(); + failureTime = currentTimeMills(); } failCount++; } + @Override + public long getFailCount() { + return failCount; + } + @Override public boolean isFailure() { - final long errorDuration = System.currentTimeMillis() - failureTime; + final long errorDuration = currentTimeMills() - failureTime; return errorDuration > limitTime && failCount > limitCount; } + long currentTimeMills() { + return System.currentTimeMillis(); + } + @Override public void success() { failureTime = 0; diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamState.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamState.java index 0d96d321e78f..d5c38af2c2c4 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamState.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StreamState.java @@ -3,6 +3,8 @@ public interface StreamState { void fail(); + long getFailCount(); + boolean isFailure(); void success(); diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamStateTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamStateTest.java new file mode 100644 index 000000000000..078cf279845e --- /dev/null +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/ExpireStreamStateTest.java @@ -0,0 +1,18 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import org.junit.jupiter.api.Test; + +class ExpireStreamStateTest { + + @Test + void fail() { + } + + @Test + void getFailCount() { + } + + @Test + void isFailure() { + } +} \ No newline at end of file diff --git a/web/pom.xml b/web/pom.xml index e36f9404dc6b..ff93e16ed588 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -233,7 +233,7 @@ org.springframework.boot spring-boot-starter-tomcat - provided + runtime