Skip to content

Commit

Permalink
feat: add grpc client interceptor to parse error (#598)
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai authored Apr 20, 2023
1 parent 0604c09 commit 7f8f352
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 38 deletions.
7 changes: 7 additions & 0 deletions client/internal/vanus/net/connection/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package connection

import (
// standard libraries.
"context"

// third-party libraries.
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

// first-party libraries.
"github.com/vanus-labs/vanus/internal/primitive/interceptor/errinterceptor"
)

func Connect(ctx context.Context, endpoint string) (*grpc.ClientConn, error) {
Expand All @@ -27,6 +33,7 @@ func Connect(ctx context.Context, endpoint string) (*grpc.ClientConn, error) {
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
grpc.WithUnaryInterceptor(errinterceptor.UnaryClientInterceptor()),
}
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
Expand Down
12 changes: 5 additions & 7 deletions client/pkg/eventlog/eventlog_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (l *eventlog) updateReadableSegments(ctx context.Context, rs []*record.Segm
if err != nil {
// FIXME: create or update segment failed
log.Debug().
Uint64("segment", segment.id).
Interface("segment", segment).
Msg("update readable segment failed")
continue
}
Expand Down Expand Up @@ -342,22 +342,20 @@ type logWriter struct {
}

func (w *logWriter) Append(ctx context.Context, events *cloudevents.CloudEventBatch) (offs []int64, err error) {
retryTimes := defaultRetryTimes
for i := 1; i <= retryTimes; i++ {
offsets, err := w.doAppend(ctx, events)
for i := 1; i <= defaultRetryTimes; i++ {
offs, err = w.doAppend(ctx, events)
if err == nil {
return offsets, nil
return offs, nil
}
if !errors.Is(err, errors.ErrSegmentFull) {
log.Error(ctx).Err(err).Msg("log-writer append failed")
return nil, err
}
log.Debug(ctx).
Int("retry_time", i).
Ints64("retry_time", offsets).
Err(err).Msg("log-writer append failed cause segment full")
}
return nil, errors.ErrUnknown
return nil, err
}

func (w *logWriter) Log() Eventlog {
Expand Down
8 changes: 4 additions & 4 deletions internal/gateway/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (cp *ControllerProxy) Publish(ctx context.Context, req *proxypb.PublishRequ
err := checkExtension(e.Attributes)
if err != nil {
responseCode = http.StatusBadRequest
return nil, v2.NewHTTPResult(http.StatusBadRequest, err.Error())
return nil, errors.ErrInvalidArgument.WithMessage(err.Error())
}
if e.Attributes == nil {
e.Attributes = make(map[string]*cloudevents.CloudEvent_CloudEventAttributeValue, 0)
Expand All @@ -208,14 +208,14 @@ func (cp *ControllerProxy) Publish(ctx context.Context, req *proxypb.PublishRequ
Stringer("eventTime", eventTime).
Msg("invalid format of event time")
responseCode = http.StatusBadRequest
return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid delivery time")
return nil, errors.ErrInvalidArgument.WithMessage("invalid delivery time")
}
eventTime.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeTimestamp{
CeTimestamp: timestamppb.New(t),
}
default:
responseCode = http.StatusBadRequest
return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid delivery time")
return nil, errors.ErrInvalidArgument.WithMessage("invalid delivery time")
}
tID, err := cp.ctrl.EventbusService().GetSystemEventbusByName(ctx, primitive.TimerEventbusName)
if err != nil {
Expand Down Expand Up @@ -246,7 +246,7 @@ func (cp *ControllerProxy) writeEvents(
log.Warn(ctx).Err(err).
Stringer("eventbus", eventbusID).
Msg("append to failed")
return v2.NewHTTPResult(http.StatusInternalServerError, err.Error())
return err
}
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions internal/primitive/interceptor/errinterceptor/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,21 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return res, errors.ConvertToGRPCError(err)
}
}

func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
err := invoker(ctx, method, req, reply, cc, opts...)
if et, ok := errors.FromError(err); ok {
return et
}
return err
}
}
5 changes: 2 additions & 3 deletions pkg/cluster/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/vanus-labs/vanus/pkg/cluster/raw_client"
"github.com/vanus-labs/vanus/pkg/errors"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
)
Expand All @@ -32,15 +31,15 @@ type authService struct {
func (a *authService) GetUserRole(ctx context.Context, user string) ([]*metapb.UserRole, error) {
resp, err := a.client.GetUserRole(ctx, &ctrlpb.GetUserRoleRequest{UserIdentifier: user})
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}
return resp.GetUserRole(), nil
}

func (a *authService) GetUserByToken(ctx context.Context, token string) (string, error) {
user, err := a.client.GetUserByToken(ctx, wrapperspb.String(token))
if err != nil {
return "", errors.UnwrapOrUnknown(err)
return "", err
}
return user.GetValue(), nil
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/cluster/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (es *eventbusService) GetEventbusByName(ctx context.Context, ns, name strin

pb, err := es.nsSvc.GetNamespaceByName(ctx, ns)
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}
eb, err := es.client.GetEventbusWithHumanFriendly(ctx, &ctrlpb.GetEventbusWithHumanFriendlyRequest{
NamespaceId: pb.Id,
EventbusName: name,
})
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}

// es.cache.Store(key, eb) unmask when dirty cache is resolved
Expand All @@ -66,7 +66,7 @@ func (es *eventbusService) GetEventbus(ctx context.Context, id uint64) (*meta.Ev

eb, err := es.client.GetEventbus(ctx, wrapperspb.UInt64(id))
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}

// es.cache.Store(id, eb) unmask when dirty cache is resolved
Expand All @@ -93,21 +93,20 @@ func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, n

nsPb, err := es.nsSvc.GetSystemNamespace(ctx)
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}

eventbus, err := es.client.CreateSystemEventbus(ctx, &ctrlpb.CreateEventbusRequest{
return es.client.CreateSystemEventbus(ctx, &ctrlpb.CreateEventbusRequest{
Name: name,
LogNumber: int32(defaultSystemEventbusEventlog),
Description: desc,
NamespaceId: nsPb.Id,
})
return eventbus, errors.UnwrapOrUnknown(err)
}

func (es *eventbusService) Delete(ctx context.Context, id uint64) error {
_, err := es.client.DeleteEventbus(ctx, wrapperspb.UInt64(id))
return errors.UnwrapOrUnknown(err)
return err
}

func (es *eventbusService) RawClient() ctrlpb.EventbusControllerClient {
Expand Down
5 changes: 2 additions & 3 deletions pkg/cluster/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/vanus-labs/vanus/pkg/cluster/raw_client"
"github.com/vanus-labs/vanus/pkg/errors"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
)
Expand All @@ -43,7 +42,7 @@ func (ns *namespaceService) GetNamespace(ctx context.Context, id uint64) (*metap
}
n, err := ns.client.GetNamespace(ctx, &ctrlpb.GetNamespaceRequest{Id: id})
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}
// ns.cache.Store(id, n) unmask when dirty cache is resolved
return n, nil
Expand All @@ -64,7 +63,7 @@ func (ns *namespaceService) GetNamespaceByName(ctx context.Context, name string)
}
n, err := ns.client.GetNamespaceWithHumanFriendly(ctx, wrapperspb.String(name))
if err != nil {
return nil, errors.UnwrapOrUnknown(err)
return nil, err
}
// ns.cache.Store(name, n) unmask when dirty cache is resolved
return n, nil
Expand Down
14 changes: 8 additions & 6 deletions pkg/cluster/raw_client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package raw_client
import (
"context"
"fmt"
"github.com/vanus-labs/vanus/observability/log"
"os"
"strconv"
"sync"
Expand All @@ -30,9 +29,10 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"

"github.com/vanus-labs/vanus/internal/primitive/interceptor/errinterceptor"
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/pkg/errors"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
)

