Skip to content

Commit

Permalink
[pinpoint-apm#9264] Guarded ping only when stream is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Oct 13, 2022
1 parent 9f1bd6b commit 8c04203
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
import com.navercorp.pinpoint.common.util.ThrottledRunnable;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import com.navercorp.pinpoint.grpc.server.ServerContext;
import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandler;
import com.navercorp.pinpoint.grpc.trace.AgentGrpc;
import com.navercorp.pinpoint.grpc.trace.PAgentInfo;
Expand All @@ -34,6 +36,7 @@
import com.navercorp.pinpoint.thrift.io.DefaultTBaseLocator;

import io.grpc.Context;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -91,6 +94,10 @@ public void run() {
public StreamObserver<PPing> pingSession(final StreamObserver<PPing> responseObserver) {
final StreamObserver<PPing> request = new StreamObserver<PPing>() {
private final AtomicBoolean first = new AtomicBoolean(false);
private final Runnable warnNotReady = new ThrottledRunnable(() -> {
logger.warn("ping message is ignored: stream is not ready: {}", ServerContext.getAgentInfo());
}, 100);

private final long id = nextSessionId();
@Override
public void onNext(PPing ping) {
Expand All @@ -107,7 +114,11 @@ public void onNext(PPing ping) {
logger.debug("PingSession:{} onNext:{}", id, MessageFormatUtils.debugLog(ping));
}
PPing replay = newPing();
responseObserver.onNext(replay);
if (isReady(responseObserver)) {
responseObserver.onNext(replay);
} else {
warnNotReady.run();
}
}

private PPing newPing() {
Expand Down Expand Up @@ -143,6 +154,14 @@ private void disconnect() {
return request;
}

private static boolean isReady(StreamObserver<PPing> responseObserver) {
if (responseObserver instanceof ServerCallStreamObserver<?>) {
ServerCallStreamObserver<PPing> observer = (ServerCallStreamObserver<PPing>) responseObserver;
return observer.isReady();
}
return true;
}

private long nextSessionId() {
return idAllocator.getAndIncrement();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.common.util;

import java.util.concurrent.atomic.AtomicLong;

/**
* @author youngjin.kim2
*/
public class ThrottledRunnable implements Runnable {
private final Runnable delegate;
private final long ratio;
private final AtomicLong counter = new AtomicLong(0);

public ThrottledRunnable(Runnable r, long ratio) {
this.delegate = r;
this.ratio = ratio;
}

@Override
public void run() {
if (trigger()) {
this.delegate.run();
}
}

private boolean trigger() {
return counter.getAndIncrement() % ratio == 0;
}
}

0 comments on commit 8c04203

Please sign in to comment.