Skip to content

Commit

Permalink
add grpc and http transport
Browse files Browse the repository at this point in the history
  • Loading branch information
sado committed Oct 9, 2022
1 parent 39061d2 commit e8a2040
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 43 deletions.
33 changes: 21 additions & 12 deletions ASCII.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package go_kitx
package kitx

const (
version = "v0.0.1"

logo = `
██████╗ ██████╗ ██╗ ██╗██╗████████╗██╗ ██╗
██╔════╝ ██╔═══██╗ ██║ ██╔╝██║╚══██╔══╝╚██╗██╔╝
██║ ███╗██║ ██║█████╗█████╔╝ ██║ ██║ ╚███╔╝
██║ ██║██║ ██║╚════╝██╔═██╗ ██║ ██║ ██╔██╗
╚██████╔╝╚██████╔╝ ██║ ██╗██║ ██║ ██╔╝ ██╗
╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚═╝
import "fmt"

const (
version = "v0.0.2"
logo = `
_ _
| | (_) _
____ ___ _____ | | _ _ _| |_ _ _
/ _ | / _ \ (_____) | |_/ )| |(_ _)( \ / )
( (_| || |_| | | _ ( | | | |_ ) X (
\___ | \___/ |_| \_)|_| \__)(_/ \_)
(_____|
`
)

func startingPrint(id, name string) {
fmt.Printf("%s \n", logo)
fmt.Printf("\x1b[%dmKitx Version: %s\x1b[0m \n", 36, version)
fmt.Printf("\x1b[%dmApp ID: %s\x1b[0m \n", 36, id)
fmt.Printf("\x1b[%dmApp Name: %s\x1b[0m \n", 36, name)
fmt.Printf("\x1b[%dmStarting App ...\x1b[0m \n", 34)
fmt.Println("")
}
50 changes: 48 additions & 2 deletions ASCII_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,53 @@
package go_kitx
package kitx

import "testing"
import (
"context"
"sync"
"testing"
"time"

"github.com/sado0823/go-kitx/transport/grpc"
"github.com/sado0823/go-kitx/transport/http"
)

func Test_New(t *testing.T) {
t.Log(logo)
}

func Test_NewApp(t *testing.T) {

var wg sync.WaitGroup

wg.Add(1)
go func() {
hs := http.NewServer(http.WithServerAddress("0.0.0.0:7001"))
gs := grpc.NewServer(grpc.WithServerAddress("0.0.0.0:7002"))

app := New(
WithName("demo.app"),
WithVersion("v0.0.00001"),
WithMetadata(map[string]string{}),
WithServer(hs, gs),
)

wg.Done()
err := app.Run()
if err != nil {
t.Log(err)
return
}
}()
wg.Wait()
time.Sleep(time.Second)

client, err := grpc.DialInsecure(context.Background(),
grpc.WithClientEndpoint("direct:///0.0.0.0:7002,0.0.0.0:7001"),
)
if err != nil {
t.Fatal(err)
}
t.Log(client.Target())

err = client.Invoke(context.Background(), "/abc", 1, map[string]interface{}{})
t.Log(err)
}
201 changes: 201 additions & 0 deletions app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package kitx

import (
"context"
"errors"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/sado0823/go-kitx/kit/registry"
"github.com/sado0823/go-kitx/transport"

"golang.org/x/sync/errgroup"
)

type (
AppI interface {
ID() string
Name() string
Version() string
Metadata() map[string]string
Endpoint() []string
}

App struct {
ctxWithCancel context.Context
opt *option
ctxCancel context.CancelFunc
lock sync.Mutex
registrySvc *registry.Service
}

Option func(o *option)

appKey struct{}
)

func NewContext(ctx context.Context, s AppI) context.Context {
return context.WithValue(ctx, appKey{}, s)
}

func FromContext(ctx context.Context) (s AppI, ok bool) {
s, ok = ctx.Value(appKey{}).(AppI)
return
}

func New(opts ...Option) *App {
opt := &option{
ctx: context.Background(),
signals: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT},
registrarTimeout: time.Second * 5,
stopTimeout: time.Second * 10,
servers: nil,
}
for _, o := range opts {
o(opt)
}

opt.Fix()

ctx, cancelFunc := context.WithCancel(opt.ctx)

return &App{
ctxWithCancel: ctx,
opt: opt,
ctxCancel: cancelFunc,
}
}

func (a *App) ID() string {
return a.opt.id
}

func (a *App) Name() string {
return a.opt.name
}

func (a *App) Version() string {
return a.opt.version
}

func (a *App) Metadata() map[string]string {
return a.opt.metadata
}

func (a *App) Endpoint() []string {
if a.registrySvc != nil {
return a.registrySvc.Endpoints
}
return []string{}
}

func (a *App) Stop() error {
a.lock.Lock()
svc := a.registrySvc
a.lock.Unlock()

if a.opt.registrar != nil && svc != nil {
ctx, cancel := context.WithTimeout(NewContext(a.ctxWithCancel, a), a.opt.registrarTimeout)
defer cancel()
if err := a.opt.registrar.Deregister(ctx, svc); err != nil {
return err
}
}

if a.ctxCancel != nil {
a.ctxCancel()
}

return nil
}

func (a *App) Run() error {
registrySvc, err := a.genRegistrySvc()
if err != nil {
return err
}

a.lock.Lock()
a.registrySvc = registrySvc
a.lock.Unlock()

startingPrint(a.ID(), a.Name())

var (
eg, ctxWithApp = errgroup.WithContext(NewContext(a.ctxWithCancel, a))
wg sync.WaitGroup
)

for _, server := range a.opt.servers {
server := server
// server stop go-routine
eg.Go(func() error {
<-ctxWithApp.Done() // ctx will be canceled when app stop
stopCtx, stopCancel := context.WithTimeout(NewContext(a.opt.ctx, a), a.opt.stopTimeout)
defer stopCancel()
return server.Stop(stopCtx)
})
wg.Add(1)
// server start go-routine
eg.Go(func() error {
wg.Done()
return server.Start(NewContext(a.opt.ctx, a))
})
}
// wait all server started
wg.Wait()

// use registry
if a.opt.registrar != nil {
regisCtx, regisCancel := context.WithTimeout(ctxWithApp, a.opt.registrarTimeout)
defer regisCancel()
return a.opt.registrar.Register(regisCtx, a.registrySvc)
}

// wait signals for stop
sig := make(chan os.Signal, 1)
signal.Notify(sig, a.opt.signals...)
eg.Go(func() error {
select {
case <-ctxWithApp.Done():
return nil
case <-sig:
return a.Stop()
}
})

err = eg.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
return err
}

return nil
}

func (a *App) genRegistrySvc() (*registry.Service, error) {
endpoints := make([]string, 0, len(a.opt.endpoints))
for _, endpoint := range a.opt.endpoints {
endpoints = append(endpoints, endpoint.String())
}
if len(endpoints) == 0 {
for _, server := range a.opt.servers {
if endpoint, ok := server.(transport.Endpointer); ok {
url, err := endpoint.Endpoint()
if err != nil {
return nil, err
}
endpoints = append(endpoints, url.String())
}
}
}
return &registry.Service{
ID: a.opt.id,
Name: a.opt.name,
Version: a.opt.version,
Metadata: a.opt.metadata,
Endpoints: endpoints,
}, nil
}
2 changes: 2 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"

"github.com/sado0823/go-kitx"
"github.com/sado0823/go-kitx/kit/log"
logrusV "github.com/sado0823/go-kitx/plugin/logger/logrus"
zapV "github.com/sado0823/go-kitx/plugin/logger/zap"
Expand All @@ -12,6 +13,7 @@ import (
)

func init() {
kitx.New()
v := logrus.New()
v.Level = logrus.DebugLevel
logger := logrusV.New(v)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/alicebob/miniredis/v2 v2.23.0
github.com/go-redis/redis/v8 v8.11.5
github.com/goccy/go-graphviz v0.0.9
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9 // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down
8 changes: 4 additions & 4 deletions kit/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func (l *logger) Log(level Level, kvs ...interface{}) error {
}

// WithFields add new fields to the logger
func WithFields(l Logger, fields ...interface{}) Logger {
func WithFields(l Logger, kvs ...interface{}) Logger {
from, ok := l.(*logger)
if !ok {
return &logger{ctx: context.Background(), internal: l, prefix: fields, hasValuer: containValuer(fields)}
return &logger{ctx: context.Background(), internal: l, prefix: kvs, hasValuer: containValuer(kvs)}
}

fullKvs := make([]interface{}, 0, len(fields)+len(from.prefix))
fullKvs := make([]interface{}, 0, len(kvs)+len(from.prefix))
fullKvs = append(fullKvs, from.prefix...)
fullKvs = append(fullKvs, fields...)
fullKvs = append(fullKvs, kvs...)
return &logger{ctx: from.ctx, internal: from.internal, prefix: fullKvs, hasValuer: containValuer(fullKvs)}
}

Expand Down
2 changes: 1 addition & 1 deletion kit/ratelimit/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package ratelimit
import (
"errors"
"fmt"
"github.com/sado0823/go-kitx/kit/log"
"math"
"sync/atomic"
"time"

"github.com/sado0823/go-kitx/kit/log"
"github.com/sado0823/go-kitx/pkg/atomicx"
rollingwindow "github.com/sado0823/go-kitx/pkg/rollingwindow/v2"
"github.com/sado0823/go-kitx/pkg/syncx"
Expand Down
File renamed without changes.
15 changes: 4 additions & 11 deletions kit/rule/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,15 @@ import (
"fmt"
"go/scanner"
"go/token"
"io"
"log"
"os"
"strings"
"time"

"github.com/sado0823/go-kitx/kit/log"
)

var (
logger = log.New(os.Stdout, fmt.Sprintf("[DEBUG][pkg=rule][%s] ", time.Now().Format(time.StampMilli)), log.Lshortfile)
logger = log.NewHelper(log.WithFields(log.GetGlobal(), "pkg", "rule"))
)

func init() {
logger.SetFlags(0)
logger.SetOutput(io.Discard)
}

type (
Parser struct {
ctx context.Context
Expand Down Expand Up @@ -122,7 +115,7 @@ func (p *Parser) read() ([]Token, error) {
)
for {
pos, tok, lit := p.sc.Scan()
logger.Printf("pos_o=%d pos=%s\t token=%#v\t token_str=%q\t lit=%q\n", pos, p.fSet.Position(pos), tok, tok.String(), lit)
logger.Debugf("pos_o=%d pos=%s\t token=%#v\t token_str=%q\t lit=%q\n", pos, p.fSet.Position(pos), tok, tok.String(), lit)
if tok == token.EOF {
if !beforeToken.CanEOF() {
return nil, fmt.Errorf("%s can NOT be last", beforeToken.String())
Expand Down
Loading

0 comments on commit e8a2040

Please sign in to comment.