const (
Expand Down Expand Up @@ -159,9 +159,11 @@ func (c *Conn) getGRPCConn(ctx context.Context, addr string) *grpc.ClientConn {
_ = conn.Close() // make sure it's closed
}

var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(c.credentials))
opts = append(opts, grpc.WithBlock())
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(c.credentials),
grpc.WithUnaryInterceptor(errinterceptor.UnaryClientInterceptor()),
}
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
conn, err = grpc.DialContext(ctx, addr, opts...)
Expand Down
4 changes: 1 addition & 3 deletions pkg/cluster/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"

"github.com/vanus-labs/vanus/pkg/cluster/raw_client"
"github.com/vanus-labs/vanus/pkg/errors"
)

type triggerService struct {
Expand All @@ -28,6 +27,5 @@ func (es *triggerService) RegisterHeartbeat(ctx context.Context, interval time.D
}

func (es *triggerService) GetSubscription(ctx context.Context, id uint64) (*metapb.Subscription, error) {
subscription, err := es.client.GetSubscription(ctx, &ctrlpb.GetSubscriptionRequest{Id: id})
return subscription, errors.UnwrapOrUnknown(err)
return es.client.GetSubscription(ctx, &ctrlpb.GetSubscriptionRequest{Id: id})
}
4 changes: 3 additions & 1 deletion pkg/errors/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package errors

import (
stderr "errors"

"github.com/pkg/errors"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -96,7 +98,7 @@ func ConvertToGRPCError(err error) error {
}
e, ok := err.(*ErrorType)
if ok {
return errors.New(e.JSON())
return stderr.New(e.JSON())
}
return err
}
10 changes: 6 additions & 4 deletions vsctl/command/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ func Error(err error) string {
if err == nil {
return ""
}
e, _ := errors.FromError(err)
if e.Message == "" {
return e.Description
if et, ok := errors.FromError(err); ok {
if et.Message == "" {
return et.Description
}
return et.Message
}
return e.Message
return err.Error()
}

func formatID(id uint64) string {
Expand Down

0 comments on commit 7f8f352

Please sign in to comment.