Skip to content

Commit

Permalink
Add go-micro adapter (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou-Haowei authored Sep 1, 2020
1 parent 1a987a8 commit a28b746
Show file tree
Hide file tree
Showing 10 changed files with 1,013 additions and 0 deletions.
81 changes: 81 additions & 0 deletions adapter/go_micro/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package go_micro

import (
"context"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/micro/go-micro/v2/client"
)

type clientWrapper struct {
client.Client
Opts []Option
}

func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
resourceName := req.Method()
options := evaluateOptions(c.Opts)

if options.clientResourceExtract != nil {
resourceName = options.clientResourceExtract(ctx, req)
}

entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)

if blockErr != nil {
if options.clientBlockFallback != nil {
return options.clientBlockFallback(ctx, req, blockErr)
}
return blockErr
}
defer entry.Exit()

err := c.Client.Call(ctx, req, rsp, opts...)
if err != nil {
sentinel.TraceError(entry, err)
}

return err
}

func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
options := evaluateOptions(c.Opts)
resourceName := req.Method()

if options.streamClientResourceExtract != nil {
resourceName = options.streamClientResourceExtract(ctx, req)
}

entry, blockErr := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)

if blockErr != nil {
if options.streamClientBlockFallback != nil {
return options.streamClientBlockFallback(ctx, req, blockErr)
}
return nil, blockErr
}
defer entry.Exit()

stream, err := c.Client.Stream(ctx, req, opts...)
if err != nil {
sentinel.TraceError(entry, err)
}

return stream, err
}

// NewClientWrapper returns a sentinel client Wrapper.
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
return &clientWrapper{c, opts}
}
}
69 changes: 69 additions & 0 deletions adapter/go_micro/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package go_micro

import (
"context"
"errors"
"log"
"testing"

"github.com/alibaba/sentinel-golang/adapter/go_micro/proto"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/registry/memory"
"github.com/stretchr/testify/assert"
)

func TestClientLimiter(t *testing.T) {
// setup
r := memory.NewRegistry()
s := selector.NewSelector(selector.Registry(r))

c := client.NewClient(
// set the selector
client.Selector(s),
// add the breaker wrapper
client.Wrap(NewClientWrapper(
// add custom fallback function to return a fake error for assertion
WithClientBlockFallback(
func(ctx context.Context, request client.Request, blockError *base.BlockError) error {
return errors.New(FakeErrorMsg)
}),
)),
)

req := c.NewRequest("sentinel.test.server", "Test.Ping", &proto.Request{}, client.WithContentType("application/json"))

err := sentinel.InitDefault()
if err != nil {
log.Fatal(err)
}

rsp := &proto.Response{}

t.Run("success", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: req.Method(),
MetricType: flow.QPS,
Count: 1,
ControlBehavior: flow.Reject,
},
})
assert.Nil(t, err)
err = c.Call(context.TODO(), req, rsp)
// No server started, the return err should not be nil
assert.NotNil(t, err)
assert.NotEqual(t, FakeErrorMsg, err.Error())
assert.EqualValues(t, 1, int(stat.GetResourceNode(req.Method()).GetQPS(base.MetricEventPass)))

t.Run("second fail", func(t *testing.T) {
err := c.Call(context.TODO(), req, rsp)
assert.EqualError(t, err, FakeErrorMsg)
assert.EqualValues(t, 1, int(stat.GetResourceNode(req.Method()).GetQPS(base.MetricEventPass)))
})
})
}
93 changes: 93 additions & 0 deletions adapter/go_micro/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package go_micro

import (
"context"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/server"
)

type (
Option func(*options)

options struct {
clientResourceExtract func(context.Context, client.Request) string
serverResourceExtract func(context.Context, server.Request) string

streamClientResourceExtract func(context.Context, client.Request) string
streamServerResourceExtract func(server.Stream) string

clientBlockFallback func(context.Context, client.Request, *base.BlockError) error
serverBlockFallback func(context.Context, server.Request, *base.BlockError) error

streamClientBlockFallback func(context.Context, client.Request, *base.BlockError) (client.Stream, error)
streamServerBlockFallback func(server.Stream, *base.BlockError) server.Stream
}
)

// WithUnaryClientResourceExtractor sets the resource extractor of unary client request.
// The second string parameter is the full method name of current invocation.
func WithClientResourceExtractor(fn func(context.Context, client.Request) string) Option {
return func(opts *options) {
opts.clientResourceExtract = fn
}
}

// WithUnaryServerResourceExtractor sets the resource extractor of unary server request.
func WithServerResourceExtractor(fn func(context.Context, server.Request) string) Option {
return func(opts *options) {
opts.serverResourceExtract = fn
}
}

// WithStreamClientResourceExtractor sets the resource extractor of stream client request.
func WithStreamClientResourceExtractor(fn func(context.Context, client.Request) string) Option {
return func(opts *options) {
opts.streamClientResourceExtract = fn
}
}

// WithStreamServerResourceExtractor sets the resource extractor of stream server request.
func WithStreamServerResourceExtractor(fn func(server.Stream) string) Option {
return func(opts *options) {
opts.streamServerResourceExtract = fn
}
}

// WithUnaryClientBlockFallback sets the block fallback handler of unary client request.
// The second string parameter is the full method name of current invocation.
func WithClientBlockFallback(fn func(context.Context, client.Request, *base.BlockError) error) Option {
return func(opts *options) {
opts.clientBlockFallback = fn
}
}

// WithUnaryServerBlockFallback sets the block fallback handler of unary server request.
func WithServerBlockFallback(fn func(context.Context, server.Request, *base.BlockError) error) Option {
return func(opts *options) {
opts.serverBlockFallback = fn
}
}

// WithStreamClientBlockFallback sets the block fallback handler of stream client request.
func WithStreamClientBlockFallback(fn func(context.Context, client.Request, *base.BlockError) (client.Stream, error)) Option {
return func(opts *options) {
opts.streamClientBlockFallback = fn
}
}

// WithStreamServerBlockFallback sets the block fallback handler of stream server request.
func WithStreamServerBlockFallback(fn func(server.Stream, *base.BlockError) server.Stream) Option {
return func(opts *options) {
opts.streamServerBlockFallback = fn
}
}

func evaluateOptions(opts []Option) *options {
optCopy := &options{}
for _, o := range opts {
o(optCopy)
}
return optCopy
}
113 changes: 113 additions & 0 deletions adapter/go_micro/proto/test.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a28b746

Please sign in to comment.