Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#10701] Refactor SpanGrpcDataSender #11448

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.navercorp.pinpoint.profiler.context.SpanType;
import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig;
import com.navercorp.pinpoint.profiler.context.module.SpanDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.ExpireStreamState;
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import com.navercorp.pinpoint.profiler.sender.grpc.SimpleSpanGrpcDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.SimpleStreamState;
import com.navercorp.pinpoint.profiler.sender.grpc.SpanGrpcDataSender;
import com.navercorp.pinpoint.profiler.sender.grpc.StreamState;
Expand Down Expand Up @@ -108,13 +110,17 @@ public DataSender<SpanType> get() {
final StreamState failState = new SimpleStreamState(spanClientOption.getLimitCount(), spanClientOption.getLimitTime());
logger.info("failState:{}", failState);

final SpanGrpcDataSender spanGrpcDataSender = new SpanGrpcDataSender(collectorIp, collectorPort,
senderExecutorQueueSize, messageConverter,
reconnectExecutor, channelFactory, failState, grpcTransportConfig.getSpanRpcMaxAgeMillis());
// final SpanGrpcDataSender spanGrpcDataSender = new SpanGrpcDataSender(collectorIp, collectorPort,
// senderExecutorQueueSize, messageConverter,
// reconnectExecutor, channelFactory, failState, grpcTransportConfig.getSpanRpcMaxAgeMillis());
// if (grpcTransportConfig.isSpanEnableStatLogging()) {
// registerChannelzReporter(spanGrpcDataSender);
// }

if (grpcTransportConfig.isSpanEnableStatLogging()) {
registerChannelzReporter(spanGrpcDataSender);
}
SimpleSpanGrpcDataSender spanGrpcDataSender = new SimpleSpanGrpcDataSender(collectorIp, collectorPort,
senderExecutorQueueSize, messageConverter,
channelFactory, new ExpireStreamState(6000 * 10)
, grpcTransportConfig.getSpanRpcMaxAgeMillis());

return spanGrpcDataSender;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import io.grpc.internal.ExponentialBackoffPolicy;

public class ExpireStreamState implements StreamState {

private long checkTime;
private final long maxAge;
private final ExponentialBackoffPolicy policy = new ExponentialBackoffPolicy();
private int failCount;

public ExpireStreamState(long maxAge) {
this.checkTime = currentTimeMillis();
this.maxAge = maxAge;
}


long currentTimeMillis() {
return System.currentTimeMillis();
}

@Override
public void fail() {
this.failCount++;
}

@Override
public long getFailCount() {
return this.failCount;
}

@Override
public boolean isFailure() {
final long currentTimeMillis = currentTimeMillis();
final long expireTime = checkTime + maxAge;
if (currentTimeMillis > expireTime) {
checkTime = currentTimeMillis;
return true;
}

return false;
}

@Override
public void success() {
this.failCount = 0;
}

@Override
public String toString() {
return "SimpleStreamState{";

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

public class FinishState {

private volatile State state = State.RUN;

public enum State {
RUN,
COMPLETED,
ERROR;

public boolean isRun() {
return this == RUN;
}

public boolean isFinish() {
return this != RUN;
}
}

public void completed() {
state = State.COMPLETED;
}

public void error() {
state = State.ERROR;
}

public State current() {
return state;
}

public boolean isRun() {
return state.isRun();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

public class FinishStateResponseObserver<V> implements StreamObserver<V> {
private final Logger logger;

private final FinishState state = new FinishState();

public FinishStateResponseObserver(Logger logger) {
this.logger = Objects.requireNonNull(logger, "logger");
}

@Override
public void onNext(V value) {
logger.debug("onNext {}", value);
}

@Override
public void onError(Throwable t) {
logger.info("onError", t);
this.state.error();
}

@Override
public void onCompleted() {
logger.debug("onCompleted");
this.state.completed();
}

public FinishState getState() {
return state;
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2019 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.sender.grpc;


import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
import com.navercorp.pinpoint.grpc.client.ChannelFactory;
import com.navercorp.pinpoint.grpc.trace.PSpan;
import com.navercorp.pinpoint.grpc.trace.PSpanChunk;
import com.navercorp.pinpoint.grpc.trace.PSpanMessage;
import com.navercorp.pinpoint.grpc.trace.SpanGrpc;
import com.navercorp.pinpoint.profiler.context.SpanType;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.navercorp.pinpoint.grpc.MessageFormatUtils.debugLog;

/**
* @author jaehong.kim
*/
public class SimpleSpanGrpcDataSender extends GrpcDataSender<SpanType> {

private final static String id = "SimpleSpanStream";
private final AtomicLong streamId = new AtomicLong(0);
private final Thread dispathThread;
private final StreamState state;

public SimpleSpanGrpcDataSender(String host, int port,
int executorQueueSize,
MessageConverter<SpanType, GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory,
StreamState state,
long maxRpcAgeMillis) {
super(host, port, executorQueueSize, messageConverter, channelFactory);
this.state = Objects.requireNonNull(state, "state");

this.dispathThread = new Thread(this::dispatch, "Pinpoint grpc-span-dispatch");
this.dispathThread.start();
}

private void dispatch() {
FinishStateResponseObserver<Empty> response = new FinishStateResponseObserver<>(logger);
ClientCallStreamObserver<PSpanMessage> callStream = newStream(response);
while (!shutdown) {
try {
final SpanType message = queue.poll(3000, TimeUnit.MILLISECONDS);
if (message != null) {
logger.debug("--------------dispatch:{}", message);
boolean ready = callStream.isReady();
boolean runState = response.getState().isRun();
if (ready && runState) {
try {
onDispatch(callStream, message);
} catch (Throwable th) {
logger.warn("Unexpected onDispatch error error", th);
}
state.success();
} else {
logger.info("dispatch failed isReady:{}, runState:{}", ready, runState);
state.fail();
}
}
if (state.isFailure()) {
logger.info("renewStream: {}", this);
long failCount = state.getFailCount();
if (failCount == 0) {
callStream.onCompleted();
} else {
String errorMessage = "failState detected failCount" + failCount;
callStream.cancel(errorMessage, new Exception(errorMessage));
}
response = new FinishStateResponseObserver<>(logger);
callStream = newStream(response);
}
continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// terminate signal
logger.debug("Dispatch thread interrupted {}", Thread.currentThread().getName());
break;
} catch (Throwable th) {
logger.error("Unexpected outer onDispatch error", th);
}
logger.info("dispatch finished");
// callStream.onCompleted();
}
}

public ClientCallStreamObserver<PSpanMessage> newStream(StreamObserver<Empty> response) {
final ManagedChannel managedChannel = this.managedChannel;
String authority = managedChannel.authority();
final ConnectivityState state = managedChannel.getState(false);
this.logger.info("newStream {}/{} state:{} isShutdown:{} isTerminated:{}", id, authority, state, managedChannel.isShutdown(), managedChannel.isTerminated());

SpanGrpc.SpanStub spanStub = SpanGrpc.newStub(managedChannel);
return (ClientCallStreamObserver<PSpanMessage>) spanStub.sendSpan(response);
}

public void onDispatch(ClientCallStreamObserver<PSpanMessage> stream, SpanType data) {
final GeneratedMessageV3 message = this.messageConverter.toMessage(data);
if (isDebug) {
logger.debug("Send message={}", debugLog(message));
}
final PSpanMessage.Builder builder = PSpanMessage.newBuilder();
if (message instanceof PSpanChunk) {
final PSpanChunk spanChunk = (PSpanChunk) message;
final PSpanMessage spanMessage = builder.setSpanChunk(spanChunk).build();
stream.onNext(spanMessage);
return;
}
if (message instanceof PSpan) {
final PSpan pSpan = (PSpan) message;
final PSpanMessage spanMessage = builder.setSpan(pSpan).build();
stream.onNext(spanMessage);
return;
}
throw new IllegalStateException("unsupported message " + data);
}

@Override
public boolean send(SpanType data) {
return super.send(data);
}


@Override
public void stop() {
if (shutdown) {
return;
}
this.shutdown = true;

try {
this.dispathThread.interrupt();
this.dispathThread.join(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Stop {}, channel={}", name, managedChannel);

// StreamUtils.close(this.stream);
release();
}

@Override
public String toString() {
return "SpanGrpcDataSender{" +
"name='" + name + '\'' +
", host='" + host + '\'' +
", port=" + port +
"} " + super.toString();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@ public SimpleStreamState(int limitCount, long limitTime) {
@Override
public void fail() {
if (failureTime == 0) {
failureTime = System.currentTimeMillis();
failureTime = currentTimeMills();
}
failCount++;
}

@Override
public long getFailCount() {
return failCount;
}

@Override
public boolean isFailure() {
final long errorDuration = System.currentTimeMillis() - failureTime;
final long errorDuration = currentTimeMills() - failureTime;
return errorDuration > limitTime && failCount > limitCount;
}

long currentTimeMills() {
return System.currentTimeMillis();
}

@Override
public void success() {
failureTime = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
public interface StreamState {
void fail();

long getFailCount();

boolean isFailure();

void success();
Expand Down
Loading