diff --git a/ASCII.go b/ASCII.go index ae87d57..9c64e9a 100644 --- a/ASCII.go +++ b/ASCII.go @@ -1,16 +1,25 @@ -package go_kitx +package kitx -const ( - version = "v0.0.1" - - logo = ` - - ██████╗ ██████╗ ██╗ ██╗██╗████████╗██╗ ██╗ -██╔════╝ ██╔═══██╗ ██║ ██╔╝██║╚══██╔══╝╚██╗██╔╝ -██║ ███╗██║ ██║█████╗█████╔╝ ██║ ██║ ╚███╔╝ -██║ ██║██║ ██║╚════╝██╔═██╗ ██║ ██║ ██╔██╗ -╚██████╔╝╚██████╔╝ ██║ ██╗██║ ██║ ██╔╝ ██╗ - ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚═╝ +import "fmt" +const ( + version = "v0.0.2" + logo = ` + _ _ + | | (_) _ + ____ ___ _____ | | _ _ _| |_ _ _ + / _ | / _ \ (_____) | |_/ )| |(_ _)( \ / ) + ( (_| || |_| | | _ ( | | | |_ ) X ( + \___ | \___/ |_| \_)|_| \__)(_/ \_) + (_____| ` ) + +func startingPrint(id, name string) { + fmt.Printf("%s \n", logo) + fmt.Printf("\x1b[%dmKitx Version: %s\x1b[0m \n", 36, version) + fmt.Printf("\x1b[%dmApp ID: %s\x1b[0m \n", 36, id) + fmt.Printf("\x1b[%dmApp Name: %s\x1b[0m \n", 36, name) + fmt.Printf("\x1b[%dmStarting App ...\x1b[0m \n", 34) + fmt.Println("") +} diff --git a/ASCII_test.go b/ASCII_test.go index 716767c..9ce5ae5 100644 --- a/ASCII_test.go +++ b/ASCII_test.go @@ -1,7 +1,53 @@ -package go_kitx +package kitx -import "testing" +import ( + "context" + "sync" + "testing" + "time" + + "github.com/sado0823/go-kitx/transport/grpc" + "github.com/sado0823/go-kitx/transport/http" +) func Test_New(t *testing.T) { t.Log(logo) } + +func Test_NewApp(t *testing.T) { + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + hs := http.NewServer(http.WithServerAddress("0.0.0.0:7001")) + gs := grpc.NewServer(grpc.WithServerAddress("0.0.0.0:7002")) + + app := New( + WithName("demo.app"), + WithVersion("v0.0.00001"), + WithMetadata(map[string]string{}), + WithServer(hs, gs), + ) + + wg.Done() + err := app.Run() + if err != nil { + t.Log(err) + return + } + }() + wg.Wait() + time.Sleep(time.Second) + + client, err := grpc.DialInsecure(context.Background(), + grpc.WithClientEndpoint("direct:///0.0.0.0:7002,0.0.0.0:7001"), + ) + if err != nil { + t.Fatal(err) + } + t.Log(client.Target()) + + err = client.Invoke(context.Background(), "/abc", 1, map[string]interface{}{}) + t.Log(err) +} diff --git a/app.go b/app.go new file mode 100644 index 0000000..b493a51 --- /dev/null +++ b/app.go @@ -0,0 +1,201 @@ +package kitx + +import ( + "context" + "errors" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/sado0823/go-kitx/kit/registry" + "github.com/sado0823/go-kitx/transport" + + "golang.org/x/sync/errgroup" +) + +type ( + AppI interface { + ID() string + Name() string + Version() string + Metadata() map[string]string + Endpoint() []string + } + + App struct { + ctxWithCancel context.Context + opt *option + ctxCancel context.CancelFunc + lock sync.Mutex + registrySvc *registry.Service + } + + Option func(o *option) + + appKey struct{} +) + +func NewContext(ctx context.Context, s AppI) context.Context { + return context.WithValue(ctx, appKey{}, s) +} + +func FromContext(ctx context.Context) (s AppI, ok bool) { + s, ok = ctx.Value(appKey{}).(AppI) + return +} + +func New(opts ...Option) *App { + opt := &option{ + ctx: context.Background(), + signals: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT}, + registrarTimeout: time.Second * 5, + stopTimeout: time.Second * 10, + servers: nil, + } + for _, o := range opts { + o(opt) + } + + opt.Fix() + + ctx, cancelFunc := context.WithCancel(opt.ctx) + + return &App{ + ctxWithCancel: ctx, + opt: opt, + ctxCancel: cancelFunc, + } +} + +func (a *App) ID() string { + return a.opt.id +} + +func (a *App) Name() string { + return a.opt.name +} + +func (a *App) Version() string { + return a.opt.version +} + +func (a *App) Metadata() map[string]string { + return a.opt.metadata +} + +func (a *App) Endpoint() []string { + if a.registrySvc != nil { + return a.registrySvc.Endpoints + } + return []string{} +} + +func (a *App) Stop() error { + a.lock.Lock() + svc := a.registrySvc + a.lock.Unlock() + + if a.opt.registrar != nil && svc != nil { + ctx, cancel := context.WithTimeout(NewContext(a.ctxWithCancel, a), a.opt.registrarTimeout) + defer cancel() + if err := a.opt.registrar.Deregister(ctx, svc); err != nil { + return err + } + } + + if a.ctxCancel != nil { + a.ctxCancel() + } + + return nil +} + +func (a *App) Run() error { + registrySvc, err := a.genRegistrySvc() + if err != nil { + return err + } + + a.lock.Lock() + a.registrySvc = registrySvc + a.lock.Unlock() + + startingPrint(a.ID(), a.Name()) + + var ( + eg, ctxWithApp = errgroup.WithContext(NewContext(a.ctxWithCancel, a)) + wg sync.WaitGroup + ) + + for _, server := range a.opt.servers { + server := server + // server stop go-routine + eg.Go(func() error { + <-ctxWithApp.Done() // ctx will be canceled when app stop + stopCtx, stopCancel := context.WithTimeout(NewContext(a.opt.ctx, a), a.opt.stopTimeout) + defer stopCancel() + return server.Stop(stopCtx) + }) + wg.Add(1) + // server start go-routine + eg.Go(func() error { + wg.Done() + return server.Start(NewContext(a.opt.ctx, a)) + }) + } + // wait all server started + wg.Wait() + + // use registry + if a.opt.registrar != nil { + regisCtx, regisCancel := context.WithTimeout(ctxWithApp, a.opt.registrarTimeout) + defer regisCancel() + return a.opt.registrar.Register(regisCtx, a.registrySvc) + } + + // wait signals for stop + sig := make(chan os.Signal, 1) + signal.Notify(sig, a.opt.signals...) + eg.Go(func() error { + select { + case <-ctxWithApp.Done(): + return nil + case <-sig: + return a.Stop() + } + }) + + err = eg.Wait() + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + + return nil +} + +func (a *App) genRegistrySvc() (*registry.Service, error) { + endpoints := make([]string, 0, len(a.opt.endpoints)) + for _, endpoint := range a.opt.endpoints { + endpoints = append(endpoints, endpoint.String()) + } + if len(endpoints) == 0 { + for _, server := range a.opt.servers { + if endpoint, ok := server.(transport.Endpointer); ok { + url, err := endpoint.Endpoint() + if err != nil { + return nil, err + } + endpoints = append(endpoints, url.String()) + } + } + } + return ®istry.Service{ + ID: a.opt.id, + Name: a.opt.name, + Version: a.opt.version, + Metadata: a.opt.metadata, + Endpoints: endpoints, + }, nil +} diff --git a/example/main.go b/example/main.go index 39fbe2e..80942e4 100644 --- a/example/main.go +++ b/example/main.go @@ -3,6 +3,7 @@ package main import ( "context" + "github.com/sado0823/go-kitx" "github.com/sado0823/go-kitx/kit/log" logrusV "github.com/sado0823/go-kitx/plugin/logger/logrus" zapV "github.com/sado0823/go-kitx/plugin/logger/zap" @@ -12,6 +13,7 @@ import ( ) func init() { + kitx.New() v := logrus.New() v.Level = logrus.DebugLevel logger := logrusV.New(v) diff --git a/go.mod b/go.mod index 8e92ba7..0b04273 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/alicebob/miniredis/v2 v2.23.0 github.com/go-redis/redis/v8 v8.11.5 github.com/goccy/go-graphviz v0.0.9 + github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9 // indirect golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 diff --git a/go.sum b/go.sum index 082e07e..48710e5 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= diff --git a/kit/log/log.go b/kit/log/log.go index 86611c6..0b7438f 100644 --- a/kit/log/log.go +++ b/kit/log/log.go @@ -29,15 +29,15 @@ func (l *logger) Log(level Level, kvs ...interface{}) error { } // WithFields add new fields to the logger -func WithFields(l Logger, fields ...interface{}) Logger { +func WithFields(l Logger, kvs ...interface{}) Logger { from, ok := l.(*logger) if !ok { - return &logger{ctx: context.Background(), internal: l, prefix: fields, hasValuer: containValuer(fields)} + return &logger{ctx: context.Background(), internal: l, prefix: kvs, hasValuer: containValuer(kvs)} } - fullKvs := make([]interface{}, 0, len(fields)+len(from.prefix)) + fullKvs := make([]interface{}, 0, len(kvs)+len(from.prefix)) fullKvs = append(fullKvs, from.prefix...) - fullKvs = append(fullKvs, fields...) + fullKvs = append(fullKvs, kvs...) return &logger{ctx: from.ctx, internal: from.internal, prefix: fullKvs, hasValuer: containValuer(fullKvs)} } diff --git a/kit/ratelimit/bbr.go b/kit/ratelimit/bbr.go index f909b14..c6992ab 100644 --- a/kit/ratelimit/bbr.go +++ b/kit/ratelimit/bbr.go @@ -3,11 +3,11 @@ package ratelimit import ( "errors" "fmt" - "github.com/sado0823/go-kitx/kit/log" "math" "sync/atomic" "time" + "github.com/sado0823/go-kitx/kit/log" "github.com/sado0823/go-kitx/pkg/atomicx" rollingwindow "github.com/sado0823/go-kitx/pkg/rollingwindow/v2" "github.com/sado0823/go-kitx/pkg/syncx" diff --git a/kit/registry/itf.go b/kit/registry/interface.go similarity index 100% rename from kit/registry/itf.go rename to kit/registry/interface.go diff --git a/kit/rule/parser.go b/kit/rule/parser.go index 2eb8a36..fa536c7 100644 --- a/kit/rule/parser.go +++ b/kit/rule/parser.go @@ -5,22 +5,15 @@ import ( "fmt" "go/scanner" "go/token" - "io" - "log" - "os" "strings" - "time" + + "github.com/sado0823/go-kitx/kit/log" ) var ( - logger = log.New(os.Stdout, fmt.Sprintf("[DEBUG][pkg=rule][%s] ", time.Now().Format(time.StampMilli)), log.Lshortfile) + logger = log.NewHelper(log.WithFields(log.GetGlobal(), "pkg", "rule")) ) -func init() { - logger.SetFlags(0) - logger.SetOutput(io.Discard) -} - type ( Parser struct { ctx context.Context @@ -122,7 +115,7 @@ func (p *Parser) read() ([]Token, error) { ) for { pos, tok, lit := p.sc.Scan() - logger.Printf("pos_o=%d pos=%s\t token=%#v\t token_str=%q\t lit=%q\n", pos, p.fSet.Position(pos), tok, tok.String(), lit) + logger.Debugf("pos_o=%d pos=%s\t token=%#v\t token_str=%q\t lit=%q\n", pos, p.fSet.Position(pos), tok, tok.String(), lit) if tok == token.EOF { if !beforeToken.CanEOF() { return nil, fmt.Errorf("%s can NOT be last", beforeToken.String()) diff --git a/kit/rule/parser_test.go b/kit/rule/parser_test.go index c490e97..e816b47 100644 --- a/kit/rule/parser_test.go +++ b/kit/rule/parser_test.go @@ -27,9 +27,9 @@ func Test_New(t *testing.T) { Hobby: nil, } parser, err := New(context.Background(), "Name", WithCustomFn("test", func(evalParam interface{}, arguments ...interface{}) (interface{}, error) { - logger.Println("i am test func") - logger.Println("evalParam: ", evalParam) - logger.Println(arguments...) + t.Log("i am test func") + t.Log("evalParam: ", evalParam) + t.Log(arguments...) return true, nil })) if err != nil { @@ -39,7 +39,7 @@ func Test_New(t *testing.T) { if err != nil { panic(err) } - logger.Printf("res=%v\t type=%T\t err=%+v \n", res, res, err) + t.Logf("res=%v\t type=%T\t err=%+v \n", res, res, err) } func Test_Do(t *testing.T) { @@ -56,5 +56,5 @@ func Test_Do(t *testing.T) { if err != nil { panic(err) } - logger.Printf("res=%v\t type=%T\t err=%+v \n", res, res, err) + t.Logf("res=%v\t type=%T\t err=%+v \n", res, res, err) } diff --git a/kit/rule/token_interface.go b/kit/rule/token_interface.go index a57e7d4..1386801 100644 --- a/kit/rule/token_interface.go +++ b/kit/rule/token_interface.go @@ -159,7 +159,7 @@ func (t *tokenFunc) SymbolFn() SymbolFn { return wrap2Float(fn(param)) } - logger.Printf("func right %v, %T \n", right, right) + logger.Debugf("func right %v, %T \n", right, right) params := make([]interface{}, 0) diff --git a/option.go b/option.go new file mode 100644 index 0000000..65825c4 --- /dev/null +++ b/option.go @@ -0,0 +1,104 @@ +package kitx + +import ( + "context" + "fmt" + "net/url" + "os" + "time" + + "github.com/sado0823/go-kitx/kit/log" + "github.com/sado0823/go-kitx/kit/registry" + "github.com/sado0823/go-kitx/transport" + + "github.com/google/uuid" +) + +type ( + option struct { + id string + name string + version string + metadata map[string]string + endpoints []*url.URL + ctx context.Context + signals []os.Signal + logger log.Logger + registrar registry.Registrar + registrarTimeout time.Duration + stopTimeout time.Duration + servers []transport.Server + } +) + +func (opt *option) Fix() { + if opt.id == "" { + if newUUID, err := uuid.NewUUID(); err == nil { + opt.id = newUUID.String() + } else { + opt.id = fmt.Sprintf("kitx.app.id.%d", time.Now().UnixNano()) + } + } + + if opt.name == "" { + opt.name = "kitx.app.name.unknown" + } + + if opt.logger != nil { + log.SetGlobal(opt.logger) + } + +} + +// WithID id should be unique +func WithID(id string) Option { + return func(o *option) { o.id = id } +} + +func WithName(name string) Option { + return func(o *option) { o.name = name } +} + +func WithVersion(version string) Option { + return func(o *option) { o.version = version } +} + +func WithMetadata(md map[string]string) Option { + return func(o *option) { o.metadata = md } +} + +func WithEndpoint(endpoints ...*url.URL) Option { + return func(o *option) { o.endpoints = endpoints } +} + +func WithContext(ctx context.Context) Option { + return func(o *option) { o.ctx = ctx } +} + +func WithLogger(logger log.Logger) Option { + return func(o *option) { o.logger = logger } +} + +func WithServer(srv ...transport.Server) Option { + return func(o *option) { o.servers = srv } +} + +// WithSignal with exit signals. +func WithSignal(sigs ...os.Signal) Option { + return func(o *option) { o.signals = sigs } +} + +// WithRegistrar with service registry. +func WithRegistrar(r registry.Registrar) Option { + return func(o *option) { o.registrar = r } +} + +// WithRegistrarTimeout with registrar timeout. +func WithRegistrarTimeout(t time.Duration) Option { + return func(o *option) { o.registrarTimeout = t } +} + +// WithStopTimeout with app stop timeout. +func WithStopTimeout(t time.Duration) Option { + return func(o *option) { o.stopTimeout = t } +} diff --git a/plugin/registry/etcd/registry.go b/plugin/registry/etcd/registry.go index dac8c62..1be4d1f 100644 --- a/plugin/registry/etcd/registry.go +++ b/plugin/registry/etcd/registry.go @@ -8,6 +8,7 @@ import ( "github.com/sado0823/go-kitx/kit/log" "github.com/sado0823/go-kitx/kit/registry" + clientv3 "go.etcd.io/etcd/client/v3" ) @@ -61,7 +62,7 @@ type Registry struct { func New(client *clientv3.Client, opts ...Option) *Registry { op := &options{ ctx: context.Background(), - namespace: "/microservices", + namespace: "/kitx-services", ttl: time.Second * 15, maxRetry: 5, } diff --git a/transport/grpc/balancer/p2c/p2c.go b/transport/grpc/balancer/p2c/p2c.go index bf175ef..2f4bf44 100644 --- a/transport/grpc/balancer/p2c/p2c.go +++ b/transport/grpc/balancer/p2c/p2c.go @@ -2,15 +2,14 @@ package p2c import ( "fmt" - "log" "math" "math/rand" - "os" "strings" "sync" "sync/atomic" "time" + "github.com/sado0823/go-kitx/kit/log" "github.com/sado0823/go-kitx/pkg/atomicx" "google.golang.org/grpc/balancer" @@ -34,8 +33,8 @@ const ( ) var ( - logger = log.New(os.Stdout, fmt.Sprintf("[DEBUG][pkg=p2c][%s] ", time.Now().Format(time.StampMilli)), log.Lshortfile) initTime = time.Now().AddDate(-1, -1, -1) + logger = log.NewHelper(log.WithFields(log.GetGlobal(), "pkg", "p2c")) ) func newBuilder() balancer.Builder { @@ -50,7 +49,7 @@ type p2cPickBuilder struct { } func (p *p2cPickBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Printf("p2cPickBuilder: Build called with info: %v", info) + logger.Debugf("p2cPickBuilder: Build called with info: %+v", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } @@ -184,7 +183,7 @@ func (p *p2cPicker) logStats() { conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0))) } - logger.Printf("%s", strings.Join(stats, "; ")) + logger.Debugf("%s", strings.Join(stats, "; ")) } func (p *p2cPicker) choose(c1, c2 *p2cSubConn) *p2cSubConn { diff --git a/transport/grpc/client.go b/transport/grpc/client.go index 7987f64..b87159e 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -4,16 +4,17 @@ import ( "context" "crypto/tls" "fmt" - "google.golang.org/grpc/credentials" "time" "github.com/sado0823/go-kitx/kit/middleware" "github.com/sado0823/go-kitx/kit/registry" "github.com/sado0823/go-kitx/transport" "github.com/sado0823/go-kitx/transport/grpc/balancer/p2c" + _ "github.com/sado0823/go-kitx/transport/grpc/resolver/direct" "github.com/sado0823/go-kitx/transport/grpc/resolver/discovery" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" grpcinsecure "google.golang.org/grpc/credentials/insecure" grpcmd "google.golang.org/grpc/metadata" ) @@ -91,6 +92,7 @@ func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.Clien for _, op := range opts { op(opt) } + ints := []grpc.UnaryClientInterceptor{ unaryClientInterceptor(opt.middleware, opt.timeout), } diff --git a/transport/grpc/resolver/direct/builder.go b/transport/grpc/resolver/direct/builder.go index e8e03eb..920ac78 100644 --- a/transport/grpc/resolver/direct/builder.go +++ b/transport/grpc/resolver/direct/builder.go @@ -3,6 +3,8 @@ package direct import ( "strings" + "github.com/sado0823/go-kitx/kit/log" + googleResolver "google.golang.org/grpc/resolver" ) @@ -10,6 +12,10 @@ const ( Scheme = "direct" ) +func init() { + googleResolver.Register(NewBuilder()) +} + type builder struct{} // NewBuilder example direct:///127.0.0.1:9000,127.0.0.2:9000 @@ -19,6 +25,7 @@ func NewBuilder() googleResolver.Builder { func (b *builder) Build(target googleResolver.Target, cc googleResolver.ClientConn, opts googleResolver.BuildOptions) (googleResolver.Resolver, error) { addrs := make([]googleResolver.Address, 0) + log.Infof("direct build:%s", target.URL.Path) paths := strings.Split(strings.TrimPrefix(target.URL.Path, "/"), ",") for _, path := range paths { diff --git a/transport/grpc/resolver/discovery/builder.go b/transport/grpc/resolver/discovery/builder.go index 592d852..729baee 100644 --- a/transport/grpc/resolver/discovery/builder.go +++ b/transport/grpc/resolver/discovery/builder.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/sado0823/go-kitx/kit/log" "github.com/sado0823/go-kitx/kit/registry" googleResolver "google.golang.org/grpc/resolver" @@ -37,6 +38,7 @@ func WithInsecure(insecure bool) Option { } } +// NewBuilder example discovery:///name func NewBuilder(dis registry.Discovery, opts ...Option) googleResolver.Builder { b := &builder{ discovery: dis, @@ -56,6 +58,8 @@ func (b *builder) Build(target googleResolver.Target, cc googleResolver.ClientCo w registry.Watcher }{} + log.Infof("discovery build:%s", target.URL.Path) + done := make(chan struct{}, 1) ctx, cancelFunc := context.WithCancel(context.Background()) go func() {