Skip to content

Commit

Permalink
Introduce interceptor to log client errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Dec 18, 2024
1 parent 7efb028 commit 3f160d3
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"github.com/xmtp/xmtpd/pkg/interceptors/client"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -61,6 +62,8 @@ func NewAPIServer(
prometheus.EnableHandlingTimeHistogram()
})

loggingInterceptor := client.NewLoggingInterceptor(log)

unary := []grpc.UnaryServerInterceptor{prometheus.UnaryServerInterceptor}
stream := []grpc.StreamServerInterceptor{prometheus.StreamServerInterceptor}

Expand All @@ -75,6 +78,9 @@ func NewAPIServer(
PermitWithoutStream: true,
MinTime: 15 * time.Second,
}),
grpc.ChainUnaryInterceptor(loggingInterceptor.Unary()),
grpc.ChainStreamInterceptor(loggingInterceptor.Stream()),

// grpc.MaxRecvMsgSize(s.Config.Options.MaxMsgSize),
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/interceptors/client/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package client

import (
"context"
"go.uber.org/zap"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
)

// LoggingInterceptor logs errors for unary and stream RPCs.
type LoggingInterceptor struct {
logger *zap.Logger
}

// NewLoggingInterceptor creates a new instance of LoggingInterceptor.
func NewLoggingInterceptor(logger *zap.Logger) *LoggingInterceptor {
if logger == nil {
panic("logger is required")
}

return &LoggingInterceptor{
logger: logger,
}
}

// Unary intercepts unary RPC calls to log errors.
func (i *LoggingInterceptor) Unary() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req) // Call the actual RPC handler
duration := time.Since(start)

if err != nil {
st, _ := status.FromError(err)
i.logger.Error(
"Client Unary RPC Error",
zap.String("method", info.FullMethod),
zap.Duration("duration", duration),
zap.Any("code", st.Code()),
zap.String("message", st.Message()),
)
}

return resp, err
}
}

// Stream intercepts stream RPC calls to log errors.
func (i *LoggingInterceptor) Stream() grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
err := handler(srv, ss) // Call the actual stream handler
duration := time.Since(start)

if err != nil {
st, _ := status.FromError(err)
i.logger.Error(
"Stream Client RPC Error",
zap.String("method", info.FullMethod),
zap.Duration("duration", duration),
zap.Any("code", st.Code()),
zap.String("message", st.Message()),
)
}

return err
}
}
106 changes: 106 additions & 0 deletions pkg/interceptors/client/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package client

import (
"context"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"testing"
)

func createTestLogger() (*zap.Logger, *observer.ObservedLogs) {
core, observedLogs := observer.New(zapcore.DebugLevel)
logger := zap.New(core)
return logger, observedLogs
}

type mockServerStream struct {
grpc.ServerStream
}

func (m *mockServerStream) Context() context.Context {
return context.Background()
}

func TestUnaryLoggingInterceptor(t *testing.T) {
logger, logs := createTestLogger()

interceptor := NewLoggingInterceptor(logger).Unary()
// Mock unary handler
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, status.Errorf(codes.Internal, "mock internal error")
}

ctx := context.Background()
info := &grpc.UnaryServerInfo{
FullMethod: "/test.TestService/TestMethod",
}
req := struct{}{}

// Call the interceptor
_, err := interceptor(ctx, req, info, handler)

require.Error(t, err)
require.Equal(t, 1, logs.Len(), "expected one log entry but got none")

logEntry := logs.All()[0]

require.Equal(t, zapcore.ErrorLevel, logEntry.Level, "expected log level 'Error'")
require.Contains(t, logEntry.ContextMap(), "method")
require.Equal(
t,
"/test.TestService/TestMethod",
logEntry.ContextMap()["method"],
"expected log to contain correct method",
)
require.Contains(t, logEntry.ContextMap(), "message")
require.Equal(
t,
"mock internal error",
logEntry.ContextMap()["message"],
"expected log to contain correct error message",
)
}
func TestStreamLoggingInterceptor(t *testing.T) {
logger, logs := createTestLogger()
interceptor := NewLoggingInterceptor(logger).Stream()

// Mock stream handler
handler := func(srv interface{}, ss grpc.ServerStream) error {
return status.Errorf(codes.NotFound, "mock stream error")
}

info := &grpc.StreamServerInfo{
FullMethod: "/test.TestService/TestStream",
}

stream := &mockServerStream{}

// Call the interceptor
err := interceptor(nil, stream, info, handler)

require.Error(t, err)
require.Equal(t, 1, logs.Len(), "expected one log entry but got none")

logEntry := logs.All()[0]

require.Equal(t, zapcore.ErrorLevel, logEntry.Level, "expected log level 'Error'")
require.Contains(t, logEntry.ContextMap(), "method")
require.Equal(
t,
"/test.TestService/TestStream",
logEntry.ContextMap()["method"],
"expected log to contain correct method",
)
require.Contains(t, logEntry.ContextMap(), "message")
require.Equal(
t,
"mock stream error",
logEntry.ContextMap()["message"],
"expected log to contain correct error message",
)
}

0 comments on commit 3f160d3

Please sign in to comment.