Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11265] Improve logging of StreamObserver.onError #11270

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.examples.manualflowcontrol.StreamingGreeterGrpc;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* @author Taejin Koo
*/
public class HelloWorldStreamClient {
private final Logger logger = Logger.getLogger(HelloWorldStreamClient.class.getName());
private final Logger logger = LogManager.getLogger(HelloWorldStreamClient.class);

private final ManagedChannel channel;
private final StreamingGreeterGrpc.StreamingGreeterStub stub;
Expand Down Expand Up @@ -125,7 +127,8 @@ public void onNext(io.grpc.examples.manualflowcontrol.HelloReply value) {

@Override
public void onError(Throwable t) {
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
done.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void onNext(HelloRequest request) {
@Override
public void onError(Throwable t) {
// End the response stream if the client presents an error.
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
responseObserver.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.navercorp.pinpoint.it.plugin.utils.ExecutorUtils;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.examples.manualflowcontrol.HelloReply;
import io.grpc.examples.manualflowcontrol.HelloRequest;
import io.grpc.examples.manualflowcontrol.StreamingGreeterGrpc;
Expand All @@ -29,6 +30,8 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -38,15 +41,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
* copy grpc framework
* - https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
*/
public class HelloWorldStreamClient implements HelloWorldClient {

private final Logger logger = Logger.getLogger(this.getClass().getName());
private final Logger logger = LogManager.getLogger(this.getClass().getName());

private final ManagedChannel channel;
private final StreamingGreeterGrpc.StreamingGreeterStub stub;
Expand Down Expand Up @@ -142,14 +144,15 @@ public void run() {

@Override
public void onNext(HelloReply value) {
logger.info("<-- " + value.getMessage());
logger.info("<-- {}", value.getMessage());
// Signal the sender to send one message.
requestStream.request(1);
}

@Override
public void onError(Throwable t) {
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
done.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,22 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
* copy grpc framework
* - https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
*/
public class HelloWorldStreamServer implements HelloWorldServer {

private final Logger logger = Logger.getLogger(this.getClass().getName());
private final Logger logger = LogManager.getLogger(this.getClass().getName());

private int requestCount;

Expand Down Expand Up @@ -141,7 +142,8 @@ public void onNext(HelloRequest request) {
@Override
public void onError(Throwable t) {
// End the response stream if the client presents an error.
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
responseObserver.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.navercorp.pinpoint.it.plugin.utils.ExecutorUtils;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.examples.manualflowcontrol.HelloReply;
import io.grpc.examples.manualflowcontrol.HelloRequest;
import io.grpc.examples.manualflowcontrol.StreamingGreeterGrpc;
Expand All @@ -29,6 +29,8 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -39,15 +41,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
* copy grpc framework
* - https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlClient.java
*/
public class HelloWorldStreamClient implements HelloWorldClient {

private final Logger logger = Logger.getLogger(this.getClass().getName());
private final Logger logger = LogManager.getLogger(this.getClass().getName());

private final ManagedChannel channel;
private final StreamingGreeterGrpc.StreamingGreeterStub stub;
Expand All @@ -74,7 +75,7 @@ private static ManagedChannel newChannel(String host, int port, EventLoopGroup e
builder.eventLoopGroup(eventExecutors);
builder.channelType(NioSocketChannel.class);

builder.intercept(MetadataUtils.newCaptureMetadataInterceptor(new AtomicReference<Metadata>(), new AtomicReference<Metadata>()));
builder.intercept(MetadataUtils.newCaptureMetadataInterceptor(new AtomicReference<>(), new AtomicReference<>()));
return builder.build();
}

Expand Down Expand Up @@ -143,14 +144,15 @@ public void run() {

@Override
public void onNext(HelloReply value) {
logger.info("<-- " + value.getMessage());
logger.info("<-- {}", value.getMessage());
// Signal the sender to send one message.
requestStream.request(1);
}

@Override
public void onError(Throwable t) {
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
done.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.grpc.stub.StreamObserver;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -36,15 +38,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
* copy grpc framework
* - https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java
*/
public class HelloWorldStreamServer implements HelloWorldServer {

private final Logger logger = Logger.getLogger(this.getClass().getName());
private final Logger logger = LogManager.getLogger(this.getClass().getName());

private int requestCount;

Expand Down Expand Up @@ -141,7 +142,8 @@ public void onNext(HelloRequest request) {
@Override
public void onError(Throwable t) {
// End the response stream if the client presents an error.
t.printStackTrace();
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);
responseObserver.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -35,7 +36,8 @@

@Override
public void onError(Throwable t) {
logger.info("onError. message:{}", t.getMessage(), t);
Status status = Status.fromThrowable(t);
logger.info("onError:{}", status);

Check warning on line 40 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java#L39-L40

Added lines #L39 - L40 were not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.navercorp.pinpoint.profiler.receiver.grpc;

import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import com.navercorp.pinpoint.grpc.client.SupportCommandCodeClientInterceptor;
import com.navercorp.pinpoint.grpc.trace.PCmdMessage;
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
Expand All @@ -26,6 +24,8 @@
import com.navercorp.pinpoint.profiler.sender.grpc.ReconnectExecutor;
import com.navercorp.pinpoint.profiler.sender.grpc.Reconnector;
import com.navercorp.pinpoint.profiler.sender.grpc.StreamUtils;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -140,12 +140,9 @@ public void onNext(PCmdRequest request) {

@Override
public void onError(Throwable t) {
final StatusError statusError = StatusErrors.throwable(t);
if (statusError.isSimpleError()) {
logger.info("Failed to command stream, cause={}", statusError.getMessage());
} else {
logger.warn("Failed to command stream, cause={}", statusError.getMessage(), statusError.getThrowable());
}
Status status = Status.fromThrowable(t);
Metadata metadata = Status.trailersFromThrowable(t);
logger.info("Failed to command stream, {} {}", status, metadata);

if (requestStream != null) {
requestStream.onError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.TextFormat;
import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.Logger;

Expand All @@ -46,18 +46,10 @@ public void onNext(ResT response) {

@Override
public void onError(Throwable throwable) {
Status status = Status.fromThrowable(throwable);
Metadata metadata = Status.trailersFromThrowable(throwable);
if (logger.isInfoEnabled()) {
final StatusError statusError = StatusErrors.throwable(throwable);
if (statusError.isSimpleError()) {
logger.info("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage());
} else {
if (logger.isDebugEnabled()) {
logger.debug("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage(), statusError.getThrowable());
} else {
logger.info("{} Error. requestId={}, cause={}", name, requestId, statusError.getMessage());
}

}
logger.info("{} Error. requestId={}, {} {}", name, requestId, status, metadata);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import com.navercorp.pinpoint.grpc.trace.AgentGrpc;
import com.navercorp.pinpoint.grpc.trace.PPing;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -80,12 +80,11 @@ public void onNext(PPing ping) {

@Override
public void onError(Throwable t) {
final StatusError statusError = StatusErrors.throwable(t);
if (statusError.isSimpleError()) {
logger.info("Failed to ping stream, streamId={}, cause={}", streamId, statusError.getMessage());
} else {
logger.info("Failed to ping stream, streamId={}, cause={}", streamId, statusError.getMessage(), statusError.getThrowable());
}
final Status status = Status.fromThrowable(t);
Metadata metadata = Status.trailersFromThrowable(t);

logger.info("Failed to ping stream, streamId={}, {} {}", streamId, status, metadata);

cancelPingScheduler();
PingStreamContext.this.reconnector.reconnect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.navercorp.pinpoint.profiler.sender.grpc;

import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -65,12 +65,11 @@ public void onNext(ResT value) {

@Override
public void onError(Throwable t) {
final StatusError statusError = StatusErrors.throwable(t);
if (statusError.isSimpleError()) {
logger.info("Failed to stream, name={}, cause={}", listener, statusError.getMessage());
} else {
logger.info("Failed to stream, name={}, cause={}", listener, statusError.getMessage(), statusError.getThrowable());
}
Status status = Status.fromThrowable(t);
Metadata metadata = Status.trailersFromThrowable(t);

logger.info("Failed to stream, name={}, {} {}", listener, status, metadata);

listener.onError(t);
}

Expand Down
Loading